chromium/chromecast/media/cma/base/balanced_media_task_runner_factory.cc

// Copyright 2014 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/cma/base/balanced_media_task_runner_factory.h"

#include <map>
#include <utility>

#include "base/check.h"
#include "base/functional/bind.h"
#include "base/task/single_thread_task_runner.h"
#include "chromecast/media/cma/base/media_task_runner.h"
#include "media/base/timestamp_constants.h"

namespace chromecast {
namespace media {

// MediaTaskRunnerWithNotification -
// Media task runner which also behaves as a media task runner observer.
class MediaTaskRunnerWithNotification : public MediaTaskRunner {
 public:
  // Wraps a MediaTaskRunner so that a third party can:
  // - be notified when a PostMediaTask is performed on this media task runner.
  //   |new_task_cb| is invoked in that case.
  // - monitor the lifetime of the media task runner, i.e. check when the media
  //   task runner is not needed anymore.
  //   |shutdown_cb| is invoked in that case.
  MediaTaskRunnerWithNotification(
      const scoped_refptr<MediaTaskRunner>& media_task_runner,
      base::RepeatingClosure new_task_cb,
      base::OnceClosure shutdown_cb);

  MediaTaskRunnerWithNotification(const MediaTaskRunnerWithNotification&) =
      delete;
  MediaTaskRunnerWithNotification& operator=(
      const MediaTaskRunnerWithNotification&) = delete;

  // MediaTaskRunner implementation.
  bool PostMediaTask(const base::Location& from_here,
                     base::OnceClosure task,
                     base::TimeDelta timestamp) override;

 private:
  ~MediaTaskRunnerWithNotification() override;

  scoped_refptr<MediaTaskRunner> const media_task_runner_;

