chromium/chrome/services/sharing/nearby/platform/bidirectional_stream_unittest.cc

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/services/sharing/nearby/platform/bidirectional_stream.h"

#include <memory>
#include <string>
#include <vector>

#include "base/containers/span.h"
#include "base/memory/scoped_refptr.h"
#include "base/run_loop.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/thread_restrictions.h"
#include "chrome/services/sharing/nearby/platform/input_stream_impl.h"
#include "chrome/services/sharing/nearby/platform/output_stream_impl.h"
#include "chromeos/ash/services/nearby/public/mojom/nearby_connections_types.mojom.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace nearby {
namespace chrome {

class BidirectionalStreamTest : public ::testing::Test {
 public:
  BidirectionalStreamTest()
      : task_runner_(
            base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()})) {}
  ~BidirectionalStreamTest() override = default;
  BidirectionalStreamTest(const BidirectionalStreamTest&) = delete;
  BidirectionalStreamTest& operator=(const BidirectionalStreamTest&) = delete;

  void SetUp() override {
    mojo::ScopedDataPipeProducerHandle receive_pipe_producer_handle;
    mojo::ScopedDataPipeConsumerHandle receive_pipe_consumer_handle;
    ASSERT_EQ(
        MOJO_RESULT_OK,
        mojo::CreateDataPipe(/*options=*/nullptr, receive_pipe_producer_handle,
                             receive_pipe_consumer_handle));
    receive_stream_ = std::move(receive_pipe_producer_handle);

    mojo::ScopedDataPipeProducerHandle send_pipe_producer_handle;
    mojo::ScopedDataPipeConsumerHandle send_pipe_consumer_handle;
    ASSERT_EQ(MOJO_RESULT_OK, mojo::CreateDataPipe(/*options=*/nullptr,
                                                   send_pipe_producer_handle,
                                                   send_pipe_consumer_handle));
    send_stream_ = std::move(send_pipe_consumer_handle);

    bidirectional_stream_ = std::make_unique<BidirectionalStream>(
        connections::mojom::Medium::kBluetooth, task_runner_,
        std::move(receive_pipe_consumer_handle),
        std::move(send_pipe_producer_handle));
    EXPECT_TRUE(bidirectional_stream_->GetInputStream());
    EXPECT_TRUE(bidirectional_stream_->GetOutputStream());
  }

  void TearDown() override { bidirectional_stream_.reset(); }

 protected:
  base::test::TaskEnvironment task_environment_;
  scoped_refptr<base::SequencedTaskRunner> task_runner_;
  mojo::ScopedDataPipeProducerHandle receive_stream_;
  mojo::ScopedDataPipeConsumerHandle send_stream_;
  std::unique_ptr<BidirectionalStream> bidirectional_stream_;
};

TEST_F(BidirectionalStreamTest, Read) {
  std::string message = "ReceivedMessage";
  size_t bytes_written = 0;
  EXPECT_EQ(MOJO_RESULT_OK, receive_stream_->WriteData(
                                base::as_byte_span(message),
                                MOJO_WRITE_DATA_FLAG_NONE, bytes_written));
  EXPECT_EQ(bytes_written, message.size());
  EXPECT_EQ(Exception::kSuccess, bidirectional_stream_->GetInputStream()
                                     ->Read(message.size())
                                     .exception());

  // Can't read after streams are closed
  EXPECT_EQ(MOJO_RESULT_OK, receive_stream_->WriteData(
                                base::as_byte_span(message),
                                MOJO_WRITE_DATA_FLAG_NONE, bytes_written));
  EXPECT_EQ(bytes_written, message.size());
  EXPECT_EQ(Exception::kSuccess, bidirectional_stream_->Close().value);
  EXPECT_EQ(Exception::kIo, bidirectional_stream_->GetInputStream()
                                ->Read(message.size())
                                .exception());
}

TEST_F(BidirectionalStreamTest, Write) {
  std::string message = "SentMessage";
  EXPECT_EQ(Exception::kSuccess, bidirectional_stream_->GetOutputStream()
                                     ->Write(ByteArray{message})
                                     .value);
  std::vector<char> buffer(1024);
  size_t bytes_read = 0;
  EXPECT_EQ(
      MOJO_RESULT_OK,
      send_stream_->ReadData(MOJO_READ_DATA_FLAG_NONE,
                             base::as_writable_byte_span(buffer), bytes_read));

  // Can't write after streams are closed
  EXPECT_EQ(Exception::kSuccess, bidirectional_stream_->Close().value);
  EXPECT_EQ(Exception::kIo, bidirectional_stream_->GetOutputStream()
                                ->Write(ByteArray{message})
                                .value);
}

TEST_F(BidirectionalStreamTest, CloseCalledFromMultipleThreads) {
  base::RunLoop run_loop;

  const size_t kNumThreads = 2;

  // Quit the run loop after Close() returns on all threads.
  size_t num_close_calls = 0;
  auto quit_callback =
      base::BindLambdaForTesting([&num_close_calls, &run_loop] {
        ++num_close_calls;
        if (num_close_calls == kNumThreads)
          run_loop.Quit();
      });

  // Call Close() from different threads simultaneously to ensure the stream is
  // shutdown gracefully.
  for (size_t thread = 0; thread < kNumThreads; ++thread) {
    base::ThreadPool::CreateSequencedTaskRunner({})->PostTaskAndReply(
        FROM_HERE, base::BindLambdaForTesting([this] {
          base::ScopedAllowBaseSyncPrimitivesForTesting allow;
          EXPECT_EQ(Exception::kSuccess, bidirectional_stream_->Close().value);
        }),
        quit_callback);
  }
  run_loop.Run();
}

}  // namespace chrome
}  // namespace nearby