chromium/chromecast/base/observer.h

// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Safe Observer/Observable implementation.
//
// When using ObserverListThreadSafe, we were running into issues since there
// was no synchronization between getting the existing value, and registering
// as an observer. See go/cast-platform-design-synchronized-value for more
// details.
//
// To fix this issue, and to make observing values safer and simpler in general,
// use the Observable/Observer pattern in this file. When you have a value that
// other code wants to observe (ie, get the value of and receive any changes),
// wrap that value in an Observable<T>. The value type T must be copyable and
// copy-assignable. The Observable must be constructed with the initial observed
// value, and the value may be updated at any time from any thread by calling
// SetValue(). You can also get the value using GetValue(); but note that this
// is not threadsafe (the value is returned without locking), so the caller must
// ensure safety by other means. Calling GetValueThreadSafe() is threadsafe but
// involves a mutex lock.
//
// Code that wants to observe the value calls Observe() on it at any point when
// the value is alive. Note that Observe() may be called safely from any thread.
// Observe() returns an Observer<T> instance, which MUST be used and destroyed
// only on the thread that called Observe(). The Observer initially contains the
// value that the Observable had when Observe() was called, and that value will
// be updated asynchronously whenever the Observable's SetValue() method is
// is called. NOTE: the initial value of the Observer is the value known to the
// thread that created the Observer at the time; there may be an updated value
// from the Observable that hasn't been handled by the Observer's thread yet.
//
// The Observer's view of the observed value is returned by GetValue(); this is
// a low-cost call since there is no locking (the value is updated on the thread
// that constructed the Observer). Note that Observers are always updated
// asynchronously with PostTask(), even if they belong to the same thread that
// calls SetValue(). All Observers on the same thread have the same consistent
// view of the observed value.
//
// Observers may be copied freely; the copy also observes the original
// Observable, and belongs to the thread that created the copy. Copying is safe
// even when the original Observable has been destroyed.
//
// Code may register a callback that is called whenever an Observer's value is
// updated, by calling SetOnUpdateCallback(). If you get an Observer by calling
// Observe() and then immediately call SetOnUpdateCallback() to register a
// a callback, you are guaranteed to get every value of the Observable starting
// from when you called Observe() - you get the initial value by calling
// GetValue() on the returned Observer, and any subsequent updates will trigger
// the callback so you can call GetValue() to get the new value. You will not
// receive any extra callbacks (exactly one callback per value update).
//
// Note that Observers are not default-constructible, since there is no way to
// construct it in a default state. In cases where you need to instantiate an
// Observer after your constructor, you can use a std::unique_ptr<Observer>
// instead, and initialize it when needed.
//
// Example usage:
//
// class MediaManager {
//  public:
//   MediaManager() : volume_(0.0f) {}
//
//   Observer<float> ObserveVolume() { return volume_.Observe(); }
//
//   // ... other methods ...
//
//  private:
//   // Assume this is called from some other internal code when the volume is
//   // updated.
//   void OnUpdateVolume(float new_volume) {
//     volume_.SetValue(new_volume);  // All observers will get the new value.
//   }
//
//   Observable<float> volume_;
// }
//
// class VolumeFeedbackManager {
//  public:
//   VolumeFeedbackManager(MediaManager* media_manager)
//       : volume_observer_(media_manager->ObserveVolume()) {
//     volume_observer_.SetOnUpdateCallback(
//         base::BindRepeating(&VolumeFeedbackManager::OnVolumeChanged,
//                             base::Unretained(this)));
//   }
//
//  private:
//   void OnVolumeChanged() {
//     ShowVolumeFeedback(volume_observer_.GetValue());
//   }
//
//   void ShowVolumeFeedback(float volume) {
//     // ... some implementation ...
//   }
// };
//

#ifndef CHROMECAST_BASE_OBSERVER_H_
#define CHROMECAST_BASE_OBSERVER_H_

#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <utility>
#include <vector>

#include "base/check_op.h"
#include "base/containers/contains.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/notreached.h"
#include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/task/sequenced_task_runner.h"

namespace chromecast {

namespace subtle {
template <typename T>
class ObservableInternals;
}  // namespace subtle

template <typename T>
class Observable;

template <typename T>
class Observer {
 public:
  Observer(const Observer& other);

