chromium/third_party/google-closure-library/closure/goog/streams/full_impl.js

/**
 * @license
 * Copyright The Closure Library Authors.
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @fileoverview A full ponyfill of the ReadableStream native API.
 */
goog.module('goog.streams.fullImpl');

const NativeResolver = goog.require('goog.promise.NativeResolver');
const fullTypes = goog.require('goog.streams.fullTypes');
const liteImpl = goog.require('goog.streams.liteImpl');
const {assert, assertNumber} = goog.require('goog.asserts');

/**
 * @typedef {function(!ReadableStreamDefaultController):
 * (!Promise<undefined>|undefined)}
 */
let StartAlgorithm;

/** @typedef {function(*): !Promise<undefined>} */
let CancelAlgorithm;

/**
 * @typedef {function(!ReadableStreamDefaultController): !Promise<undefined>}
 */
let PullAlgorithm;

/**
 * The implemenation of ReadableStream.
 * @template T
 * @implements {fullTypes.ReadableStream<T>}
 */
class ReadableStream extends liteImpl.ReadableStream {
  /** @package */
  constructor() {
    super();

    /**
     * Returns an AsyncIterator over the ReadableStream.
     * https://streams.spec.whatwg.org/#rs-asynciterator
     * @return {!AsyncIterator<!IIterableResult<T>>}
     */
    this[Symbol.asyncIterator] = this.getIterator;

    /** @package {boolean} */
    this.disturbed = false;
  }

  /**
   * Returns a ReadableStreamDefaultReader that enables reading chunks from
   * the source.
   * https://streams.spec.whatwg.org/#rs-get-reader
   * @return {!ReadableStreamDefaultReader<T>}
   * @override
   */
  getReader() {
    return this.reader = new ReadableStreamDefaultReader(this);
  }

  /**
   * Cancels the ReadableStream with an optional reason.
   * https://streams.spec.whatwg.org/#rs-cancel
   * @param {*} reason
   * @return {!Promise<undefined>}
   * @override
   */
  cancel(reason) {
    if (this.locked) {
      return Promise.reject(new TypeError('Cannot cancel a locked stream'));
    }
    return this.cancelInternal(reason);
  }

  /**
   * Returns an AyncIterator over the ReadableStream.
   *
   * If preventCancel is passed as an option, calling the return() method on the
   * iterator will terminate the iterator, but will not cancel the
   * ReadableStream.
   * https://streams.spec.whatwg.org/#rs-get-iterator
   * @param {{preventCancel: boolean}=} options
   * @return {!AsyncIterator<T>}
   * @override
   */
  getIterator({preventCancel = false} = {}) {
    return new ReadableStreamAsyncIterator(this.getReader(), preventCancel);
  }

  /**
   * Returns an Array with two elements, both new ReadableStreams that contain
   * the same data as this ReadableStream. This stream will become permanently
   * locked.
   * https://streams.spec.whatwg.org/#rs-tee
   * @return {!Array<!ReadableStream>}
   * @override
   */
  tee() {
    const reader = this.getReader();
    let reading = false;
    let canceled1 = false;
    let canceled2 = false;
    let reason1;
    let reason2;
    let branch1;
    let branch2;
    const cancelResolver = new NativeResolver();
    const pullAlgorithm = () => {
      if (reading) {
        return Promise.resolve();
      }
      reading = true;
      reader.read()
          .then(({value, done}) => {
            reading = false;
            if (done) {
              if (!canceled1) {
                branch1.readableStreamController.close();
              }
              if (!canceled2) {
                branch2.readableStreamController.close();
              }
              return;
            }
            if (!canceled1) {
              branch1.readableStreamController.enqueue(value);
            }
            if (!canceled2) {
              branch2.readableStreamController.enqueue(value);
            }
          })
          .catch(() => {});
      return Promise.resolve();
    };
    const cancel1Algorithm = (reason) => {
      canceled1 = true;
      reason1 = reason;
      if (canceled2) {
        const cancelResult = this.cancelInternal([reason1, reason2]);
        cancelResolver.resolve(cancelResult);
      }
      return cancelResolver.promise;
    };
    const cancel2Algorithm = (reason) => {
      canceled2 = true;
      reason2 = reason;
      if (canceled1) {
        const cancelResult = this.cancelInternal([reason1, reason2]);
        cancelResolver.resolve(cancelResult);
      }
      return cancelResolver.promise;
    };
    const startAlgorithm = () => {};
    branch1 = new ReadableStream();
    const controller1 = new ReadableStreamDefaultController(
        branch1, cancel1Algorithm, pullAlgorithm, /* highWaterMark= */ 1,
        /* size= */ undefined);
    branch1.readableStreamController = controller1;
    controller1.start(startAlgorithm);
    branch2 = new ReadableStream();
    const controller2 = new ReadableStreamDefaultController(
        branch2, cancel2Algorithm, pullAlgorithm, /* highWatermark= */ 1,
        /* size= */ undefined);
    branch2.readableStreamController = controller2;
    controller2.start(startAlgorithm);
    reader.closed.catch((reason) => {
      controller1.error(reason);
      controller2.error(reason);
    });
    return [branch1, branch2];
  }