  const base::RepeatingClosure new_task_cb_;
  base::OnceClosure shutdown_cb_;
};

MediaTaskRunnerWithNotification::MediaTaskRunnerWithNotification(
    const scoped_refptr<MediaTaskRunner>& media_task_runner,
    base::RepeatingClosure new_task_cb,
    base::OnceClosure shutdown_cb)
    : media_task_runner_(media_task_runner),
      new_task_cb_(std::move(new_task_cb)),
      shutdown_cb_(std::move(shutdown_cb)) {}

MediaTaskRunnerWithNotification::~MediaTaskRunnerWithNotification() {
  std::move(shutdown_cb_).Run();
}

bool MediaTaskRunnerWithNotification::PostMediaTask(
    const base::Location& from_here,
    base::OnceClosure task,
    base::TimeDelta timestamp) {
  bool may_run_in_future =
      media_task_runner_->PostMediaTask(from_here, std::move(task), timestamp);
  if (may_run_in_future)
    new_task_cb_.Run();
  return may_run_in_future;
}

// BalancedMediaTaskRunner -
// Run media tasks whose timestamp is less or equal to a max timestamp.
//
// Restrictions of BalancedMediaTaskRunner:
// - Can have at most one task in the queue.
// - Tasks should be given by increasing timestamps.
class BalancedMediaTaskRunner
    : public MediaTaskRunner {
 public:
  explicit BalancedMediaTaskRunner(
      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);

  BalancedMediaTaskRunner(const BalancedMediaTaskRunner&) = delete;
  BalancedMediaTaskRunner& operator=(const BalancedMediaTaskRunner&) = delete;

  // Schedule tasks whose timestamp is less than or equal to |max_timestamp|.
  void ScheduleWork(base::TimeDelta max_timestamp);

  // Return the timestamp of the last media task.
  // Return ::media::kNoTimestamp if no media task has been posted.
  base::TimeDelta GetMediaTimestamp() const;

  // MediaTaskRunner implementation.
  bool PostMediaTask(const base::Location& from_here,
                     base::OnceClosure task,
                     base::TimeDelta timestamp) override;

 private:
  ~BalancedMediaTaskRunner() override;

  scoped_refptr<base::SingleThreadTaskRunner> const task_runner_;

  // Protects the following variables.
  mutable base::Lock lock_;

  // Possible pending media task.
  base::Location from_here_;
  base::OnceClosure pending_task_;

  // Timestamp of the last posted task.
  // Is initialized to ::media::kNoTimestamp.
  base::TimeDelta last_timestamp_;
};

BalancedMediaTaskRunner::BalancedMediaTaskRunner(
    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
    : task_runner_(task_runner), last_timestamp_(::media::kNoTimestamp) {}

BalancedMediaTaskRunner::~BalancedMediaTaskRunner() {
}

void BalancedMediaTaskRunner::ScheduleWork(base::TimeDelta max_media_time) {
  base::OnceClosure task;
  {
    base::AutoLock auto_lock(lock_);
    if (pending_task_.is_null())
      return;

    if (last_timestamp_ != ::media::kNoTimestamp &&
        last_timestamp_ >= max_media_time) {
      return;
    }

    task = std::move(pending_task_);
  }
  task_runner_->PostTask(from_here_, std::move(task));
}

base::TimeDelta BalancedMediaTaskRunner::GetMediaTimestamp() const {
  base::AutoLock auto_lock(lock_);
  return last_timestamp_;
}

bool BalancedMediaTaskRunner::PostMediaTask(const base::Location& from_here,
                                            base::OnceClosure task,
                                            base::TimeDelta timestamp) {
  DCHECK(!task.is_null());

  // Pass through for a task with no timestamp.
  if (timestamp == ::media::kNoTimestamp) {
    return task_runner_->PostTask(from_here, std::move(task));
  }

  base::AutoLock auto_lock(lock_);

  // Timestamps must be in order.
  // Any task that does not meet that condition is simply discarded.
  if (last_timestamp_ != ::media::kNoTimestamp && timestamp < last_timestamp_) {
    return false;
  }

  // Only support one pending task at a time.
  DCHECK(pending_task_.is_null());
  from_here_ = from_here;
  pending_task_ = std::move(task);
  last_timestamp_ = timestamp;

  return true;
}

BalancedMediaTaskRunnerFactory::BalancedMediaTaskRunnerFactory(
    base::TimeDelta max_delta)
  : max_delta_(max_delta) {
}

BalancedMediaTaskRunnerFactory::~BalancedMediaTaskRunnerFactory() {
}

scoped_refptr<MediaTaskRunner>
BalancedMediaTaskRunnerFactory::CreateMediaTaskRunner(
    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) {
  scoped_refptr<BalancedMediaTaskRunner> media_task_runner(
      new BalancedMediaTaskRunner(task_runner));
  scoped_refptr<MediaTaskRunnerWithNotification> media_task_runner_wrapper(
      new MediaTaskRunnerWithNotification(
          media_task_runner,
          base::BindRepeating(&BalancedMediaTaskRunnerFactory::OnNewTask, this),
          base::BindOnce(
              &BalancedMediaTaskRunnerFactory::UnregisterMediaTaskRunner, this,
              media_task_runner)));
  base::AutoLock auto_lock(lock_);
  // Note that |media_task_runner| is inserted here and
  // not |media_task_runner_wrapper|. Otherwise, we would always have one
  // ref on |media_task_runner_wrapper| and would never get the release
  // notification.
  // When |media_task_runner_wrapper| is going away,
  // BalancedMediaTaskRunnerFactory will receive a notification and will in
  // turn remove |media_task_runner|.
  task_runners_.insert(media_task_runner);
  return media_task_runner_wrapper;
}

void BalancedMediaTaskRunnerFactory::OnNewTask() {
  typedef
      std::multimap<base::TimeDelta, scoped_refptr<BalancedMediaTaskRunner> >
      TaskRunnerMap;
  TaskRunnerMap runnable_task_runner;

  base::AutoLock auto_lock(lock_);

  // Get the minimum timestamp among all streams.
  for (MediaTaskRunnerSet::const_iterator it = task_runners_.begin();
       it != task_runners_.end(); ++it) {
    base::TimeDelta timestamp((*it)->GetMediaTimestamp());
    if (timestamp == ::media::kNoTimestamp)
      continue;
    runnable_task_runner.insert(
        std::pair<base::TimeDelta, scoped_refptr<BalancedMediaTaskRunner> >(
            timestamp, *it));
  }

  // If there is no media task, just returns.
  if (runnable_task_runner.empty())
    return;

  // Run tasks which meet the balancing criteria.
  base::TimeDelta min_timestamp(runnable_task_runner.begin()->first);
  base::TimeDelta max_timestamp = min_timestamp + max_delta_;
  for (TaskRunnerMap::iterator it = runnable_task_runner.begin();
       it != runnable_task_runner.end(); ++it) {
    (*it).second->ScheduleWork(max_timestamp);
  }
}

void BalancedMediaTaskRunnerFactory::UnregisterMediaTaskRunner(
      const scoped_refptr<BalancedMediaTaskRunner>& media_task_runner) {
  {
    base::AutoLock auto_lock(lock_);
    task_runners_.erase(media_task_runner);
  }
  // After removing one of the task runners some of the other task runners might
  // need to be waken up, if they are no longer blocked by the balancing
  // restrictions with the old stream.
  OnNewTask();
}

}  // namespace media
}  // namespace chromecast