chromium/third_party/blink/web_tests/external/wpt/streams/transform-streams/backpressure.any.js

// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
// META: script=../resources/test-utils.js
'use strict';

const error1 = new Error('error1 message');
error1.name = 'error1';

promise_test(() => {
  const ts = recordingTransformStream();
  const writer = ts.writable.getWriter();
  // This call never resolves.
  writer.write('a');
  return flushAsyncEvents().then(() => {
    assert_array_equals(ts.events, [], 'transform should not be called');
  });
}, 'backpressure allows no transforms with a default identity transform and no reader');

promise_test(() => {
  const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  // This call to write() resolves asynchronously.
  writer.write('a');
  // This call to write() waits for backpressure that is never relieved and never calls transform().
  writer.write('b');
  return flushAsyncEvents().then(() => {
    assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once');
  });
}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader');

promise_test(() => {
  // Without a transform() implementation, recordingTransformStream() never enqueues anything.
  const ts = recordingTransformStream({
    transform() {
      // Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform()
      // keeps being called.
    }
  }, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const writePromises = [];
  for (let i = 0; i < 4; ++i) {
    writePromises.push(writer.write(i));
  }
  return Promise.all(writePromises).then(() => {
    assert_array_equals(ts.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3],
                        'all 4 events should be transformed');
  });
}, 'transform() should keep being called as long as there is no backpressure');

promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const events = [];
  const writerPromises = [
    writer.write('a').then(() => events.push('a')),
    writer.write('b').then(() => events.push('b')),
    writer.close().then(() => events.push('closed'))];
  return delay(0).then(() => {
    assert_array_equals(events, ['a'], 'the first write should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should not be true');
    assert_equals('a', value, 'value should be "a"');
    return delay(0);
  }).then(() => {
    assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should still not be true');
    assert_equals('b', value, 'value should be "b"');
    return reader.read();
  }).then(({ done }) => {
    assert_true(done, 'done should be true');
    return writerPromises;
  });
}, 'writes should resolve as soon as transform completes');

promise_test(() => {
  const ts = new TransformStream(undefined, undefined, { highWaterMark: 0 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const readPromise = reader.read();
  writer.write('a');
  return readPromise.then(({ value, done }) => {
    assert_false(done, 'not done');
    assert_equals(value, 'a', 'value should be "a"');
  });
}, 'calling pull() before the first write() with backpressure should work');

promise_test(() => {
  let reader;
  const ts = recordingTransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk);
      return reader.read();
    }
  }, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  reader = ts.readable.getReader();
  return writer.write('a');
}, 'transform() should be able to read the chunk it just enqueued');

promise_test(() => {
  let resolveTransform;
  const transformPromise = new Promise(resolve => {
    resolveTransform = resolve;
  });
  const ts = recordingTransformStream({
    transform() {
      return transformPromise;
    }
  }, undefined, new CountQueuingStrategy({ highWaterMark: Infinity }));
  const writer = ts.writable.getWriter();
  assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
  return delay(0).then(() => {
    writer.write('a');
    assert_array_equals(ts.events, ['transform', 'a']);
    assert_equals(writer.desiredSize, 0, 'desiredSize should be 0');
    return flushAsyncEvents();
  }).then(() => {
    assert_equals(writer.desiredSize, 0, 'desiredSize should still be 0');
    resolveTransform();
    return delay(0);
  }).then(() => {
    assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
  });
}, 'blocking transform() should cause backpressure');

promise_test(t => {
  const ts = new TransformStream();
  ts.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
}, 'writer.closed should resolve after readable is canceled during start');

promise_test(t => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
  });
}, 'writer.closed should resolve after readable is canceled with backpressure');

promise_test(t => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
  });
}, 'writer.closed should resolve after readable is canceled with no backpressure');

promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  return delay(0).then(() => {
    const writePromise = writer.write('a');
    ts.readable.cancel(error1);
    return writePromise;
  });
}, 'cancelling the readable should cause a pending write to resolve');

promise_test(t => {
  const rs = new ReadableStream();
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  ts.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
}, 'cancelling the readable side of a TransformStream should abort an empty pipe');

promise_test(t => {
  const rs = new ReadableStream();
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
  });
}, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup');

promise_test(t => {
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.enqueue('c');
    }
  });
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  // Allow data to flow into the pipe.
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
  });
}, 'cancelling the readable side of a TransformStream should abort a full pipe');