/**
* @license
* Copyright The Closure Library Authors.
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @fileoverview A full ponyfill of the ReadableStream native API.
*/
goog.module('goog.streams.fullImpl');
const NativeResolver = goog.require('goog.promise.NativeResolver');
const fullTypes = goog.require('goog.streams.fullTypes');
const liteImpl = goog.require('goog.streams.liteImpl');
const {assert, assertNumber} = goog.require('goog.asserts');
/**
 * Start callback type: it receives the stream's controller and may return a
 * promise. In this file, start callbacks are handed to the controller's
 * start() method right after the controller is constructed.
 * @typedef {function(!ReadableStreamDefaultController):
 * (!Promise<undefined>|undefined)}
 */
let StartAlgorithm;
/**
 * Cancel callback type: it receives the cancellation reason and returns a
 * promise. The controller invokes it from cancelSteps().
 * @typedef {function(*): !Promise<undefined>}
 */
let CancelAlgorithm;
/**
 * Pull callback type: it receives the stream's controller and returns a
 * promise. The controller invokes it from callPullIfNeeded().
 * @typedef {function(!ReadableStreamDefaultController): !Promise<undefined>}
 */
let PullAlgorithm;
/**
 * The implementation of ReadableStream.
 *
 * Extends the liteImpl ReadableStream with cancellation, async iteration and
 * tee().
 * @template T
 * @implements {fullTypes.ReadableStream<T>}
 */
class ReadableStream extends liteImpl.ReadableStream {
  /** @package */
  constructor() {
    super();
    /**
     * Returns an AsyncIterator over the ReadableStream.
     * https://streams.spec.whatwg.org/#rs-asynciterator
     * @return {!AsyncIterator<!IIterableResult<T>>}
     */
    this[Symbol.asyncIterator] = this.getIterator;
    /** @package {boolean} */
    this.disturbed = false;
  }

  /**
   * Returns a ReadableStreamDefaultReader that enables reading chunks from
   * the source. The new reader is also stored on `this.reader`.
   * https://streams.spec.whatwg.org/#rs-get-reader
   * @return {!ReadableStreamDefaultReader<T>}
   * @override
   */
  getReader() {
    return this.reader = new ReadableStreamDefaultReader(this);
  }

  /**
   * Cancels the ReadableStream with an optional reason.
   *
   * Returns a promise rejected with a TypeError when the stream is locked.
   * https://streams.spec.whatwg.org/#rs-cancel
   * @param {*} reason
   * @return {!Promise<undefined>}
   * @override
   */
  cancel(reason) {
    if (this.locked) {
      return Promise.reject(new TypeError('Cannot cancel a locked stream'));
    }
    return this.cancelInternal(reason);
  }

  /**
   * Returns an AsyncIterator over the ReadableStream.
   *
   * If preventCancel is passed as an option, calling the return() method on the
   * iterator will terminate the iterator, but will not cancel the
   * ReadableStream.
   * https://streams.spec.whatwg.org/#rs-get-iterator
   * @param {{preventCancel: boolean}=} options
   * @return {!AsyncIterator<T>}
   * @override
   */
  getIterator({preventCancel = false} = {}) {
    return new ReadableStreamAsyncIterator(this.getReader(), preventCancel);
  }

