folly/folly/synchronization/test/FlatCombiningExamples.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <memory>
#include <mutex>

#include <folly/synchronization/Baton.h>
#include <folly/synchronization/FlatCombining.h>

namespace folly {

struct alignas(hardware_destructive_interference_size) Line {
  uint64_t val_;
};

class Data { // Sequential data structure
 public:
  explicit Data(size_t size) : size_(size) {
    x_ = std::make_unique<Line[]>(size_);
  }

  uint64_t getVal() {
    uint64_t val = x_[0].val_;
    for (size_t i = 1; i < size_; ++i) {
      assert(x_[i].val_ == val);
    }
    return val;
  }

  // add

  void add(uint64_t val) {
    uint64_t oldval = x_[0].val_;
    for (size_t i = 0; i < size_; ++i) {
      assert(x_[i].val_ == oldval);
      x_[i].val_ = oldval + val;
    }
  }

  uint64_t fetchAdd(uint64_t val) {
    uint64_t res = x_[0].val_;
    for (size_t i = 0; i < size_; ++i) {
      assert(x_[i].val_ == res);
      x_[i].val_ += val;
    }
    return res;
  }

 private:
  size_t size_;
  std::unique_ptr<Line[]> x_;
};

// Example of FC concurrent data structure using simple interface

template <
    typename Mutex = std::mutex,
    template <typename> class Atom = std::atomic>
class FcSimpleExample
    : public FlatCombining<FcSimpleExample<Mutex, Atom>, Mutex, Atom> {
  using FC = FlatCombining<FcSimpleExample<Mutex, Atom>, Mutex, Atom>;
  using Rec = typename FC::Rec;

 public:
  explicit FcSimpleExample(
      size_t size,
      bool dedicated = true,
      uint32_t numRecs = 0,
      uint32_t maxOps = 0)
      : FC(dedicated, numRecs, maxOps), data_(size) {}

  uint64_t getVal() { return data_.getVal(); }

  // add

  void addNoFC(uint64_t val) {
    this->requestNoFC([&] { data_.add(val); });
  }

  void add(uint64_t val, Rec* rec = nullptr) {
    auto opFn = [&, val] { // asynchronous -- capture val by value
      data_.add(val);
    };
    this->requestFC(opFn, rec, false);
  }

  // fetchAdd

  uint64_t fetchAddNoFC(uint64_t val) {
    uint64_t res;
    auto opFn = [&] { res = data_.fetchAdd(val); };
    this->requestNoFC(opFn);
    return res;
  }

  uint64_t fetchAdd(uint64_t val, Rec* rec = nullptr) {
    uint64_t res;
    auto opFn = [&] { res = data_.fetchAdd(val); };
    this->requestFC(opFn, rec);
    return res;
  }

 private:
  Data data_;
};

// Example of FC data structure using custom request processing

class Req {
 public:
  enum class Type { ADD, FETCHADD };

  void setType(Type type) { type_ = type; }

  Type getType() { return type_; }

  void setVal(uint64_t val) { val_ = val; }

  uint64_t getVal() { return val_; }

  void setRes(uint64_t res) { res_ = res; }

  uint64_t getRes() { return res_; }

 private:
  Type type_;
  uint64_t val_;
  uint64_t res_;
};

template <
    typename Req,
    typename Mutex = std::mutex,
    template <typename> class Atom = std::atomic>
class FcCustomExample : public FlatCombining<
                            FcCustomExample<Req, Mutex, Atom>,
                            Mutex,
                            Atom,
                            Req> {
  using FC = FlatCombining<FcCustomExample<Req, Mutex, Atom>, Mutex, Atom, Req>;
  using Rec = typename FC::Rec;

 public:
  explicit FcCustomExample(
      int size,
      bool dedicated = true,
      uint32_t numRecs = 0,
      uint32_t maxOps = 0)
      : FC(dedicated, numRecs, maxOps), data_(size) {}

  uint64_t getVal() { return data_.getVal(); }

  // add

  void addNoFC(uint64_t val) {
    this->requestNoFC([&] { data_.add(val); });
  }

  void add(uint64_t val, Rec* rec = nullptr) {
    auto opFn = [&, val] { data_.add(val); };
    auto fillFn = [&](Req& req) {
      req.setType(Req::Type::ADD);
      req.setVal(val);
    };
    this->requestFC(opFn, fillFn, rec, false); // asynchronous
  }

  // fetchAdd

  uint64_t fetchAddNoFC(uint64_t val) {
    uint64_t res;
    auto opFn = [&] { res = data_.fetchAdd(val); };
    this->requestNoFC(opFn);
    return res;
  }

  uint64_t fetchAdd(uint64_t val, Rec* rec = nullptr) {
    uint64_t res;
    auto opFn = [&] { res = data_.fetchAdd(val); };
    auto fillFn = [&](Req& req) {
      req.setType(Req::Type::FETCHADD);
      req.setVal(val);
    };
    auto resFn = [&](Req& req) { res = req.getRes(); };
    this->requestFC(opFn, fillFn, resFn, rec);
    return res;
  }

  // custom combined op processing - overrides FlatCombining::combinedOp(Req&)
  void combinedOp(Req& req) {
    switch (req.getType()) {
      case Req::Type::ADD:
        data_.add(req.getVal());
        return;
      case Req::Type::FETCHADD:
        req.setRes(data_.fetchAdd(req.getVal()));
        return;
    }
    assume_unreachable();
  }

 private:
  Data data_;
};

} // namespace folly