  /**
   * @param {*} reason
   * @return {!Promise<undefined>}
   * @package
   */
  cancelInternal(reason) {
    this.disturbed = true;
    if (this.state === liteImpl.ReadableStream.State.CLOSED) {
      return Promise.resolve();
    }
    if (this.state === liteImpl.ReadableStream.State.ERRORED) {
      return Promise.reject(this.storedError);
    }
    this.close();
    return /** @type {!ReadableStreamDefaultController} */ (
               this.readableStreamController)
        .cancelSteps(reason)
        .then(() => {});
  }
}

/**
 * Creates and returns a new ReadableStream.
 *
 * The underlying source should only have a start() method, and no other
 * properties.
 * @param {!fullTypes.ReadableStreamUnderlyingSource<T>=} underlyingSource
 * @param {!fullTypes.ReadableStreamStrategy<T>=} strategy
 * @return {!ReadableStream<T>}
 * @suppress {strictMissingProperties}
 * @template T
 */
function newReadableStream(underlyingSource = {}, strategy = {}) {
  const verifyObject =
      /** @type {!Object} */ (underlyingSource);
  assert(
      !(verifyObject.type),
      `'type' property not allowed on an underlying source for a ` +
          'liteImpl ReadableStream');
  assert(
      !(verifyObject.autoAllocateChunkSize),
      `'autoAllocateChunkSize' property not allowed on an underlying ` +
          'source for a liteImpl ReadableStream');
  const startAlgorithm = underlyingSource.start ?
      (controller) => underlyingSource.start(controller) :
      () => {};
  const cancelAlgorithm = underlyingSource.cancel ? (reason) => {
    try {
      return Promise.resolve(underlyingSource.cancel(reason));
    } catch (e) {
      return Promise.reject(e);
    }
  } : undefined;
  const pullAlgorithm = underlyingSource.pull ? (controller) => {
    try {
      return Promise.resolve(underlyingSource.pull(controller));
    } catch (e) {
      return Promise.reject(e);
    }
  } : undefined;
  const highWaterMark =
      strategy.highWaterMark === undefined ? 1 : strategy.highWaterMark;
  const sizeAlgorithm = strategy.size ?
      (chunk) => strategy.size.call(undefined, chunk) :
      undefined;
  const stream = new ReadableStream();
  const controller = new ReadableStreamDefaultController(
      stream, cancelAlgorithm, pullAlgorithm, highWaterMark, sizeAlgorithm);
  stream.readableStreamController = controller;
  controller.start(startAlgorithm);
  return stream;
}

/**
 * The DefaultReader for a ReadableStream. Adds cancellation onto the liteImpl
 * DefaultReader.
 * @template T
 * @implements {fullTypes.ReadableStreamDefaultReader<T>}
 */
class ReadableStreamDefaultReader extends liteImpl.ReadableStreamDefaultReader {
  /**
   * Cancels the ReadableStream with an optional reason.
   * https://streams.spec.whatwg.org/#default-reader-cancel
   * @param {*} reason
   * @return {!Promise<undefined>}
   * @override
   */
  cancel(reason) {
    if (!this.ownerReadableStream) {
      return Promise.reject(new TypeError(
          'This readable stream reader has been released and cannot be used ' +
          'to cancel its previous owner stream'));
    }
    return /** @type {!ReadableStream} */ (this.ownerReadableStream)
        .cancelInternal(reason);
  }
}

