// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_CAST_CORE_GRPC_GRPC_SERVER_STREAMING_CALL_H_
#define CHROMECAST_CAST_CORE_GRPC_GRPC_SERVER_STREAMING_CALL_H_
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/client_callback.h>
#include "base/functional/callback.h"
#include "base/logging.h"
#include "base/memory/raw_ptr.h"
#include "chromecast/cast_core/grpc/grpc_call.h"
#include "chromecast/cast_core/grpc/grpc_client_reactor.h"
#include "chromecast/cast_core/grpc/grpc_status_or.h"
namespace cast {
namespace utils {
// Pointer-to-member type of the server streaming method that the gRPC
// compiler generates on a stub's async interface. The method takes the client
// context, a pointer to the request, and the read reactor for |TResponse|
// messages.
//   TAsyncInterface - type that declares the generated method.
//   TRequest - gRPC request type of the method.
//   TResponse - gRPC response type streamed back by the method.
template <typename TAsyncInterface, typename TRequest, typename TResponse>
using GrpcServerStreamingMethod =
void (TAsyncInterface::*)(grpc::ClientContext*,
const TRequest*,
grpc::ClientReadReactor<TResponse>*);
// A GrpcCall implementation for server streaming gRPC calls specialized by the
// |AsyncMethodPtr| function pointer.
//   TGrpcStub - gRPC service stub type.
//   TRequest - gRPC request type for a method in the stub.
//   TResponse - gRPC response type for a method in the stub.
//   AsyncMethodPtr - pointer to a method in the stub that handles a streaming
//                    call.
template <typename TGrpcStub,
          typename TRequest,
          typename TResponse,
          GrpcServerStreamingMethod<typename TGrpcStub::AsyncInterface,
                                    TRequest,
                                    TResponse> AsyncMethodPtr>
class GrpcServerStreamingCall : public GrpcCall<TGrpcStub, TRequest> {
 public:
  using Base = GrpcCall<TGrpcStub, TRequest>;
  using Base::async;
  using Base::GrpcCall;
  using Base::request;
  using Base::sync;
  using typename Base::AsyncInterface;
  using typename Base::Context;
  using typename Base::Request;

  using Response = TResponse;

  // Callback that receives the streamed results. It is run with done=false
  // for every successfully read message, and exactly once with done=true when
  // the call finishes: with a default-constructed Response if the final status
  // is OK, or with the failing status otherwise.
  using ResponseCallback =
      base::RepeatingCallback<void(GrpcStatusOr<Response>, bool /*done*/)>;

  // Invokes a gRPC call asynchronously. The method follows move semantics:
  //   std::move(call).InvokeAsync(...);
  // The returned Context is valid only for the duration of the call and can
  // be used to cancel it.
  Context InvokeAsync(ResponseCallback response_callback) && {
    // gRPC doesn't support setting a deadline for individual streaming
    // requests/responses. Hence, the zero timeout is set to allow for
    // infinitely long streaming connections.
    Base::SetDeadline(0);

    // The reactor owns itself: it is released in Reactor::OnDone().
    auto* reactor =
        new Reactor(std::move(*this).async(), std::move(*this).request(),
                    std::move(*this).options(), std::move(response_callback));

    // Read the context before starting the call. The reactor deletes itself
    // in OnDone(), which can run as soon as the call has been started, so
    // |reactor| must not be dereferenced once Start() has been called.
    auto call_context = reactor->context();
    reactor->Start();
    return Context(call_context);
  }

 private:
  using ReactorBase =
      GrpcClientReactor<Request, grpc::ClientReadReactor<Response>>;

  // Self-deleting read reactor that drives a single server streaming call and
  // forwards every message and the final status to |response_callback_|.
  class Reactor final : public ReactorBase {
   public:
    using ReactorBase::context;
    using ReactorBase::request;

    // |async_stub| is not owned; it is dereferenced in Start().
    Reactor(AsyncInterface* async_stub,
            Request request,
            GrpcCallOptions options,
            ResponseCallback response_callback)
        : ReactorBase(std::move(request), std::move(options)),
          async_interface_(async_stub),
          response_callback_(std::move(response_callback)) {}

    // Binds this reactor to the stub method, queues the first read and then
    // starts the call. Nothing may touch |this| after StartCall(), because
    // OnDone() deletes the reactor.
    void Start() override {
      ReactorBase::Start();
      (async_interface_->*AsyncMethodPtr)(context(), request(), this);
      grpc::ClientReadReactor<Response>::StartRead(&response_);
      grpc::ClientReadReactor<Response>::StartCall();
    }

   private:
    // Implements grpc::ClientReadReactor APIs.

    // Called when a read queued with StartRead() finishes. On success the
    // message is forwarded with done=false and the next read is queued; on
    // failure nothing is reported here and the outcome is left to OnDone().
    void OnReadDone(bool ok) override {
      DVLOG(1) << "Reads done: ok=" << ok;
      if (!ok) {
        return;
      }
      // |response_| is moved-from after this call and is immediately handed
      // back to gRPC as the buffer for the next read.
      response_callback_.Run(std::move(response_), false);
      Reactor::StartRead(&response_);
    }

    // The method is always called on completion of all operations associated
    // with this call, and deletes itself on exit.
    void OnDone(const grpc::Status& status) override {
      DVLOG(1) << "Request done: " << GrpcStatusToString(status);
      if (status.ok()) {
        response_callback_.Run(Response(), true);
      } else {
        response_callback_.Run(status, true);
      }
      ReactorBase::DeleteThis();
    }

    // Stub interface on which |AsyncMethodPtr| is invoked. Not owned.
    raw_ptr<AsyncInterface> async_interface_;
    // Receives every streamed message and the final completion signal.
    ResponseCallback response_callback_;
    // Buffer that gRPC reads each incoming message into.
    Response response_;
  };
};
} // namespace utils
} // namespace cast
#endif // CHROMECAST_CAST_CORE_GRPC_GRPC_SERVER_STREAMING_CALL_H_