chromium/third_party/google-closure-library/closure/goog/streams/lite_impl.js

/**
 * @license
 * Copyright The Closure Library Authors.
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @fileoverview A lite polyfill of the ReadableStream native API with a subset
 * of methods supported.
 */
goog.module('goog.streams.liteImpl');

const NativeResolver = goog.require('goog.promise.NativeResolver');
const liteTypes = goog.require('goog.streams.liteTypes');
const {assert, assertFunction} = goog.require('goog.asserts');

/**
 * A lightweight stand-in for the native ReadableStream.
 *
 * Only getReader() and the locked property are exposed publicly.
 *
 * An underlying source can drive the stream solely by enqueueing chunks,
 * closing it, or erroring it.
 *
 * Pulling (and with it backpressure and chunk sizes) and cancellation are
 * not implemented.
 * @template T
 * @implements {liteTypes.ReadableStream<T>}
 */
class ReadableStream {
  /** @package */
  constructor() {
    /**
     * Lifecycle state; every stream starts out readable.
     * @package {!ReadableStream.State}
     */
    this.state = ReadableStream.State.READABLE;

    /**
     * The reader currently attached to this stream, if any.
     * @package {!ReadableStreamDefaultReader|undefined}
     */
    this.reader = undefined;

    /**
     * The value handed to error(); stays undefined until then.
     * @type {*}
     */
    this.storedError = undefined;

    /**
     * Declared here without a value; newReadableStream() attaches the
     * controller right after construction.
     * @package {!ReadableStreamDefaultController}
     */
    this.readableStreamController;
  }

  /**
   * Whether a reader currently holds this stream.
   * https://streams.spec.whatwg.org/#rs-locked
   * @return {boolean}
   * @override
   */
  get locked() {
    const attachedReader = this.reader;
    return attachedReader !== undefined;
  }

  /**
   * Creates a ReadableStreamDefaultReader for this stream, records it as the
   * stream's reader, and returns it.
   * https://streams.spec.whatwg.org/#rs-get-reader
   * @return {!ReadableStreamDefaultReader<T>}
   * @override
   */
  getReader() {
    const createdReader = new ReadableStreamDefaultReader(this);
    this.reader = createdReader;
    return createdReader;
  }

  /**
   * Appends a new pending read to the attached reader and returns the promise
   * that will settle when that read is fulfilled, closed, or errored.
   * @return {!Promise<!IIterableResult<T>>}
   * @package
   */
  addReadRequest() {
    const pendingRead = new NativeResolver();
    this.reader.readRequests.push(pendingRead);
    return pendingRead.promise;
  }

  /**
   * Marks the stream closed. If a reader is attached, every pending read is
   * resolved with a done result and the reader's closed promise is resolved.
   * @package
   */
  close() {
    this.state = ReadableStream.State.CLOSED;
    const activeReader = this.reader;
    if (activeReader) {
      for (const pendingRead of activeReader.readRequests) {
        pendingRead.resolve({value: undefined, done: true});
      }
      activeReader.readRequests = [];
      activeReader.closedResolver.resolve();
    }
  }

  /**
   * Marks the stream errored and remembers the error. If a reader is
   * attached, every pending read and the reader's closed promise are rejected
   * with that error.
   * @param {*} e
   * @package
   */
  error(e) {
    this.state = ReadableStream.State.ERRORED;
    this.storedError = e;
    const activeReader = this.reader;
    if (activeReader) {
      for (const pendingRead of activeReader.readRequests) {
        pendingRead.reject(e);
      }
      activeReader.readRequests = [];
      // A no-op handler is attached before rejecting so the rejection of the
      // closed promise always has a handler.
      activeReader.closedResolver.promise.catch(() => {});
      activeReader.closedResolver.reject(e);
    }
  }

  /**
   * Settles the oldest pending read with the given chunk.
   * @param {T} chunk
   * @param {boolean} done
   * @package
   */
  fulfillReadRequest(chunk, done) {
    const oldestRead = assert(this.reader).readRequests.shift();
    oldestRead.resolve({value: chunk, done: done});
  }

  /**
   * Number of reads still waiting on the attached reader.
   * @return {number}
   * @package
   */
  getNumReadRequests() {
    const activeReader = assert(this.reader);
    return activeReader.readRequests.length;
  }

  /**
   * Whether a reader is currently attached.
   * @return {boolean}
   * @package
   */
  hasDefaultReader() {
    const attachedReader = this.reader;
    return attachedReader !== undefined;
  }
}