/**
 * @template T
 * @implements {fullTypes.ReadableStreamAsyncIterator<T>}
 */
class ReadableStreamAsyncIterator {
  /**
   * @param {!ReadableStreamDefaultReader<T>} asyncIteratorReader
   * @param {boolean} preventCancel
   * @package
   */
  constructor(asyncIteratorReader, preventCancel) {
    /** @package @const {!ReadableStreamDefaultReader<T>} */
    this.asyncIteratorReader = asyncIteratorReader;

    /** @package @const {boolean} */
    this.preventCancel = preventCancel;
  }

  /**
   * Gets the next value from the ReadableStream.
   * https://streams.spec.whatwg.org/#rs-asynciterator-prototype-next
   * @override
   */
  next() {
    if (!this.asyncIteratorReader.ownerReadableStream) {
      return Promise.reject(
          new TypeError('There is no more data left in the ReadableStream'));
    }
    return this.asyncIteratorReader.read().then(({value, done}) => {
      if (done) {
        this.asyncIteratorReader.release();
      }
      return {value, done};
    });
  }

  /**
   * Cancels the underlying stream and resolves with the value.
   * @param {*} value
   * @return {!Promise<!IIterableResult<T>>}
   * @override
   */
  return(value) {
    if (!this.asyncIteratorReader.ownerReadableStream) {
      return Promise.reject(
          new TypeError('There is no more data left in the ReadableStream'));
    }
    if (this.asyncIteratorReader.readRequests.length) {
      return Promise.reject(new TypeError(
          'There are pending read requests in the ReadableStream'));
    }
    if (!this.preventCancel) {
      const result = this.asyncIteratorReader.cancel(value);
      this.asyncIteratorReader.release();
      return result.then(() => ({done: true, value}));
    }
    this.asyncIteratorReader.release();
    return Promise.resolve({done: true, value});
  }
}

/**
 * The controller for a ReadableStream. Adds cancellation and backpressure onto
 * the liteImpl DefaultController.
 * @template T
 * @implements {fullTypes.ReadableStreamDefaultController}
 */