  /**
   * Returns an Array with two elements, both new ReadableStreams that contain
   * the same data as this ReadableStream. This stream will become permanently
   * locked.
   *
   * The source is only canceled once both branches have been canceled; the
   * reason passed to the source is the pair [reason1, reason2].
   * https://streams.spec.whatwg.org/#rs-tee
   * @return {!Array<!ReadableStream>}
   * @override
   */
  tee() {
    const reader = this.getReader();
    let reading = false;
    let canceled1 = false;
    let canceled2 = false;
    let reason1;
    let reason2;
    let branch1;
    let branch2;
    const cancelResolver = new NativeResolver();

    // When the source closes or errors while at least one branch is still
    // uncanceled, the source will never be canceled through the branches.
    // Resolve the shared cancel promise so that a branch which already called
    // cancel() is not left waiting forever for its sibling (this mirrors the
    // Streams Standard's ReadableStreamDefaultTee steps). Resolving more than
    // once is harmless because only the first resolution takes effect.
    const settleCancelIfBranchOpen = () => {
      if (!canceled1 || !canceled2) {
        cancelResolver.resolve(undefined);
      }
    };

    const pullAlgorithm = () => {
      // Both branches share this algorithm; only one read may be in flight.
      if (reading) {
        return Promise.resolve();
      }
      reading = true;
      reader.read()
          .then(({value, done}) => {
            reading = false;
            if (done) {
              if (!canceled1) {
                branch1.readableStreamController.close();
              }
              if (!canceled2) {
                branch2.readableStreamController.close();
              }
              settleCancelIfBranchOpen();
              return;
            }
            if (!canceled1) {
              branch1.readableStreamController.enqueue(value);
            }
            if (!canceled2) {
              branch2.readableStreamController.enqueue(value);
            }
          })
          .catch(() => {
            // Failures are deliberately not rethrown from here; source errors
            // are forwarded to the branches by the reader.closed handler
            // below. Only clear the in-flight flag.
            reading = false;
          });
      return Promise.resolve();
    };

    const cancel1Algorithm = (reason) => {
      canceled1 = true;
      reason1 = reason;
      if (canceled2) {
        const cancelResult = this.cancelInternal([reason1, reason2]);
        cancelResolver.resolve(cancelResult);
      }
      return cancelResolver.promise;
    };

    const cancel2Algorithm = (reason) => {
      canceled2 = true;
      reason2 = reason;
      if (canceled1) {
        const cancelResult = this.cancelInternal([reason1, reason2]);
        cancelResolver.resolve(cancelResult);
      }
      return cancelResolver.promise;
    };

    const startAlgorithm = () => {};

    branch1 = new ReadableStream();
    const controller1 = new ReadableStreamDefaultController(
        branch1, cancel1Algorithm, pullAlgorithm, /* highWaterMark= */ 1,
        /* size= */ undefined);
    branch1.readableStreamController = controller1;
    controller1.start(startAlgorithm);

    branch2 = new ReadableStream();
    const controller2 = new ReadableStreamDefaultController(
        branch2, cancel2Algorithm, pullAlgorithm, /* highWaterMark= */ 1,
        /* size= */ undefined);
    branch2.readableStreamController = controller2;
    controller2.start(startAlgorithm);

    // Forward a source error to both branches.
    reader.closed.catch((reason) => {
      controller1.error(reason);
      controller2.error(reason);
      settleCancelIfBranchOpen();
    });

    return [branch1, branch2];
  }

  /**
   * Cancels the stream without checking whether it is locked.
   *
   * Marks the stream as disturbed. A closed stream resolves immediately and an
   * errored stream rejects with its stored error; otherwise the stream is
   * closed and the controller's cancel steps are run, with their result mapped
   * to undefined.
   * @param {*} reason
   * @return {!Promise<undefined>}
   * @package
   */
  cancelInternal(reason) {
    this.disturbed = true;
    if (this.state === liteImpl.ReadableStream.State.CLOSED) {
      return Promise.resolve();
    }
    if (this.state === liteImpl.ReadableStream.State.ERRORED) {
      return Promise.reject(this.storedError);
    }
    this.close();
    return /** @type {!ReadableStreamDefaultController} */ (
               this.readableStreamController)
        .cancelSteps(reason)
        .then(() => {});
  }
}
/**
 * Creates and returns a new ReadableStream.
 *
 * The underlying source may provide start(), pull() and cancel() methods; each
 * one that is present is wired into the stream's controller. The 'type' and
 * 'autoAllocateChunkSize' properties are rejected by assertion.
 *
 * Exceptions thrown synchronously by pull() or cancel() are converted into
 * rejected promises. When the strategy has no highWaterMark, 1 is used.
 * @param {!fullTypes.ReadableStreamUnderlyingSource<T>=} underlyingSource
 * @param {!fullTypes.ReadableStreamStrategy<T>=} strategy
 * @return {!ReadableStream<T>}
 * @suppress {strictMissingProperties}
 * @template T
 */