  Observer& operator=(const Observer&) = delete;

  ~Observer();

  void SetOnUpdateCallback(base::RepeatingClosure callback) {
    on_update_callback_ = std::move(callback);
  }

  const T& GetValue() const {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    return value_;
  }

 private:
  friend class subtle::ObservableInternals<T>;
  friend class Observable<T>;

  explicit Observer(scoped_refptr<subtle::ObservableInternals<T>> internals);

  void OnUpdate();

  const scoped_refptr<subtle::ObservableInternals<T>> internals_;
  // Note: value_ is a const ref to the value copy for this sequence, stored in
  // SequenceOwnedInfo.
  const T& value_;
  base::RepeatingClosure on_update_callback_;
  SEQUENCE_CHECKER(sequence_checker_);
};

template <typename T>
class Observable {
  static_assert(std::is_copy_constructible<T>::value,
                "Observable values must be copyable");
  static_assert(std::is_copy_assignable<T>::value,
                "Observable values must be copy-assignable");

 public:
  explicit Observable(const T& initial_value);

  Observable(const Observable&) = delete;
  Observable& operator=(const Observable&) = delete;

  Observer<T> Observe();

  void SetValue(const T& new_value);
  const T& GetValue() const;  // NOT threadsafe!
  T GetValueThreadSafe() const;

