folly/folly/channels/ChannelCallbackHandle.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/IntrusiveList.h>
#include <folly/ScopeGuard.h>
#include <folly/channels/Channel.h>

namespace folly {
namespace channels {

namespace detail {
class ChannelCallbackProcessor : public IChannelCallback {
 public:
  virtual void onHandleDestroyed() = 0;
};
} // namespace detail

/**
 * A callback handle for a consumption operation on a channel. The consumption
 * operation will be cancelled when this handle is destroyed.
 */
class ChannelCallbackHandle {
 public:
  ChannelCallbackHandle() : processor_(nullptr) {}

  explicit ChannelCallbackHandle(detail::ChannelCallbackProcessor* processor)
      : processor_(processor) {}

  ~ChannelCallbackHandle() {
    if (processor_) {
      processor_->onHandleDestroyed();
    }
  }

  ChannelCallbackHandle(ChannelCallbackHandle&& other) noexcept
      : processor_(std::exchange(other.processor_, nullptr)) {}

  ChannelCallbackHandle& operator=(ChannelCallbackHandle&& other) {
    if (&other == this) {
      return *this;
    }
    reset();
    processor_ = std::exchange(other.processor_, nullptr);
    return *this;
  }

  void reset() {
    if (processor_) {
      processor_->onHandleDestroyed();
      processor_ = nullptr;
    }
  }

 private:
  detail::ChannelCallbackProcessor* processor_;
};

namespace detail {

/**
 * A wrapper around a ChannelCallbackHandle that belongs to an intrusive linked
 * list. When the holder is destroyed, the object will automatically be unlinked
 * from the linked list that it is in (if any).
 */
struct ChannelCallbackHandleHolder {
  explicit ChannelCallbackHandleHolder(ChannelCallbackHandle _handle)
      : handle(std::move(_handle)) {}

  ChannelCallbackHandleHolder(ChannelCallbackHandleHolder&& other) noexcept
      : handle(std::move(other.handle)) {
    hook.swap_nodes(other.hook);
  }

  ChannelCallbackHandleHolder& operator=(
      ChannelCallbackHandleHolder&& other) noexcept {
    if (&other == this) {
      return *this;
    }
    handle = std::move(other.handle);
    hook.unlink();
    hook.swap_nodes(other.hook);
    return *this;
  }

  void requestCancellation() { handle.reset(); }

  ChannelCallbackHandle handle;
  folly::IntrusiveListHook hook;
};

template <typename TValue, typename OnNextFunc>
class ChannelCallbackProcessorImplWithList;
} // namespace detail

/**
 * A list of channel callback handles. When consumeChannelWithCallback is
 * invoked with a list, a cancellation handle is automatically added to the list
 * for the consumption operation. Similarly, when a consumption operation is
 * completed, the handle is automatically removed from the lists.
 *
 * If the list still has any cancellation handles remaining when the list is
 * destroyed, cancellation is triggered for each handle in the list.
 *
 * This list is not thread safe.
 */
class ChannelCallbackHandleList {
 public:
  ChannelCallbackHandleList() {}

  ChannelCallbackHandleList(ChannelCallbackHandleList&& other) noexcept {
    holders_.swap(other.holders_);
  }

  ChannelCallbackHandleList& operator=(
      ChannelCallbackHandleList&& other) noexcept {
    if (&other == this) {
      return *this;
    }
    holders_.swap(other.holders_);
    return *this;
  }

  ~ChannelCallbackHandleList() { clear(); }

  void clear() {
    for (auto& holder : holders_) {
      holder.requestCancellation();
    }
    holders_.clear();
  }

 private:
  template <typename TValue, typename OnNextFunc>
  friend class detail::ChannelCallbackProcessorImplWithList;

  void add(detail::ChannelCallbackHandleHolder& holder) {
    holders_.push_back(holder);
  }

  using ChannelCallbackHandleListImpl = folly::IntrusiveList<
      detail::ChannelCallbackHandleHolder,
      &detail::ChannelCallbackHandleHolder::hook>;

  ChannelCallbackHandleListImpl holders_;
};
} // namespace channels
} // namespace folly