function newReadableStream(underlyingSource = {}, strategy = {}) {
  const verifyObject =
      /** @type {!Object} */ (underlyingSource);
  assert(
      !(verifyObject.type),
      `'type' property not allowed on an underlying source for a ` +
          'fullImpl ReadableStream');
  assert(
      !(verifyObject.autoAllocateChunkSize),
      `'autoAllocateChunkSize' property not allowed on an underlying ` +
          'source for a fullImpl ReadableStream');

  // The callbacks are invoked as methods of the underlying source so that
  // `this` inside them refers to the source object.
  const startAlgorithm = underlyingSource.start ?
      (controller) => underlyingSource.start(controller) :
      () => {};
  const cancelAlgorithm = underlyingSource.cancel ? (reason) => {
    try {
      return Promise.resolve(underlyingSource.cancel(reason));
    } catch (e) {
      return Promise.reject(e);
    }
  } : undefined;
  const pullAlgorithm = underlyingSource.pull ? (controller) => {
    try {
      return Promise.resolve(underlyingSource.pull(controller));
    } catch (e) {
      return Promise.reject(e);
    }
  } : undefined;

  const highWaterMark =
      strategy.highWaterMark === undefined ? 1 : strategy.highWaterMark;
  // The size function is deliberately called with an undefined `this`.
  const sizeAlgorithm = strategy.size ?
      (chunk) => strategy.size.call(undefined, chunk) :
      undefined;

  const stream = new ReadableStream();
  const controller = new ReadableStreamDefaultController(
      stream, cancelAlgorithm, pullAlgorithm, highWaterMark, sizeAlgorithm);
  stream.readableStreamController = controller;
  controller.start(startAlgorithm);
  return stream;
}
/**
 * Default reader for the full ReadableStream implementation. It builds on the
 * liteImpl DefaultReader and contributes the ability to cancel the stream.
 * @template T
 * @implements {fullTypes.ReadableStreamDefaultReader<T>}
 */
class ReadableStreamDefaultReader extends liteImpl.ReadableStreamDefaultReader {
  /**
   * Cancels the stream this reader is attached to, optionally giving a reason.
   * If the reader no longer has an owner stream, the returned promise rejects
   * with a TypeError.
   * https://streams.spec.whatwg.org/#default-reader-cancel
   * @param {*} reason
   * @return {!Promise<undefined>}
   * @override
   */
  cancel(reason) {
    const owner = this.ownerReadableStream;
    if (owner) {
      return /** @type {!ReadableStream} */ (owner).cancelInternal(reason);
    }
    const released = new TypeError(
        'This readable stream reader has been released and cannot be used ' +
        'to cancel its previous owner stream');
    return Promise.reject(released);
  }
}
/**
 * Async iterator that pulls its values from a ReadableStreamDefaultReader.
 * @template T
 * @implements {fullTypes.ReadableStreamAsyncIterator<T>}
 */
class ReadableStreamAsyncIterator {
  /**
   * @param {!ReadableStreamDefaultReader<T>} asyncIteratorReader
   * @param {boolean} preventCancel
   * @package
   */
  constructor(asyncIteratorReader, preventCancel) {
    /** @package @const {!ReadableStreamDefaultReader<T>} */
    this.asyncIteratorReader = asyncIteratorReader;
    /** @package @const {boolean} */
    this.preventCancel = preventCancel;
  }

  /**
   * Reads the next chunk through the reader. Once the reader reports `done`,
   * the reader is released. Rejects with a TypeError if the reader has no
   * owner stream.
   * https://streams.spec.whatwg.org/#rs-asynciterator-prototype-next
   * @override
   */
  next() {
    const reader = this.asyncIteratorReader;
    if (!reader.ownerReadableStream) {
      return Promise.reject(
          new TypeError('There is no more data left in the ReadableStream'));
    }
    return reader.read().then((result) => {
      const {value, done} = result;
      if (done) {
        reader.release();
      }
      return {value, done};
    });
  }