class ReadableStreamDefaultController extends
    liteImpl.ReadableStreamDefaultController {
  /**
   * @param {!ReadableStream} stream
   * @param {!CancelAlgorithm|undefined} cancelAlgorithm
   * @param {!PullAlgorithm|undefined} pullAlgorithm
   * @param {number} strategyHWM
   * @param {(function(T): number)|undefined} strategySizeAlgorithm
   * @package
   */
  constructor(
      stream, cancelAlgorithm, pullAlgorithm, strategyHWM,
      strategySizeAlgorithm) {
    super(stream);

    /** @private {!CancelAlgorithm|undefined} */
    this.cancelAlgorithm_ = cancelAlgorithm;

    /** @private {boolean} */
    this.pullAgain_ = false;

    /** @private {!PullAlgorithm|undefined} */
    this.pullAlgorithm_ = pullAlgorithm;

    /** @private {boolean} */
    this.pulling_ = false;

    /** @private {number} */
    this.queueTotalSize_ = 0;

    /** @private {boolean} */
    this.started_ = false;

    /** @private @const {number} */
    this.strategyHWM_ = strategyHWM;

    /** @private {(function(T): number)|undefined} */
    this.strategySizeAlgorithm_ = strategySizeAlgorithm;

    /** @private @const {!QueueWithSizes<T>} */
    this.queueWithSizes_ = new QueueWithSizes(this.queue);
  }

  /**
   * Returns the desired size to fill the controlled stream's internal queue. It
   * can be negative if the queue is full.
   * https://streams.spec.whatwg.org/#rs-default-controller-desired-size
   * @return {?number}
   * @override
   */
  get desiredSize() {
    return this.getDesiredSize_();
  }

  /** @override */
  started() {
    this.started_ = true;
    this.callPullIfNeeded();
  }

  /** @override */
  callPullIfNeeded() {
    if (!this.pullAlgorithm_ || !this.shouldCallPull_()) {
      return;
    }
    if (this.pulling_) {
      this.pullAgain_ = true;
      return;
    }
    this.pulling_ = true;
    this.pullAlgorithm_(this).then(
        () => {
          this.pulling_ = false;
          if (this.pullAgain_) {
            this.pullAgain_ = false;
            this.callPullIfNeeded();
          }
        },
        (error) => {
          this.error(error);
        });
  }

  /**
   * @return {boolean}
   * @private
   */
  shouldCallPull_() {
    if (!this.canCloseOrEnqueue()) {
      return false;
    }
    if (!this.started_) {
      return false;
    }
    if (this.controlledReadableStream.locked &&
        this.controlledReadableStream.getNumReadRequests() > 0) {
      return true;
    }
    return assertNumber(this.getDesiredSize_()) > 0;
  }

  /** @override */
  clearAlgorithms() {
    this.cancelAlgorithm_ = undefined;
    this.pullAlgorithm_ = undefined;
    this.strategySizeAlgorithm_ = undefined;
  }

  /**
   * @param {*} reason
   * @return {!Promise<*>}
   * @package
   */
  cancelSteps(reason) {
    this.queue.resetQueue();
    const cancelResult = this.cancelAlgorithm_ ? this.cancelAlgorithm_(reason) :
                                                 Promise.resolve();
    this.clearAlgorithms();
    return cancelResult;
  }

  /** @override */
  enqueueIntoQueue(chunk) {
    let size;
    try {
      // Default to size of 1 if no algorithm is specified.
      size = Number(
          this.strategySizeAlgorithm_ ? this.strategySizeAlgorithm_(chunk) : 1);
    } catch (e) {
      this.error(e);
      throw e;
    }
    if (typeof size !== 'number' || Number.isNaN(size) || size < 0 ||
        size === Infinity) {
      throw new RangeError(
          `The return value of a queuing strategy's size function must be a` +
          ' finite, non-NaN, non-negative number');
    }
    this.queueTotalSize_ += size;
    this.queueWithSizes_.enqueueValueWithSize(chunk, size);
  }

  /** @override */
  dequeueFromQueue() {
    const {value, size} = this.queueWithSizes_.dequeueValueWithSize();
    this.queueTotalSize_ -= size;
    if (this.queueTotalSize_ < 0) {
      // This might be less than zero due to rounding errors.
      this.queueTotalSize_ = 0;
    }
    return value;
  }

  /** @override */
  resetQueue() {
    this.queueWithSizes_.resetQueue();
  }

  /**
   * @return {?number}
   * @private
   */
  getDesiredSize_() {
    if (this.controlledReadableStream.state ===
        liteImpl.ReadableStream.State.ERRORED) {
      return null;
    }
    if (this.controlledReadableStream.state ===
        liteImpl.ReadableStream.State.CLOSED) {
      return 0;
    }
    return this.strategyHWM_ - this.queueTotalSize_;
  }
}

/**
 * An internal Queue representation that wraps a queue and has a size associated
 * with each chunk.
 * @template T
 * @package
 */
class QueueWithSizes {
  /**
   * @param {!liteImpl.Queue} queue
   */
  constructor(queue) {
    /**
     * @private @const {!liteImpl.Queue}
     */
    this.queue_ = queue;

    /**
     * @private {!Array<number>}
     */
    this.sizes_ = [];
  }

  /**
   * @param {T} chunk
   * @param {number} size
   */
  enqueueValueWithSize(chunk, size) {
    this.queue_.enqueueValue(chunk);
    this.sizes_.push(size);
  }

  /**
   * @return {{value: T, size: number}}
   */
  dequeueValueWithSize() {
    return {
      value: this.queue_.dequeueValue(),
      size: this.sizes_.shift(),
    };
  }

  /**
   * @return {void}
   */
  resetQueue() {
    this.queue_.resetQueue();
    this.sizes_ = [];
  }
}

exports = {
  ReadableStream,
  ReadableStreamAsyncIterator,
  ReadableStreamDefaultController,
  ReadableStreamDefaultReader,
  newReadableStream,
};