/**
 * The lifecycle states a lite ReadableStream can be in.
 * @package @enum {number}
 */
ReadableStream.State = {
  // Initial state assigned by the ReadableStream constructor.
  READABLE: 1,
  // Assigned by ReadableStream#close().
  CLOSED: 2,
  // Assigned by ReadableStream#error().
  ERRORED: 3,
};

/**
 * Builds a lite ReadableStream driven by the given underlying source.
 *
 * The source must provide a start() function, and its pull, cancel, type and
 * autoAllocateChunkSize properties must be absent or falsy; each of these
 * requirements is checked with an assertion. start() is invoked with the new
 * stream's controller before this function returns.
 * @param {!liteTypes.ReadableStreamUnderlyingSource<T>} underlyingSource
 * @return {!ReadableStream<T>}
 * @suppress {strictMissingProperties}
 * @template T
 */
function newReadableStream(underlyingSource) {
  assertFunction(
      underlyingSource.start,
      `'start' property must be a function on an underlying source ` +
          'for a lite ReadableStream');

  // Viewed as a plain Object so the unsupported properties can be probed.
  const source = /** @type {!Object} */ (underlyingSource);
  assert(
      !source.pull,
      `'pull' property not allowed on an underlying source ` +
          'for a lite ReadableStream');
  assert(
      !source.cancel,
      `'cancel' property not allowed on an underlying source ` +
          'for a lite ReadableStream');
  assert(
      !source.type,
      `'type' property not allowed on an underlying source ` +
          'for a lite ReadableStream');
  assert(
      !source.autoAllocateChunkSize,
      `'autoAllocateChunkSize' property not allowed on an ` +
          'underlying source for a lite ReadableStream');

  const stream = new ReadableStream();
  const streamController = new ReadableStreamDefaultController(stream);
  stream.readableStreamController = streamController;
  // start() is called as a method of the source so `this` is preserved.
  streamController.start((ctrl) => underlyingSource.start(ctrl));
  return stream;
}

/**
 * A reader for a lite ReadableStream.
 *
 * Supports the read() and releaseLock() methods, along with the closed
 * property.
 * @template T
 * @implements {liteTypes.ReadableStreamDefaultReader<T>}
 */
class ReadableStreamDefaultReader {
  /**
   * @param {!ReadableStream} stream The stream to read from; it must not
   *     already have a reader attached.
   * @throws {!TypeError} If the stream already has a reader.
   * @package
   */
  constructor(stream) {
    if (stream.reader) {
      throw new TypeError(
          'ReadableStreamReader constructor can only accept readable streams ' +
          'that are not yet locked to a reader');
    }
    /**
     * The stream being read; undefined once the reader has been released.
     * @package {!ReadableStream|undefined}
     */
    this.ownerReadableStream = stream;

    /**
     * Backs the `closed` promise.
     * @package {!NativeResolver<undefined>}
     */
    this.closedResolver = new NativeResolver();

    /**
     * Reads that have been requested but not yet settled, oldest first.
     * @package {!Array<!NativeResolver<!IIterableResult<T>>>}
     */
    this.readRequests = [];

    // A stream that already finished settles `closed` immediately.
    if (stream.state === ReadableStream.State.CLOSED) {
      this.closedResolver.resolve();
    } else if (stream.state === ReadableStream.State.ERRORED) {
      // Attach a no-op handler first so the rejection always has a handler.
      this.closedResolver.promise.catch(() => {});
      this.closedResolver.reject(stream.storedError);
    }
  }

  /**
   * Returns a Promise that resolves when the stream closes, and rejects if the
   * stream errors or the reader releases its lock.
   * https://streams.spec.whatwg.org/#default-reader-closed
   * @return {!Promise<undefined>}
   * @override
   */
  get closed() {
    return this.closedResolver.promise;
  }

  /**
   * Returns a Promise that resolves with an IIterableResult providing the next
   * chunk or that the stream is closed. The Promise rejects if the stream is
   * errored, or with a TypeError if this reader has been released.
   * https://streams.spec.whatwg.org/#default-reader-read
   * @return {!Promise<!IIterableResult<T>>}
   * @override
   */
  read() {
    if (!this.ownerReadableStream) {
      // Report the failure through the returned promise rather than throwing
      // synchronously, so callers always receive a Promise as declared.
      return Promise.reject(new TypeError(
          'This readable stream reader has been released and cannot be used ' +
          'to read from its previous owner stream'));
    }
    return this.readInternal();
  }

