folly/folly/channels/test/ChannelTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/channels/Channel.h>
#include <folly/channels/test/ChannelTestUtil.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/portability/GTest.h>

namespace folly {
namespace channels {

using namespace testing;

class ChannelFixture : public Test,
                       public WithParamInterface<ConsumptionMode>,
                       public ChannelConsumerBase<int> {
 protected:
  ChannelFixture() : ChannelConsumerBase(GetParam()) {}

  ~ChannelFixture() override {
    cancellationSource_.requestCancellation();
    if (!continueConsuming_.isFulfilled()) {
      continueConsuming_.setValue(false);
    }
    executor_.drain();
  }

  folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor() override {
    return &executor_;
  }

  void onNext(Try<int> result) override { onNext_(std::move(result)); }

  folly::ManualExecutor executor_;
  StrictMock<MockNextCallback<int>> onNext_;
};

TEST_P(ChannelFixture, SingleWriteBeforeNext_ThenCancelled) {
  auto [receiver, sender] = Channel<int>::create();
  sender.write(1);

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onCancelled());

  startConsuming(std::move(receiver));
  executor_.drain();

  EXPECT_FALSE(sender.isReceiverCancelled());

  cancellationSource_.requestCancellation();
  executor_.drain();

  EXPECT_TRUE(sender.isReceiverCancelled());
}

TEST_P(ChannelFixture, SingleWriteAfterNext_ThenCancelled) {
  auto [receiver, sender] = Channel<int>::create();

  startConsuming(std::move(receiver));
  executor_.drain();

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onCancelled());

  sender.write(1);
  executor_.drain();

  EXPECT_FALSE(sender.isReceiverCancelled());

  cancellationSource_.requestCancellation();
  executor_.drain();

  EXPECT_TRUE(sender.isReceiverCancelled());
}

TEST_P(ChannelFixture, MultipleWrites_ThenStopConsumingByReturningFalse) {
  auto [receiver, sender] = Channel<int>::create();

  startConsuming(std::move(receiver));
  executor_.drain();

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onValue(2));
  EXPECT_CALL(onNext_, onValue(3));

  sender.write(1);
  sender.write(2);
  executor_.drain();

  sender.write(3);
  sender.write(4);
  continueConsuming_ = folly::SharedPromise<bool>();
  continueConsuming_.setValue(false);
  executor_.drain();
}

TEST_P(
    ChannelFixture,
    MultipleWrites_ThenStopConsumingByThrowingOperationCancelled) {
  auto [receiver, sender] = Channel<int>::create();

  startConsuming(std::move(receiver));
  executor_.drain();

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onValue(2));
  EXPECT_CALL(onNext_, onValue(3));

  sender.write(1);
  sender.write(2);
  executor_.drain();

  sender.write(3);
  sender.write(4);
  continueConsuming_ = folly::SharedPromise<bool>();
  continueConsuming_.setException(folly::OperationCancelled());
  executor_.drain();
}

TEST_P(ChannelFixture, Close_NoException_BeforeSubscribe) {
  auto [receiver, sender] = Channel<int>::create();

  std::move(sender).close();

  EXPECT_CALL(onNext_, onClosed());

  startConsuming(std::move(receiver));
  executor_.drain();
}

TEST_P(ChannelFixture, Close_NoException_AfterSubscribeAndWrite) {
  auto [receiver, sender] = Channel<int>::create();

  sender.write(1);
  std::move(sender).close();

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onClosed());

  startConsuming(std::move(receiver));
  executor_.drain();
}

TEST_P(ChannelFixture, Close_DueToDestruction_BeforeSubscribe) {
  auto [receiver, sender] = Channel<int>::create();

  { auto toDestroy = std::move(sender); }

  EXPECT_CALL(onNext_, onClosed());

  startConsuming(std::move(receiver));
  executor_.drain();
}

TEST_P(ChannelFixture, Close_DueToDestruction_AfterSubscribeAndWrite) {
  auto [receiver, sender] = Channel<int>::create();

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onClosed());

  startConsuming(std::move(receiver));

  sender.write(1);
  { auto toDestroy = std::move(sender); }

  executor_.drain();
}

TEST_P(ChannelFixture, Close_Exception_BeforeSubscribe) {
  auto [receiver, sender] = Channel<int>::create();

  std::move(sender).close(std::runtime_error("Error"));

  EXPECT_CALL(onNext_, onRuntimeError("std::runtime_error: Error"));

  startConsuming(std::move(receiver));
  executor_.drain();
}

TEST_P(ChannelFixture, Close_Exception_AfterSubscribeAndWrite) {
  auto [receiver, sender] = Channel<int>::create();

  sender.write(1);
  std::move(sender).close(std::runtime_error("Error"));

  EXPECT_CALL(onNext_, onValue(1));
  EXPECT_CALL(onNext_, onRuntimeError("std::runtime_error: Error"));

  startConsuming(std::move(receiver));
  executor_.drain();
}

TEST_P(ChannelFixture, CancellationRespected) {
  auto [receiver, sender] = Channel<int>::create();

  EXPECT_CALL(onNext_, onValue(1));

  continueConsuming_ = folly::SharedPromise<bool>();
  sender.write(1);
  startConsuming(std::move(receiver));
  executor_.drain();

  EXPECT_FALSE(sender.isReceiverCancelled());

  cancellationSource_.requestCancellation();
  executor_.drain();

  EXPECT_TRUE(sender.isReceiverCancelled());
}

