// chromium/chromecast/cast_core/grpc/grpc_unary_handler.h

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CHROMECAST_CAST_CORE_GRPC_GRPC_UNARY_HANDLER_H_
#define CHROMECAST_CAST_CORE_GRPC_GRPC_UNARY_HANDLER_H_

#include <grpcpp/grpcpp.h>

#include "base/functional/callback.h"
#include "base/logging.h"
#include "chromecast/cast_core/grpc/cancellable_reactor.h"
#include "chromecast/cast_core/grpc/grpc_handler.h"
#include "chromecast/cast_core/grpc/grpc_server_reactor.h"
#include "chromecast/cast_core/grpc/grpc_status_or.h"
#include "chromecast/cast_core/grpc/server_reactor_tracker.h"
#include "chromecast/cast_core/grpc/trackable_reactor.h"

namespace cast {
namespace utils {

// A generic handler for unary, i.e. request/response, gRPC APIs. Can only be
// used with RPCs that have the following signature:
//       rpc Foo(Request) returns (Response)
//
// - TService is the gRPC service type.
// - TRequest is the service request type.
// - TResponse is the service response type.
// - MethodName is the rpc method name as a string.
//
// This class is not thread-safe. Appropriate means have to be added by the
// users to guarantee thread-safety (e.g. task runners, mutexes, etc.).
template <typename TService,
          typename TRequest,
          typename TResponse,
          const char* MethodName>
class GrpcUnaryHandler final : public GrpcHandler {
 public:
  using ReactorBase = GrpcServerReactor<TRequest, TResponse>;

  // Reactor handling a single unary call: it asks the base reactor to read the
  // request, passes the parsed request to the stored callback, and completes
  // the call with either one response buffer or an error status.
  class Reactor : public ReactorBase {
   public:
    using ReactorBase::name;
    using ReactorBase::Write;

    // Receives the request by value together with a pointer to the reactor
    // the request arrived on.
    using OnRequestCallback = base::RepeatingCallback<void(TRequest, Reactor*)>;

    // Stores |on_request_callback|, forwards the remaining |args| to
    // ReactorBase unchanged and immediately issues the request read.
    template <typename... TArgs>
    explicit Reactor(OnRequestCallback on_request_callback, TArgs&&... args)
        : ReactorBase(std::forward<TArgs>(args)...),
          on_request_callback_(std::move(on_request_callback)) {
      ReadRequest();
    }

   protected:
    using ReactorBase::Finish;
    using ReactorBase::ReadRequest;
    using ReactorBase::StartRead;
    using ReactorBase::StartWriteAndFinish;

    // GrpcServerReactor implementation.

    // A unary call has exactly one response, so writing it also finishes the
    // call with an OK status. |buffer| must not be null.
    void WriteResponse(const grpc::ByteBuffer* buffer) override {
      DCHECK(buffer);
      FinishWriting(buffer, grpc::Status::OK);
    }

    // Completes the call. A non-OK |status| finishes the call with that status
    // and |buffer| is not used; an OK |status| sends |buffer| as the response
    // and finishes with grpc::Status::OK.
    void FinishWriting(const grpc::ByteBuffer* buffer,
                       const grpc::Status& status) override {
      DCHECK((status.ok() && buffer) || !status.ok())
          << "Either buffer must be set or status must flag an error";
      DVLOG(1) << "Reactor is finished: " << name()
               << ", status=" << GrpcStatusToString(status);
      if (!status.ok()) {
        // Error path: there is nothing to write, only the status is reported.
        Finish(status);
        return;
      }
      StartWriteAndFinish(buffer, grpc::WriteOptions(), grpc::Status::OK);
    }

    // Performs no work for a unary reactor. The only status accepted here is
    // ABORTED; any other status is treated as a fatal error.
    // NOTE(review): presumably this is reached through the cancellable reactor
    // wrapper when it signals that the reactor is done - confirm against
    // cancellable_reactor.h.
    void OnResponseDone(const grpc::Status& status) override {
      CHECK(status.error_code() == grpc::StatusCode::ABORTED)
          << "Unexpected status: " << GrpcStatusToString(status);
    }

    // Called with the outcome of the request read. A successfully read request
    // is moved into the callback; a failed read finishes the call with the
    // read's error status.
    void OnRequestDone(GrpcStatusOr<TRequest> request) override {
      if (request.ok()) {
        on_request_callback_.Run(std::move(request).value(), this);
        return;
      }
      FinishWriting(nullptr, request.status());
    }

    // Callback run from OnRequestDone() when the request was read successfully.
    OnRequestCallback on_request_callback_;
  };

  using Response = TResponse;
  using OnRequestCallback = typename Reactor::OnRequestCallback;

  // |on_request_callback| is handed to every reactor created by this handler;
  // |server_reactor_tracker| is passed through to GrpcHandler.
  GrpcUnaryHandler(OnRequestCallback on_request_callback,
                   ServerReactorTracker* server_reactor_tracker)
      : GrpcHandler(server_reactor_tracker),
        on_request_callback_(std::move(on_request_callback)) {}

  // Returns the method path in the form "/<service full name>/<MethodName>".
  static std::string rpc_name() {
    std::string full_name("/");
    full_name += TService::service_full_name();
    full_name += "/";
    full_name += MethodName;
    return full_name;
  }

 private:
  // GrpcHandler implementation.

  // Allocates a new reactor for an incoming call, wrapped in the trackable and
  // cancellable reactor adapters. The object is returned as a raw pointer; its
  // deletion is not handled in this class.
  grpc::ServerGenericBidiReactor* CreateReactor(
      grpc::CallbackServerContext* context) override {
    using WrappedReactor = CancellableReactor<TrackableReactor<Reactor>>;
    return new WrappedReactor(server_reactor_tracker(), on_request_callback_,
                              rpc_name(), context);
  }

  // Copied into each reactor created by CreateReactor().
  OnRequestCallback on_request_callback_;
};

}  // namespace utils
}  // namespace cast

#endif  // CHROMECAST_CAST_CORE_GRPC_GRPC_UNARY_HANDLER_H_