  /**
   * Release the lock on the stream. Any further calls to read() will fail,
   * and the stream can create another reader. Does nothing if the reader has
   * already been released.
   * https://streams.spec.whatwg.org/#default-reader-release-lock
   * @return {void}
   * @throws {!TypeError} If there are still unsettled read() calls.
   * @override
   */
  releaseLock() {
    if (!this.ownerReadableStream) {
      return;
    }
    if (this.readRequests.length) {
      throw new TypeError(
          'Cannot release a readable stream reader when it still has ' +
          'outstanding read() calls that have not yet settled');
    }
    this.release();
  }

  /**
   * Detaches this reader from its stream and makes `closed` reject with a
   * TypeError describing the release.
   * @package
   */
  release() {
    const stream = assert(this.ownerReadableStream);
    if (stream.state !== ReadableStream.State.READABLE) {
      // The existing closed promise was already settled when the stream
      // closed or errored, so a fresh resolver is needed to carry the
      // rejection below.
      this.closedResolver = new NativeResolver();
    }
    // Attach a no-op handler first so the rejection always has a handler.
    this.closedResolver.promise.catch(() => {});
    this.closedResolver.reject(new TypeError(
        'This readable stream reader has been released and cannot be used ' +
        `to monitor the stream's state`));
    stream.reader = undefined;
    this.ownerReadableStream = undefined;
  }

  /**
   * Performs a read against the owning stream: a done result if it is closed,
   * a rejection with the stored error if it is errored, and otherwise
   * whatever the stream controller's pullSteps() returns.
   * @return {!Promise<!IIterableResult<T>>}
   * @package
   */
  readInternal() {
    const stream = assert(this.ownerReadableStream);
    if (stream.state === ReadableStream.State.CLOSED) {
      return Promise.resolve({value: undefined, done: true});
    }
    if (stream.state === ReadableStream.State.ERRORED) {
      return Promise.reject(stream.storedError);
    }
    return stream.readableStreamController.pullSteps();
  }
}

/**
 * The controller through which an underlying source feeds a lite
 * ReadableStream.
 *
 * Its public surface is enqueue(), error(), and close().
 * @template T
 * @implements {liteTypes.ReadableStreamDefaultController<T>}
 */
class ReadableStreamDefaultController {
  /**
   * @param {!ReadableStream} stream The stream this controller feeds.
   * @package
   */
  constructor(stream) {
    /** @package @const {!ReadableStream} */
    this.controlledReadableStream = stream;

    /**
     * Chunks that were enqueued while no read was waiting for them.
     * @package @const {!Queue}
     */
    this.queue = new Queue();

    /**
     * Set once close has been requested; blocks further close/enqueue calls.
     * @package {boolean}
     */
    this.closeRequested = false;
  }

  /**
   * Requests that the stream close. The stream closes right away when no
   * chunks are queued, otherwise once the queued chunks have been read.
   * https://streams.spec.whatwg.org/#rs-default-controller-close
   * @return {void}
   * @throws {!TypeError} If close was already requested or the stream is no
   *     longer readable.
   * @override
   */
  close() {
    if (this.canCloseOrEnqueue()) {
      this.closeInternal();
      return;
    }
    throw new TypeError(
        'Cannot close a readable stream that has already been ' +
        'requested to be closed');
  }

  /**
   * Makes a new chunk available to readers of the stream.
   * https://streams.spec.whatwg.org/#rs-default-controller-enqueue
   * @param {T} chunk
   * @throws {!TypeError} If close was already requested or the stream is no
   *     longer readable.
   * @override
   */
  enqueue(chunk) {
    if (this.canCloseOrEnqueue()) {
      this.enqueueInternal(chunk);
      return;
    }
    throw new TypeError(
        'Cannot enqueue a readable stream that has already been ' +
        'requested to be closed');
  }

  /**
   * Puts the stream into the errored state with the given error. Has no
   * effect when the stream is no longer readable.
   * https://streams.spec.whatwg.org/#rs-default-controller-error
   * @param {*} e
   * @override
   */
  error(e) {
    this.errorInternal(e);
  }

  /**
   * Runs the start algorithm with this controller. When its result settles,
   * started() is called on fulfillment and the stream is errored on
   * rejection. A synchronous throw from the algorithm propagates to the
   * caller.
   * @param {function(!ReadableStreamDefaultController):
   *     (!Promise<undefined>|undefined)} startAlgorithm
   * @package
   */
  start(startAlgorithm) {
    const onFulfilled = () => {
      this.started();
    };
    const onRejected = (reason) => {
      this.errorInternal(reason);
    };
    Promise.resolve(startAlgorithm(this)).then(onFulfilled, onRejected);
  }

