chromium/ui/file_manager/file_manager/common/js/async_util.ts

// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

export type Task = (callback: VoidCallback) => void;

/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be started in order they were added. Tasks are run
 * concurrently. At most, |limit| jobs will be run at the same time.
 */
export class ConcurrentQueue {
  private readonly added_: Task[] = [];
  private readonly running_: Task[] = [];
  private cancelled_ = false;

  /** @param limit_ The number of tasks to run at the same time. */
  constructor(private limit_: number) {
    console.assert(this.limit_ > 0, 'limit_ must be larger than 0');
  }

  /** @return whether a task is running. */
  isRunning(): boolean {
    return this.running_.length !== 0;
  }

  /** @return the number of waiting tasks. */
  getWaitingTasksCount(): number {
    return this.added_.length;
  }

  /** @return the number of running tasks. */
  getRunningTasksCount(): number {
    return this.running_.length;
  }

  /**
   * Enqueues a task for running as soon as possible. If there is already the
   * maximum number of tasks running, the run of this task is delayed until less
   * than the limit given at the construction time of tasks are running.
   */
  run(task: Task) {
    if (this.cancelled_) {
      console.warn('Cannot add a new task: Queue is cancelled');
    } else {
      this.added_.push(task);
      this.scheduleNext_();
    }
  }

  /**
   * Cancels the queue. It removes all the not-run (yet) tasks. Note that this
   * does NOT stop tasks currently running.
   */
  cancel() {
    this.cancelled_ = true;
    this.added_.length = 0;
  }

  /** @return whether the queue is cancelling or is already cancelled. */
  isCanceled(): boolean {
    return this.cancelled_;
  }

  /**
   * Attempts to run another tasks. If there is less than the maximum number
   * of task running, it immediately executes the task at the front of
   * the queue.
   */
  private maybeExecute_() {
    if (this.added_.length > 0) {
      if (this.running_.length < this.limit_) {
        this.execute_(this.added_.shift()!);
      }
    }
  }

  /**
   * Executes the given `task`. The task is placed in the list of running tasks
   * and immediately executed.
   */
  private execute_(task: Task) {
    this.running_.push(task);
    try {
      task(this.onTaskFinished_.bind(this, task));
      // If the task executes successfully, it calls the callback, where we
      // schedule a next run.
    } catch (e) {
      console.warn('Cannot execute a task', e);
      // If the task fails we call the callback explicitly.
      this.onTaskFinished_(task);
    }
  }

  /** Handles a task being finished. */
  private onTaskFinished_(task: Task) {
    this.removeTask_(task);
    this.scheduleNext_();
  }

  /** Attempts to remove the task that was running. */
  private removeTask_(task: Task) {
    const index = this.running_.indexOf(task);
    if (index >= 0) {
      this.running_.splice(index, 1);
    } else {
      console.warn('Cannot find a finished task among the running ones');
    }
  }

  /**
   * Schedules the next attempt at execution of the task at the front of
   * the queue.
   */
  private scheduleNext_() {
    // TODO(1350885): Use setTimeout(()=>{this.maybeExecute();});
    this.maybeExecute_();
  }

  /** @return a string representation of the instance. */
  toString(): string {
    return 'ConcurrentQueue\n' +
        '- WaitingTasksCount: ' + this.getWaitingTasksCount() + '\n' +
        '- RunningTasksCount: ' + this.getRunningTasksCount() + '\n' +
        '- isCanceled: ' + this.isCanceled();
  }
}

/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be executed sequentially in order they were added.
 */
export class AsyncQueue extends ConcurrentQueue {
  constructor() {
    super(1);
  }

  /**
   * Starts a task gated by this concurrent queue.
   * Typical usage:
   *
   *   const unlock = await queue.lock();
   *   try {
   *     // Operations of the task.
   *     ...
   *   } finally {
   *     unlock();
   *   }
   *
   * @return Completion callback to run when finished.
   */
  async lock(): Promise<VoidCallback> {
    return new Promise(resolve => this.run(unlock => resolve(unlock)));
  }
}

/** A task which is executed by Group. */
export class GroupTask {
  /**
   * @param closure Closure with a completion callback to be executed.
   * @param dependencies Array of dependencies.
   * @param name Task identifier. Specify to use in dependencies.
   */
  constructor(
      public readonly closure: Task, public readonly dependencies: string[],
      public readonly name: string) {}

  /** @return a string representation of the instance. */
  toString(): string {
    return 'GroupTask\n' +
        '- name: ' + this.name + '\n' +
        '- dependencies: ' + this.dependencies.join();
  }
}

export type GroupTasks = Record<string, GroupTask>;

/**
 * Creates a class for executing several asynchronous closures in a group in a
 * dependency order.
 */
export class Group {
  private readonly addedTasks_: GroupTasks = {};
  private readonly pendingTasks_: GroupTasks = {};
  private readonly finishedTasks_: GroupTasks = {};
  private readonly completionCallbacks_: VoidCallback[] = [];

  /** @return the pending tasks. */
  get pendingTasks(): GroupTasks {
    return this.pendingTasks_;
  }