  /**
   * Ends the iteration and resolves with `{done: true, value}`. Unless
   * preventCancel was set, the stream is canceled with `value` first. The
   * reader is released in both cases. Rejects with a TypeError if the reader
   * has no owner stream or still has pending read requests.
   * @param {*} value
   * @return {!Promise<!IIterableResult<T>>}
   * @override
   */
  return(value) {
    const reader = this.asyncIteratorReader;
    if (!reader.ownerReadableStream) {
      return Promise.reject(
          new TypeError('There is no more data left in the ReadableStream'));
    }
    if (reader.readRequests.length) {
      return Promise.reject(new TypeError(
          'There are pending read requests in the ReadableStream'));
    }
    if (this.preventCancel) {
      reader.release();
      return Promise.resolve({done: true, value});
    }
    // Start the cancellation before releasing: cancel() needs the reader to
    // still be attached to its stream.
    const cancellation = reader.cancel(value);
    reader.release();
    return cancellation.then(() => {
      return {done: true, value};
    });
  }
}
/**
 * The controller for a ReadableStream. Adds cancellation and backpressure onto
 * the liteImpl DefaultController.
 *
 * Backpressure is tracked as the sum of the sizes of the queued chunks
 * compared against the strategy's high water mark.
 * @template T
 * @implements {fullTypes.ReadableStreamDefaultController}
 */
class ReadableStreamDefaultController extends
    liteImpl.ReadableStreamDefaultController {
  /**
   * @param {!ReadableStream} stream
   * @param {!CancelAlgorithm|undefined} cancelAlgorithm
   * @param {!PullAlgorithm|undefined} pullAlgorithm
   * @param {number} strategyHWM
   * @param {(function(T): number)|undefined} strategySizeAlgorithm
   * @package
   */
  constructor(
      stream, cancelAlgorithm, pullAlgorithm, strategyHWM,
      strategySizeAlgorithm) {
    super(stream);
    /** @private {!CancelAlgorithm|undefined} */
    this.cancelAlgorithm_ = cancelAlgorithm;
    /** @private {boolean} */
    this.pullAgain_ = false;
    /** @private {!PullAlgorithm|undefined} */
    this.pullAlgorithm_ = pullAlgorithm;
    /** @private {boolean} */
    this.pulling_ = false;
    /** @private {number} */
    this.queueTotalSize_ = 0;
    /** @private {boolean} */
    this.started_ = false;
    /** @private @const {number} */
    this.strategyHWM_ = strategyHWM;
    /** @private {(function(T): number)|undefined} */
    this.strategySizeAlgorithm_ = strategySizeAlgorithm;
    /** @private @const {!QueueWithSizes<T>} */
    this.queueWithSizes_ = new QueueWithSizes(this.queue);
  }

  /**
   * Returns the desired size to fill the controlled stream's internal queue. It
   * can be negative if the queue is full.
   * https://streams.spec.whatwg.org/#rs-default-controller-desired-size
   * @return {?number}
   * @override
   */
  get desiredSize() {
    return this.getDesiredSize_();
  }

  /**
   * Marks the controller as started and attempts the first pull.
   * @override
   */
  started() {
    this.started_ = true;
    this.callPullIfNeeded();
  }

  /**
   * Invokes the pull algorithm when more data is wanted. If a pull is already
   * in flight, a single follow-up pull is scheduled for when it fulfills. A
   * rejected pull errors the controller.
   * @override
   */
  callPullIfNeeded() {
    if (!this.pullAlgorithm_ || !this.shouldCallPull_()) {
      return;
    }
    if (this.pulling_) {
      this.pullAgain_ = true;
      return;
    }
    this.pulling_ = true;
    this.pullAlgorithm_(this).then(
        () => {
          this.pulling_ = false;
          if (this.pullAgain_) {
            this.pullAgain_ = false;
            this.callPullIfNeeded();
          }
        },
        (error) => {
          this.error(error);
        });
  }

  /**
   * Decides whether the pull algorithm should run: the controller must be
   * started and still able to close or enqueue, and either a read request is
   * waiting on a locked stream or the desired size is positive.
   * @return {boolean}
   * @private
   */
  shouldCallPull_() {
    if (!this.canCloseOrEnqueue()) {
      return false;
    }
    if (!this.started_) {
      return false;
    }
    if (this.controlledReadableStream.locked &&
        this.controlledReadableStream.getNumReadRequests() > 0) {
      return true;
    }
    return assertNumber(this.getDesiredSize_()) > 0;
  }

  /**
   * Drops the references to the cancel, pull and size callbacks.
   * @override
   */
  clearAlgorithms() {
    this.cancelAlgorithm_ = undefined;
    this.pullAlgorithm_ = undefined;
    this.strategySizeAlgorithm_ = undefined;
  }

  /**
   * Empties the queue, runs the cancel algorithm (if any) with the given
   * reason and then clears all algorithms.
   * @param {*} reason
   * @return {!Promise<*>}
   * @package
   */
  cancelSteps(reason) {
    // Go through resetQueue() so that the per-chunk sizes and the running
    // total are discarded together with the queued chunks.
    this.resetQueue();
    const cancelResult = this.cancelAlgorithm_ ? this.cancelAlgorithm_(reason) :
                                                 Promise.resolve();
    this.clearAlgorithms();
    return cancelResult;
  }

  /**
   * Computes the chunk's size with the strategy's size algorithm and enqueues
   * the chunk. If the size algorithm throws, the controller is errored and the
   * exception is rethrown. A size that is not a finite, non-negative number
   * results in a RangeError.
   * @override
   */
  enqueueIntoQueue(chunk) {
    let size;
    try {
      // Default to size of 1 if no algorithm is specified.
      size = Number(
          this.strategySizeAlgorithm_ ? this.strategySizeAlgorithm_(chunk) : 1);
    } catch (e) {
      this.error(e);
      throw e;
    }
    // `size` is always a number after the coercion above, so only NaN,
    // +/-Infinity and negative values need to be rejected.
    if (!Number.isFinite(size) || size < 0) {
      throw new RangeError(
          `The return value of a queuing strategy's size function must be a` +
          ' finite, non-NaN, non-negative number');
    }
    this.queueTotalSize_ += size;
    this.queueWithSizes_.enqueueValueWithSize(chunk, size);
  }

  /**
   * Removes the oldest chunk from the queue and subtracts its size from the
   * running total.
   * @override
   */
  dequeueFromQueue() {
    const {value, size} = this.queueWithSizes_.dequeueValueWithSize();
    this.queueTotalSize_ -= size;
    if (this.queueTotalSize_ < 0) {
      // This might be less than zero due to rounding errors.
      this.queueTotalSize_ = 0;
    }
    return value;
  }

  /**
   * Discards all queued chunks together with their sizes and resets the
   * running total size to zero.
   * @override
   */
  resetQueue() {
    this.queueWithSizes_.resetQueue();
    this.queueTotalSize_ = 0;
  }

  /**
   * Returns null for an errored stream, 0 for a closed stream, and otherwise
   * the high water mark minus the total size of the queued chunks.
   * @return {?number}
   * @private
   */
  getDesiredSize_() {
    if (this.controlledReadableStream.state ===
        liteImpl.ReadableStream.State.ERRORED) {
      return null;
    }
    if (this.controlledReadableStream.state ===
        liteImpl.ReadableStream.State.CLOSED) {
      return 0;
    }
    return this.strategyHWM_ - this.queueTotalSize_;
  }
}
/**
 * Pairs every chunk stored in a wrapped queue with a size. The chunks live in
 * the wrapped queue; the sizes are kept here, in the same order.
 * @template T
 * @package
 */
