chromium/chromecast/cast_core/grpc/grpc_unary_test.cc

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/files/scoped_temp_dir.h"
#include "base/functional/bind.h"
#include "base/strings/strcat.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/bind_post_task.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/uuid.h"
#include "chromecast/cast_core/grpc/grpc_server.h"
#include "chromecast/cast_core/grpc/status_matchers.h"
#include "chromecast/cast_core/grpc/test_service.castcore.pb.h"
#include "chromecast/cast_core/grpc/test_service_extra.castcore.pb.h"
#include "chromecast/cast_core/grpc/test_utils.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace cast {
namespace utils {
namespace {

using ::cast::test::StatusIs;

const auto kEventTimeout = base::Seconds(1);
const auto kServerStopTimeout = base::Milliseconds(100);

class GrpcUnaryTest : public ::testing::Test {
 protected:
  GrpcUnaryTest() {
    CHECK(temp_dir_.CreateUniqueTempDir());
    endpoint_ =
        "unix:" +
        temp_dir_.GetPath()
            .AppendASCII(
                "cast-uds-" +
                base::Uuid::GenerateRandomV4().AsLowercaseString().substr(24))
            .value();
  }

  base::test::TaskEnvironment task_environment_{
      base::test::TaskEnvironment::TimeSource::MOCK_TIME};
  base::ScopedTempDir temp_dir_;
  std::string endpoint_;
};

TEST_F(GrpcUnaryTest, SyncUnaryCallSucceeds) {
  GrpcServer server;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [](TestRequest request,
             SimpleServiceHandler::SimpleCall::Reactor* reactor) {
            EXPECT_EQ(request.foo(), "test_foo");
            TestResponse response;
            response.set_bar("test_bar");
            reactor->Write(std::move(response));
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  call.request().set_foo("test_foo");
  auto response = std::move(call).Invoke();
  CU_ASSERT_OK(response);
  EXPECT_EQ(response->bar(), "test_bar");

  test::StopGrpcServer(server, kServerStopTimeout);
}

TEST_F(GrpcUnaryTest, SyncUnaryCallReturnsErrorStatus) {
  GrpcServer server;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [&](TestRequest request,
              SimpleServiceHandler::SimpleCall::Reactor* reactor) {
            EXPECT_EQ(request.foo(), "test_foo");
            reactor->Write(
                grpc::Status(grpc::StatusCode::NOT_FOUND, "Not found"));
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  call.request().set_foo("test_foo");
  auto response = std::move(call).Invoke();
  ASSERT_THAT(response.status(),
              StatusIs(grpc::StatusCode::NOT_FOUND, "Not found"));

  test::StopGrpcServer(server, kServerStopTimeout);
}

TEST_F(GrpcUnaryTest, SyncUnaryCallCancelledIfServerIsStopped) {
  GrpcServer server;
  base::WaitableEvent request_received_event{
      base::WaitableEvent::ResetPolicy::AUTOMATIC};
  SimpleServiceHandler::SimpleCall::Reactor* reactor;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [&](TestRequest request,
              SimpleServiceHandler::SimpleCall::Reactor* r) {
            reactor = r;
            request_received_event.Signal();
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  // Need to run the client request in a separate thread, so that the main test
  // thread can continue handling the case.
  base::ThreadPool::PostTask(
      FROM_HERE, base::BindLambdaForTesting([&]() {
        SimpleServiceStub stub(endpoint_);
        auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
        call.request().set_foo("test_foo");
        auto response = std::move(call).Invoke();
        CU_ASSERT_OK(response);
      }));
  ASSERT_TRUE(request_received_event.TimedWait(kEventTimeout));
  // Allow first request to pass.
  reactor->Write(TestResponse());

  test::StopGrpcServer(server, kServerStopTimeout);

  // Server should not be available any more.
  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  call.request().set_foo("test_foo");
  auto response = std::move(call).Invoke();
  ASSERT_THAT(response, StatusIs(grpc::StatusCode::UNAVAILABLE));
  ASSERT_FALSE(request_received_event.IsSignaled());
}

TEST_F(GrpcUnaryTest, AsyncUnaryCallSucceeds) {
  GrpcServer server;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [](TestRequest request,
             SimpleServiceHandler::SimpleCall::Reactor* reactor) {
            EXPECT_EQ(request.foo(), "test_foo");
            TestResponse response;
            response.set_bar("test_bar");
            reactor->Write(std::move(response));
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  call.request().set_foo("test_foo");
  base::WaitableEvent response_received_event;
  std::move(call).InvokeAsync(
      base::BindLambdaForTesting([&](GrpcStatusOr<TestResponse> response) {
        CU_CHECK_OK(response);
        EXPECT_EQ(response->bar(), "test_bar");
        response_received_event.Signal();
      }));
  ASSERT_TRUE(response_received_event.TimedWait(kEventTimeout));

  test::StopGrpcServer(server, kServerStopTimeout);
}

TEST_F(GrpcUnaryTest, AsyncUnaryCallReturnsErrorStatus) {
  GrpcServer server;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [&](TestRequest request,
              SimpleServiceHandler::SimpleCall::Reactor* reactor) {
            EXPECT_EQ(request.foo(), "test_foo");
            reactor->Write(
                grpc::Status(grpc::StatusCode::NOT_FOUND, "Not Found"));
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  call.request().set_foo("test_foo");
  base::WaitableEvent response_received_event;
  std::move(call).InvokeAsync(
      base::BindLambdaForTesting([&](GrpcStatusOr<TestResponse> response) {
        ASSERT_THAT(response.status(),
                    StatusIs(grpc::StatusCode::NOT_FOUND, "Not Found"));
        response_received_event.Signal();
      }));
  ASSERT_TRUE(response_received_event.TimedWait(kEventTimeout));

  test::StopGrpcServer(server, kServerStopTimeout);
}

TEST_F(GrpcUnaryTest, AsyncUnaryCallCancelledIfServerIsStopped) {
  GrpcServer server;
  base::WaitableEvent request_received_event;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [&](TestRequest request,
              SimpleServiceHandler::SimpleCall::Reactor* reactor) {
            reactor->Write(TestResponse());
            request_received_event.Signal();
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  call.request().set_foo("test_foo");
  base::WaitableEvent response_received_event{
      base::WaitableEvent::ResetPolicy::AUTOMATIC};
  std::move(call).InvokeAsync(
      base::BindLambdaForTesting([&](GrpcStatusOr<TestResponse> response) {
        CU_ASSERT_OK(response);
        response_received_event.Signal();
      }));
  ASSERT_TRUE(response_received_event.TimedWait(kEventTimeout));

  test::StopGrpcServer(server, kServerStopTimeout);

  auto call1 = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  std::move(call1).InvokeAsync(
      base::BindLambdaForTesting([&](GrpcStatusOr<TestResponse> response) {
        ASSERT_THAT(response, StatusIs(grpc::StatusCode::UNAVAILABLE));
        response_received_event.Signal();
      }));
  ASSERT_TRUE(response_received_event.TimedWait(kEventTimeout));
}

TEST_F(GrpcUnaryTest, SyncUnaryCallSucceedsExtra) {
  GrpcServer server;
  server.SetHandler<SimpleServiceExtraHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [](TestExtraRequest request,
             SimpleServiceExtraHandler::SimpleCall::Reactor* reactor) {
            EXPECT_EQ(request.extra(), "test_extra");
            TestResponse response;
            response.set_bar("test_bar");
            reactor->Write(std::move(response));
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceExtraStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceExtraStub::SimpleCall>();
  call.request().set_extra("test_extra");
  auto response = std::move(call).Invoke();
  CU_ASSERT_OK(response);
  EXPECT_EQ(response->bar(), "test_bar");

  test::StopGrpcServer(server, kServerStopTimeout);
}

TEST_F(GrpcUnaryTest,
       AsyncUnaryCallCancelledByClientAndDestroyedBeforeServerShutdown) {
  GrpcServer server;
  SimpleServiceHandler::SimpleCall::Reactor* cancelled_reactor;
  base::WaitableEvent request_received_event;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [&](TestRequest, SimpleServiceHandler::SimpleCall::Reactor* r) {
            cancelled_reactor = r;
            request_received_event.Signal();
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  base::WaitableEvent response_received_event;
  auto context = std::move(call).InvokeAsync(
      base::BindLambdaForTesting([&](GrpcStatusOr<TestResponse> response) {
        ASSERT_THAT(response, StatusIs(grpc::StatusCode::CANCELLED));
        response_received_event.Signal();
      }));
  ASSERT_TRUE(request_received_event.TimedWait(kEventTimeout));

  context.Cancel();
  ASSERT_TRUE(response_received_event.TimedWait(kEventTimeout));

  // Actually sleep to allow gRPC framework to propagate the cancellation to the
  // server side and mark the reactor cancelled.
  ASSERT_TRUE(
      test::WaitForPredicate(kEventTimeout, base::BindLambdaForTesting([&]() {
                               return cancelled_reactor->is_done();
                             })));

  // This releases the cancelled reactor on the server side and destroys it.
  cancelled_reactor->Write(TestResponse());
  ASSERT_TRUE(
      test::WaitForPredicate(kEventTimeout, base::BindLambdaForTesting([&]() {
                               return server.active_reactor_count() == 0;
                             })));
  ASSERT_EQ(server.active_reactor_count(), 0u);

  test::StopGrpcServer(server, kServerStopTimeout);
}

TEST_F(GrpcUnaryTest,
       AsyncUnaryCallCancelledByClientAndLeftActiveDuringServerShutdown) {
  GrpcServer server;
  base::WaitableEvent request_received_event;
  server.SetHandler<SimpleServiceHandler::SimpleCall>(
      base::BindLambdaForTesting(
          [&](TestRequest, SimpleServiceHandler::SimpleCall::Reactor*) {
            request_received_event.Signal();
          }));
  ASSERT_THAT(server.Start(endpoint_), StatusIs(grpc::StatusCode::OK));

  SimpleServiceStub stub(endpoint_);
  auto call = stub.CreateCall<SimpleServiceStub::SimpleCall>();
  base::WaitableEvent response_received_event;
  auto context = std::move(call).InvokeAsync(
      base::BindLambdaForTesting([&](GrpcStatusOr<TestResponse> response) {
        ASSERT_THAT(response, StatusIs(grpc::StatusCode::CANCELLED));
        response_received_event.Signal();
      }));
  ASSERT_TRUE(request_received_event.TimedWait(kEventTimeout));

  context.Cancel();
  ASSERT_TRUE(response_received_event.TimedWait(kEventTimeout));

  CHECK_EQ(server.active_reactor_count(), 1u);

  // All active reactors will be destroyed after this call.
  test::StopGrpcServer(server, kServerStopTimeout);
}

}  // namespace
}  // namespace utils
}  // namespace cast