  /**
   * Enqueues a closure to be executed after dependencies are completed.
   *
   * @param closure Closure with a completion callback to be executed.
   * @param dependencies Array of dependencies. If no dependencies, then the
   *     the closure will be executed immediately.
   * @param maybeName Task identifier. Specify to use in dependencies.
   */
  add(closure: Task, dependencies: string[] = [], maybeName?: string) {
    const name =
        maybeName || (`(unnamed#${Object.keys(this.addedTasks_).length + 1})`);
    const task = new GroupTask(closure, dependencies, name);
    this.addedTasks_[name] = task;
    this.pendingTasks_[name] = task;
  }

  /**
   * Runs the enqueued closure in order of dependencies.
   * @param onCompletion Completion callback.
   */
  run(onCompletion?: VoidCallback) {
    if (onCompletion) {
      this.completionCallbacks_.push(onCompletion);
    }
    this.continue_();
  }

  /** Runs enqueued pending tasks whose dependencies are completed. */
  private continue_() {
    // If all of the added tasks have finished, then call completion callbacks.
    if (Object.keys(this.addedTasks_).length ===
        Object.keys(this.finishedTasks_).length) {
      for (const callback of this.completionCallbacks_) {
        callback();
      }
      this.completionCallbacks_.length = 0;
      return;
    }

    for (const name in this.pendingTasks_) {
      const task = this.pendingTasks_[name]!;
      let dependencyMissing = false;
      for (const dependency of task.dependencies) {
        // Check if the dependency has finished.
        if (!this.finishedTasks_[dependency]) {
          dependencyMissing = true;
        }
      }
      // All dependences finished, therefore start the task.
      if (!dependencyMissing) {
        delete this.pendingTasks_[task.name];
        task.closure(this.finish_.bind(this, task));
      }
    }
  }

  /** Finishes the passed task and continues executing enqueued closures. */
  private finish_(task: GroupTask) {
    this.finishedTasks_[task.name] = task;
    this.continue_();
  }
}

/**
 * Aggregates consecutive calls and executes the closure only once instead of
 * several times. The first call is always called immediately, and the next
 * consecutive ones are aggregated and the closure is called only once once
 * |delay| amount of time passes after the last call to run().
 */
export class Aggregator {
  private scheduledRunsTimer_: number|null = null;
  private lastRunTime_ = 0;

  /**
   * @param closure_ Closure to be aggregated.
   * @param delay_ Minimum aggregation time in milliseconds.
   */
  constructor(private closure_: VoidCallback, private delay_: number = 50) {}

  /**
   * Runs a closure. Skips consecutive calls. The first call is called
   * immediately.
   */
  run() {
    // If recently called, then schedule the consecutive call with a delay.
    if (Date.now() - this.lastRunTime_ < this.delay_) {
      this.cancelScheduledRuns_();
      this.scheduledRunsTimer_ =
          setTimeout(this.runImmediately_.bind(this), this.delay_ + 1);
      this.lastRunTime_ = Date.now();
      return;
    }

    // Otherwise, run immediately.
    this.runImmediately_();
  }

  /** Calls the schedule immediately and cancels any scheduled calls. */
  private runImmediately_() {
    this.cancelScheduledRuns_();
    this.closure_();
    this.lastRunTime_ = Date.now();
  }

  /** Cancels all scheduled runs (if any). */
  private cancelScheduledRuns_() {
    if (this.scheduledRunsTimer_) {
      clearTimeout(this.scheduledRunsTimer_);
      this.scheduledRunsTimer_ = null;
    }
  }
}

/**
 * Samples calls so that they are not called too frequently. The first call is
 * always called immediately, and the following calls may be skipped or delayed
 * to keep each interval no less than `minInterval_`.
 */
export class RateLimiter {
  private scheduledRunsTimer_ = 0;

  /** Last time the closure is called. */
  private lastRunTime_ = 0;

  /**
   * @param closure_ Closure to be called.
   * @param minInterval_ Minimum interval between each call in milliseconds.
   */
  constructor(
      private closure_: VoidCallback, private minInterval_: number = 200) {}

  /**
   * Requests to run the closure. Skips or delays calls so that the intervals
   * between calls are no less than `minInterval_` milliseconds.
   */
  run() {
    const now = Date.now();
    // If |minInterval| has not passed since the closure is run, skips or delays
    // this run.
    if (now - this.lastRunTime_ < this.minInterval_) {
      // Delays this run only when there is no scheduled run.
      // Otherwise, simply skip this run.
      if (!this.scheduledRunsTimer_) {
        this.scheduledRunsTimer_ = setTimeout(
            this.runImmediately.bind(this),
            this.lastRunTime_ + this.minInterval_ - now);
      }
      return;
    }

    // Otherwise, run immediately
    this.runImmediately();
  }

  /** Calls the scheduled run immediately and cancels any scheduled calls. */
  runImmediately() {
    this.cancelScheduledRuns_();
    this.lastRunTime_ = Date.now();
    this.closure_();
  }

  /** Cancels all scheduled runs (if any). */
  private cancelScheduledRuns_() {
    if (this.scheduledRunsTimer_) {
      clearTimeout(this.scheduledRunsTimer_);
      this.scheduledRunsTimer_ = 0;
    }
  }
}