// Copyright 2021 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H #define GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H #include <grpc/support/port_platform.h> #include <stdint.h> #include <algorithm> #include <atomic> #include <memory> #include <string> #include <utility> #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/types/optional.h" #include <grpc/support/log.h> #include "src/core/lib/gprpp/construct_destruct.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/promise_factory.h" #include "src/core/lib/promise/detail/status.h" namespace grpc_core { class Activity; // A Wakeable object is used by queues to wake activities. class Wakeable { … }; namespace promise_detail { struct Unwakeable final : public Wakeable { … }; // NOTE(review): this is a `static` function defined in a header, so every // translation unit that includes this file gets its own copy; consider // `inline` instead - confirm against the function body. static Unwakeable* unwakeable() { … } } // namespace promise_detail // An owning reference to a Wakeable. // This type is non-copyable but movable. class Waker { … }; // An Activity tracks execution of a single promise. // It executes the promise under a mutex. // When the promise stalls, it registers the containing activity to be woken up // later. // The activity takes a callback, which will be called exactly once with the // result of execution. 
// Activity execution may be cancelled by simply deleting the activity. In such // a case, if execution had not already finished, the done callback is // called with absl::CancelledError(). class Activity : public Orphanable { … }; // Owned pointer to one Activity. ActivityPtr; namespace promise_detail { template <typename Context> class ContextHolder { … }; ContextHolder<Context *>; ContextHolder<std::unique_ptr<Context, Deleter>>; ContextTypeFromHeld; template <typename... Contexts> class ActivityContexts : public ContextHolder<Contexts>... { … }; // A freestanding activity: an activity that owns its own synchronization and // memory. // The alternative is an activity that's somehow tied into another system, for // instance the type seen in promise_based_filter.h as we're transitioning from // the old filter stack to the new system. // FreestandingActivity is-a Wakeable, but needs to increment a refcount before // returning that Wakeable interface. Additionally, we want to keep // FreestandingActivity as small as possible, since it will be used // everywhere. So we use inheritance to provide the Wakeable interface: this // makes it zero-sized, and we make the inheritance private to prevent // accidental casting. class FreestandingActivity : public Activity, private Wakeable { … }; // Implementation details for an Activity of an arbitrary type of promise. // The WakeupScheduler template argument must provide an inner template class // `BoundScheduler` (PromiseActivity derives from it) with the following // interface: // struct WakeupScheduler { // template <typename ActivityType> // class BoundScheduler { // public: // BoundScheduler(WakeupScheduler); // void ScheduleWakeup(); // }; // }; // The ScheduleWakeup function should arrange that // static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the // earliest opportunity. 
// ScheduleWakeup can assume that the activity will remain live until // RunScheduledWakeup() is invoked, and that a given activity will not be // concurrently scheduled again until its RunScheduledWakeup() has been invoked. // We use private inheritance here as a way of getting private members for each // of the contexts. // TODO(ctiller): We can probably reconsider the private inheritance here // when we move away from C++11 and have more powerful template features. // Template parameters: F is the promise factory type (wrapped in // OncePromiseFactory below), WakeupScheduler supplies the BoundScheduler base, // OnDone is the type of the completion callback stored in on_done_, and // Contexts... are the context types held via ActivityContexts. template <class F, class WakeupScheduler, class OnDone, typename... Contexts> class PromiseActivity final : public FreestandingActivity, public WakeupScheduler::template BoundScheduler< PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>, private ActivityContexts<Contexts...> { public: using Factory = OncePromiseFactory<void, F>; using ResultType = typename Factory::Promise::Result; PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts&&... contexts) : … { … } ~PromiseActivity() override { … } // Entry point that the BoundScheduler base is expected to invoke after // ScheduleWakeup() has been called. void RunScheduledWakeup() { … } private: using typename ActivityContexts<Contexts...>::ScopedContext; void Cancel() final { … } // Wake up this activity. Arrange to poll the activity again at a convenient // time: this could be inline if it's deemed safe, or it could be by passing // the activity to an external threadpool to run. If the activity is already // running on this thread, a note is taken of such and the activity is // repolled if it doesn't complete. void Wakeup(void*) final { … } // Drop a wakeup. void Drop(void*) final { … } // Notification that we're no longer executing - it's ok to destruct the // promise. void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { … } // In response to Wakeup, run the Promise state machine again until it // settles. Then check for completion, and if we have completed, call on_done. void Step() ABSL_LOCKS_EXCLUDED(mu()) { … } // The main body of a step: set the current activity, and any contexts, and // then run the main polling loop. 
Contained in a function by itself in // order to keep the scoping rules a little easier in Step(). absl::optional<ResultType> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { … } // Similar to RunStep, but additionally construct the promise from a // promise factory before entering the main loop. Called once from the // constructor. absl::optional<ResultType> Start(Factory promise_factory) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { … } // Until there are no wakeups from within and the promise is incomplete: // poll the promise. absl::optional<ResultType> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { … } using Promise = typename Factory::Promise; // Callback on completion of the promise. GPR_NO_UNIQUE_ADDRESS OnDone on_done_; // Has execution completed? GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false; // Is there a wakeup scheduled? GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false}; // We wrap the promise in a union to allow control over the construction // simultaneously with annotating mutex requirements and noting that the // promise contained may not use any memory. // Because PromiseHolder's constructor and destructor are empty, `promise` is // never constructed or destroyed implicitly; its lifetime has to be managed // explicitly by this class. union PromiseHolder { PromiseHolder() {} ~PromiseHolder() {} GPR_NO_UNIQUE_ADDRESS Promise promise; }; GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu()); }; } // namespace promise_detail // Given a functor that returns a promise (a promise factory), a wakeup // scheduler, a callback for completion, and any number of contexts, construct // an activity and return it as an ActivityPtr. template <typename Factory, typename WakeupScheduler, typename OnDone, typename... Contexts> ActivityPtr MakeActivity(Factory promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts&&... contexts) { … } } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H