TEST(Channel, CancelNextWithoutClose) {
  folly::ManualExecutor executor;
  folly::CancellationSource cancelSource;
  auto [receiver, sender] = Channel<int>::create();

  sender.write(1);
  EXPECT_EQ(folly::coro::blockingWait(receiver.next()).value(), 1);

  auto nextTask =
      folly::coro::co_withCancellation(
          cancelSource.getToken(),
          folly::coro::co_invoke(
              [&receiver_2 =
                   receiver]() -> folly::coro::Task<std::optional<int>> {
                co_return co_await receiver_2.next(false /* closeOnCancel */);
              }))
          .scheduleOn(&executor)
          .start();
  executor.drain();

  cancelSource.requestCancellation();
  sender.write(2);
  executor.drain();

  EXPECT_THROW(
      folly::coro::blockingWait(std::move(nextTask)),
      folly::OperationCancelled);

  EXPECT_EQ(folly::coro::blockingWait(receiver.next()).value(), 2);
}

INSTANTIATE_TEST_SUITE_P(
    Channel_Coro_WithTry,
    ChannelFixture,
    testing::Values(ConsumptionMode::CoroWithTry));

INSTANTIATE_TEST_SUITE_P(
    Channel_Coro_WithoutTry,
    ChannelFixture,
    testing::Values(ConsumptionMode::CoroWithoutTry));

INSTANTIATE_TEST_SUITE_P(
    Channel_Callback_WithHandle,
    ChannelFixture,
    testing::Values(ConsumptionMode::CallbackWithHandle));

class ChannelFixtureStress : public Test,
                             public WithParamInterface<ConsumptionMode> {
 protected:
  ChannelFixtureStress()
      : producer_(std::make_unique<StressTestProducer<int>>(
            [value = 0]() mutable { return value++; })),
        consumer_(std::make_unique<StressTestConsumer<int>>(
            GetParam(), [lastReceived = -1](int value) mutable {
              EXPECT_EQ(value, ++lastReceived);
            })) {}

  static constexpr std::chrono::milliseconds kTestTimeout =
      std::chrono::milliseconds{5000};

  std::unique_ptr<StressTestProducer<int>> producer_;
  std::unique_ptr<StressTestConsumer<int>> consumer_;
};

TEST_P(ChannelFixtureStress, Close_NoException) {
  auto [receiver, sender] = Channel<int>::create();

  producer_->startProducing(std::move(sender), std::nullopt /* closeEx */);
  consumer_->startConsuming(std::move(receiver));

  /* sleep override */
  std::this_thread::sleep_for(kTestTimeout);
  producer_->stopProducing();
  EXPECT_EQ(consumer_->waitForClose().get(), CloseType::NoException);
}

TEST_P(ChannelFixtureStress, Close_Exception) {
  auto [receiver, sender] = Channel<int>::create();

  producer_->startProducing(std::move(sender), std::runtime_error("Error"));
  consumer_->startConsuming(std::move(receiver));

  /* sleep override */
  std::this_thread::sleep_for(kTestTimeout);
  producer_->stopProducing();
  EXPECT_EQ(consumer_->waitForClose().get(), CloseType::Exception);
}

TEST_P(ChannelFixtureStress, Cancelled) {
  auto [receiver, sender] = Channel<int>::create();

  producer_->startProducing(std::move(sender), std::nullopt /* closeEx */);
  consumer_->startConsuming(std::move(receiver));

  /* sleep override */
  std::this_thread::sleep_for(kTestTimeout);

  consumer_->cancel();
  EXPECT_EQ(consumer_->waitForClose().get(), CloseType::Cancelled);
  producer_->stopProducing();
}

TEST_P(ChannelFixtureStress, Close_NoException_ThenCancelledImmediately) {
  auto [receiver, sender] = Channel<int>::create();

  producer_->startProducing(std::move(sender), std::nullopt /* closeEx */);
  consumer_->startConsuming(std::move(receiver));

  /* sleep override */
  std::this_thread::sleep_for(kTestTimeout);
  producer_->stopProducing();
  consumer_->cancel();
  EXPECT_THAT(
      consumer_->waitForClose().get(),
      AnyOf(Eq(CloseType::NoException), Eq(CloseType::Cancelled)));
}

TEST_P(ChannelFixtureStress, Cancelled_ThenClosedImmediately_NoException) {
  auto [receiver, sender] = Channel<int>::create();

  producer_->startProducing(std::move(sender), std::nullopt /* closeEx */);
  consumer_->startConsuming(std::move(receiver));

  /* sleep override */
  std::this_thread::sleep_for(kTestTimeout);
  consumer_->cancel();
  producer_->stopProducing();
  EXPECT_THAT(
      consumer_->waitForClose().get(),
      AnyOf(Eq(CloseType::NoException), Eq(CloseType::Cancelled)));
}

INSTANTIATE_TEST_SUITE_P(
    Channel_Coro_WithTry,
    ChannelFixtureStress,
    testing::Values(ConsumptionMode::CoroWithTry));

INSTANTIATE_TEST_SUITE_P(
    Channel_Coro_WithoutTry,
    ChannelFixtureStress,
    testing::Values(ConsumptionMode::CoroWithoutTry));

INSTANTIATE_TEST_SUITE_P(
    Channel_Callback_WithHandle,
    ChannelFixtureStress,
    testing::Values(ConsumptionMode::CallbackWithHandle));
} // namespace channels
} // namespace folly