class QueueWithSizes {
  /**
   * @param {!liteImpl.Queue} queue The queue that stores the chunks.
   */
  constructor(queue) {
    /** @private @const {!liteImpl.Queue} */
    this.queue_ = queue;

    /** @private {!Array<number>} */
    this.sizes_ = [];
  }

  /**
   * Appends a chunk to the wrapped queue and records its size.
   * @param {T} chunk
   * @param {number} size
   */
  enqueueValueWithSize(chunk, size) {
    const {queue_: chunks, sizes_: sizes} = this;
    chunks.enqueueValue(chunk);
    sizes.push(size);
  }

  /**
   * Removes the oldest chunk from the wrapped queue and returns it together
   * with the oldest recorded size.
   * @return {{value: T, size: number}}
   */
  dequeueValueWithSize() {
    const value = this.queue_.dequeueValue();
    const size = this.sizes_.shift();
    return {value, size};
  }

  /**
   * Resets the wrapped queue and forgets every recorded size.
   * @return {void}
   */
  resetQueue() {
    this.queue_.resetQueue();
    this.sizes_ = [];
  }
}
// Symbols exported by the goog.streams.fullImpl module.
exports = {
  ReadableStream,
  ReadableStreamAsyncIterator,
  ReadableStreamDefaultController,
  ReadableStreamDefaultReader,
  newReadableStream,
};