 private:
  // By using a refcounted object to store the value and observer list, we can
  // avoid tying the lifetime of Observable to its Observers or vice versa.
  const scoped_refptr<subtle::ObservableInternals<T>> internals_;
};

namespace subtle {

template <typename T>
class ObservableInternals
    : public base::RefCountedThreadSafe<ObservableInternals<T>> {
 public:
  explicit ObservableInternals(const T& initial_value)
      : value_(initial_value) {}

  ObservableInternals(const ObservableInternals&) = delete;
  ObservableInternals& operator=(const ObservableInternals&) = delete;

  void SetValue(const T& new_value) {
    base::AutoLock lock(lock_);
    value_ = new_value;

    for (auto& item : per_sequence_) {
      item.SetValue(new_value);
    }
  }

  const T& GetValue() const { return value_; }

  T GetValueThreadSafe() const {
    base::AutoLock lock(lock_);
    return value_;
  }

  const T& AddObserver(Observer<T>* observer) {
    DCHECK(observer);
    DCHECK(base::SequencedTaskRunner::HasCurrentDefault());
    auto task_runner = base::SequencedTaskRunner::GetCurrentDefault();

    base::AutoLock lock(lock_);
    auto it = per_sequence_.begin();
    while (it != per_sequence_.end() && it->task_runner() != task_runner) {
      ++it;
    }
    if (it == per_sequence_.end()) {
      per_sequence_.emplace_back(std::move(task_runner), value_);
      it = --per_sequence_.end();
    }
    it->AddObserver(observer);
    return it->value();
  }

  void RemoveObserver(Observer<T>* observer) {
    DCHECK(observer);
    DCHECK(base::SequencedTaskRunner::HasCurrentDefault());
    auto task_runner = base::SequencedTaskRunner::GetCurrentDefault();

    base::AutoLock lock(lock_);
    for (size_t i = 0; i < per_sequence_.size(); ++i) {
      if (per_sequence_[i].task_runner() == task_runner) {
        per_sequence_[i].RemoveObserver(observer);

        if (per_sequence_[i].Empty()) {
          per_sequence_[i].Swap(per_sequence_.back());
          per_sequence_.pop_back();
        }
        return;
      }
    }

    NOTREACHED_IN_MIGRATION()
        << "Tried to remove observer from unknown task runner";
  }

 private:
  // Information owned by a particular sequence. Must be only accessed on that
  // sequence, and must be deleted by posting a task to that sequence.
  // This class MUST NOT contain a scoped_refptr to the task_runner, since if it
  // did, there would be a reference cycle during cleanup, when the task to
  // Destroy() is posted.
  class SequenceOwnedInfo {
   public:
    explicit SequenceOwnedInfo(const T& value) : value_(value) {}

    SequenceOwnedInfo(const SequenceOwnedInfo&) = delete;
    SequenceOwnedInfo& operator=(const SequenceOwnedInfo&) = delete;

    const T& value() const {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      return value_;
    }

    void AddObserver(Observer<T>* observer) {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      DCHECK(observer);
      DCHECK(!base::Contains(observers_, observer));
      observers_.push_back(observer);
    }

    void RemoveObserver(Observer<T>* observer) {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      DCHECK(observer);
      DCHECK(base::Contains(observers_, observer));
      observers_.erase(
          std::remove(observers_.begin(), observers_.end(), observer),
          observers_.end());
    }

    bool Empty() const {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      return observers_.empty();
    }

    void SetValue(const T& value) {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      value_ = value;
      for (auto* obs : observers_) {
        obs->OnUpdate();
      }
    }

    static void Destroy(std::unique_ptr<SequenceOwnedInfo> self) {
      // The unique_ptr deletes automatically.
    }

   private:
    std::vector<Observer<T>*> observers_;
    T value_;
    SEQUENCE_CHECKER(sequence_checker_);
  };

  class PerSequenceInfo {
   public:
    PerSequenceInfo(scoped_refptr<base::SequencedTaskRunner> task_runner,
                    const T& value)
        : task_runner_(std::move(task_runner)),
          owned_info_(std::make_unique<SequenceOwnedInfo>(value)) {}

    PerSequenceInfo(PerSequenceInfo&& other) = default;

    ~PerSequenceInfo() {
      if (!owned_info_) {
        // Members have been moved out via move constructor.
        return;
      }

      DCHECK(Empty());
      // Must post a task to delete the owned info, since there may still be a
      // pending task to call SequenceOwnedInfo::SetValue().
      // Use manual PostNonNestableTask(), since DeleteSoon() does not
      // guarantee deletion.
      task_runner_->PostNonNestableTask(
          FROM_HERE,
          base::BindOnce(&SequenceOwnedInfo::Destroy, std::move(owned_info_)));
    }

    const T& value() const { return owned_info_->value(); }

    const base::SequencedTaskRunner* task_runner() const {
      return task_runner_.get();
    }

    void AddObserver(Observer<T>* observer) {
      owned_info_->AddObserver(observer);
    }

    void RemoveObserver(Observer<T>* observer) {
      owned_info_->RemoveObserver(observer);
    }

    bool Empty() const { return owned_info_->Empty(); }

    void Swap(PerSequenceInfo& other) {
      std::swap(task_runner_, other.task_runner_);
      std::swap(owned_info_, other.owned_info_);
    }

    void SetValue(const T& value) {
      task_runner_->PostTask(
          FROM_HERE,
          base::BindOnce(&SequenceOwnedInfo::SetValue,
                         base::Unretained(owned_info_.get()), value));
    }

   private:
    // Operations on |owned_info| do not need to be synchronized with a lock,
    // since all operations must occur on |task_runner|.
    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    std::unique_ptr<SequenceOwnedInfo> owned_info_;
  };

  friend class base::RefCountedThreadSafe<ObservableInternals>;
  ~ObservableInternals() {}

  mutable base::Lock lock_;
  T value_;
  std::vector<PerSequenceInfo> per_sequence_;
};

}  // namespace subtle

template <typename T>
Observer<T>::Observer(scoped_refptr<subtle::ObservableInternals<T>> internals)
    : internals_(std::move(internals)), value_(internals_->AddObserver(this)) {}

template <typename T>
Observer<T>::Observer(const Observer& other) : Observer(other.internals_) {}

template <typename T>
Observer<T>::~Observer() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  internals_->RemoveObserver(this);
}

template <typename T>
void Observer<T>::OnUpdate() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (on_update_callback_) {
    on_update_callback_.Run();
  }
}

template <typename T>
Observable<T>::Observable(const T& initial_value)
    : internals_(base::WrapRefCounted(
          new subtle::ObservableInternals<T>(initial_value))) {}

template <typename T>
Observer<T> Observable<T>::Observe() {
  return Observer<T>(internals_);
}

template <typename T>
void Observable<T>::SetValue(const T& new_value) {
  internals_->SetValue(new_value);
}

template <typename T>
const T& Observable<T>::GetValue() const {
  return internals_->GetValue();
}

template <typename T>
T Observable<T>::GetValueThreadSafe() const {
  return internals_->GetValueThreadSafe();
}

}  // namespace chromecast

#endif  // CHROMECAST_BASE_OBSERVER_H_