  /**
   * Serves one read: hands out the oldest queued chunk if there is one,
   * otherwise registers a pending read on the stream.
   * @return {!Promise<!IIterableResult<T>>}
   * @package
   */
  pullSteps() {
    if (this.queue.empty()) {
      const pendingRead = this.controlledReadableStream.addReadRequest();
      this.callPullIfNeeded();
      return pendingRead;
    }

    const value = this.dequeueFromQueue();
    const drainedAfterClose = this.closeRequested && this.queue.empty();
    if (drainedAfterClose) {
      // That was the last chunk of a stream whose close was requested.
      this.clearAlgorithms();
      this.controlledReadableStream.close();
    } else {
      this.callPullIfNeeded();
    }
    return Promise.resolve({value: value, done: false});
  }

  /**
   * Hook invoked after the start algorithm fulfills; does nothing here.
   * @package
   */
  started() {}

  /**
   * Hook invoked from pullSteps(); does nothing here.
   * @package
   */
  callPullIfNeeded() {}

  /**
   * Hook invoked just before the stream is closed or errored; does nothing
   * here.
   * @package
   */
  clearAlgorithms() {}

  /**
   * Records the close request and closes the stream immediately when nothing
   * is queued.
   * @package
   */
  closeInternal() {
    this.closeRequested = true;
    if (!this.queue.empty()) {
      return;
    }
    this.clearAlgorithms();
    this.controlledReadableStream.close();
  }

  /**
   * Delivers the chunk straight to a waiting read when the stream is locked
   * and has one pending; otherwise stores it in the queue.
   * @param {T} chunk
   * @package
   */
  enqueueInternal(chunk) {
    const stream = this.controlledReadableStream;
    const hasWaitingRead = stream.locked && stream.getNumReadRequests() > 0;
    if (!hasWaitingRead) {
      this.enqueueIntoQueue(chunk);
      return;
    }
    stream.fulfillReadRequest(chunk, /* done= */ false);
  }

  /**
   * Errors the stream unless it has already left the readable state; queued
   * chunks are discarded first.
   * @param {*} e
   * @package
   */
  errorInternal(e) {
    const stream = this.controlledReadableStream;
    if (stream.state === ReadableStream.State.READABLE) {
      this.resetQueue();
      this.clearAlgorithms();
      stream.error(e);
    }
  }

  /**
   * Whether close() and enqueue() are still permitted: close has not been
   * requested and the stream is still readable.
   * @return {boolean}
   * @package
   */
  canCloseOrEnqueue() {
    if (this.closeRequested) {
      return false;
    }
    const {state} = this.controlledReadableStream;
    return state === ReadableStream.State.READABLE;
  }

  /**
   * Stores a chunk at the back of the queue.
   * @param {T} chunk
   * @protected
   */
  enqueueIntoQueue(chunk) {
    this.queue.enqueueValue(chunk);
  }

  /**
   * Removes and returns the chunk at the front of the queue.
   * @return {T}
   * @protected
   */
  dequeueFromQueue() {
    const front = this.queue.dequeueValue();
    return front;
  }

  /**
   * Discards every queued chunk.
   * @protected
   */
  resetQueue() {
    this.queue.resetQueue();
  }
}

/**
 * A minimal first-in, first-out container backed by a plain Array.
 * Other queue implementations may additionally track a size per element.
 * @template T
 * @package
 */
class Queue {
  constructor() {
    /**
     * Stored values, oldest first.
     * @private {!Array<T>}
     */
    this.queue_ = [];
  }

  /**
   * @return {boolean} Whether no values are currently stored.
   */
  empty() {
    return !this.queue_.length;
  }

  /**
   * Adds a value at the back of the queue.
   * @param {T} value
   */
  enqueueValue(value) {
    const items = this.queue_;
    items.push(value);
  }

  /**
   * Removes and returns the value at the front of the queue; yields undefined
   * when the queue is empty.
   * @return {T}
   */
  dequeueValue() {
    const oldest = this.queue_.shift();
    return oldest;
  }

  /**
   * Drops every stored value by swapping in a fresh backing array.
   * @return {void}
   */
  resetQueue() {
    this.queue_ = [];
  }
}

// Names exported from the goog.streams.liteImpl module.
exports = {
  Queue,
  ReadableStream,
  ReadableStreamDefaultController,
  ReadableStreamDefaultReader,
  newReadableStream,
};