chromium/third_party/blink/web_tests/external/wpt/dom/observable/tentative/observable-takeUntil.any.js

// Because we test that the global error handler is called at various times.
setup({allow_uncaught_exception: true});

promise_test(async () => {
  const source = new Observable(subscriber => {
    let i = 0;
    const interval = setInterval(() => {
      if (i < 5) {
        subscriber.next(++i);
      } else {
        subscriber.complete();
        clearInterval(interval);
      }
    }, 0);
  });

  const result = await source.takeUntil(new Observable(() => {})).toArray();
  assert_array_equals(result, [1, 2, 3, 4, 5]);
}, "takeUntil subscribes to source Observable and mirrors it uninterrupted");

promise_test(async () => {
  const source = new Observable(() => {});
  let notifierSubscribedTo = false;
  const notifier = new Observable(() => notifierSubscribedTo = true);

  source.takeUntil(notifier).subscribe();
  assert_true(notifierSubscribedTo);
}, "takeUntil subscribes to notifier");

// This test is important because ordinarily, calling `subscriber.next()` does
// not cancel a subscription associated with `subscriber`. However, for the
// `takeUntil()` operator, the spec responds to `notifier`'s `next()` by
// unsubscribing from `notifier`, which is what this test asserts.
promise_test(async () => {
  const results = [];
  const source = new Observable(subscriber => {
    results.push('source subscribe callback');
    subscriber.addTeardown(() => results.push('source teardown'));
  });

  const notifier = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('notifier teardown'));

    results.push('notifier subscribe callback');
    // Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
    results.push(`notifer active before next(): ${subscriber.active}`);
    subscriber.next('value');
    results.push(`notifer active after next(): ${subscriber.active}`);
  });

  source.takeUntil(notifier).subscribe({
    next: () => results.push('takeUntil() next callback'),
    error: e => results.push(`takeUntil() error callback: ${error}`),
    complete: () => results.push('takeUntil() complete callback'),
  });

  assert_array_equals(results, [
    'notifier subscribe callback',
    'notifer active before next(): true',
    'notifier teardown',
    'takeUntil() complete callback',
    'notifer active after next(): false',
  ]);
}, "takeUntil: notifier next() unsubscribes from notifier");
// This test is identical to the one above, with the exception being that the
// `notifier` calls `subscriber.error()` instead `subscriber.next()`.
promise_test(async () => {
  const results = [];
  const source = new Observable(subscriber => {
    results.push('source subscribe callback');
    subscriber.addTeardown(() => results.push('source teardown'));
  });

  const notifier = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('notifier teardown'));

    results.push('notifier subscribe callback');
    // Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
    results.push(`notifer active before error(): ${subscriber.active}`);
    subscriber.error('error');
    results.push(`notifer active after error(): ${subscriber.active}`);
  });

  source.takeUntil(notifier).subscribe({
    next: () => results.push('takeUntil() next callback'),
    error: e => results.push(`takeUntil() error callback: ${error}`),
    complete: () => results.push('takeUntil() complete callback'),
  });

  assert_array_equals(results, [
    'notifier subscribe callback',
    'notifer active before error(): true',
    'notifier teardown',
    'takeUntil() complete callback',
    'notifer active after error(): false',
  ]);
}, "takeUntil: notifier error() unsubscribes from notifier");
// This test is identical to the above except it `throw`s instead of calling
// `Subscriber#error()`.
promise_test(async () => {
  const results = [];
  const source = new Observable(subscriber => {
    results.push('source subscribe callback');
    subscriber.addTeardown(() => results.push('source teardown'));
  });

  const notifier = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('notifier teardown'));

    results.push('notifier subscribe callback');
    // Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
    results.push(`notifer active before throw: ${subscriber.active}`);
    throw new Error('custom error');
    // Won't run:
    results.push(`notifer active after throw: ${subscriber.active}`);
  });

  source.takeUntil(notifier).subscribe({
    next: () => results.push('takeUntil() next callback'),
    error: e => results.push(`takeUntil() error callback: ${error}`),
    complete: () => results.push('takeUntil() complete callback'),
  });

  assert_array_equals(results, [
    'notifier subscribe callback',
    'notifer active before throw: true',
    'notifier teardown',
    'takeUntil() complete callback',
  ]);
}, "takeUntil: notifier throw Error unsubscribes from notifier");

// Test that `notifier` unsubscribes from source Observable.
promise_test(async t => {
  const results = [];

  const source = new Observable(subscriber => {
    results.push('source subscribed');
    subscriber.addTeardown(() => results.push('source teardown'));
    subscriber.signal.addEventListener('abort',
        e => results.push('source signal abort'));
  });

  let notifierTeardownCalled = false;
  const notifier = new Observable(subscriber => {
    results.push('notifier subscribed');
    subscriber.addTeardown(() => {
      results.push('notifier teardown');
      notifierTeardownCalled = true;
    });
    subscriber.signal.addEventListener('abort',
        e => results.push('notifier signal abort'));

    // Asynchronously shut everything down.
    t.step_timeout(() => subscriber.next('value'));
  });

  let nextOrErrorCalled = false;
  let notifierTeardownCalledBeforeCompleteCallback;
  await new Promise(resolve => {
    source.takeUntil(notifier).subscribe({
      next: () => {nextOrErrorCalled = true; results.push('next callback');},
      error: () => {nextOrErrorCalled = true; results.push('error callback');},
      complete: () => {
        results.push('complete callback');
        notifierTeardownCalledBeforeCompleteCallback = notifierTeardownCalled;
        resolve();
      },
    });
  });

  // The outer `Observer#complete()` callback is called before any teardowns are
  // invoked.
  assert_false(nextOrErrorCalled);
  // The notifier/source teardowns are not called by the time the outer
  // `Observer#complete()` callback is invoked, but they are all run *after*
  // (i.e., before `notifier`'s `subscriber.next()` returns internally).
  assert_true(notifierTeardownCalledBeforeCompleteCallback);
  assert_true(notifierTeardownCalled);
  assert_array_equals(results, [
    "notifier subscribed",
    "source subscribed",
    "notifier signal abort",
    "notifier teardown",
    "source signal abort",
    "source teardown",
    "complete callback",
  ]);
}, "takeUntil: notifier next() unsubscribes from notifier & source observable");

// This test is almost identical to the above test, however instead of the
// `notifier` Observable being the thing that causes the unsubscription from
// `notifier` and `source`, it is the outer composite Observable's
// `SubscribeOptions#signal` being aborted that does this.
promise_test(async t => {
  const results = [];
  // This will get populated later with a function that resolves a promise.
  let resolver;

  const source = new Observable(subscriber => {
    results.push('source subscribed');
    subscriber.addTeardown(() => results.push('source teardown'));
    subscriber.signal.addEventListener('abort', e => {
      results.push('source signal abort');
      // This should be the last thing run in the whole teardown sequence. After
      // this, we can resolve the promise that this test is waiting on, via
      // `resolver`. That'll wrap things up and move us on to the assertions.
      resolver();
    });
  });

  const notifier = new Observable(subscriber => {
    results.push('notifier subscribed');
    subscriber.addTeardown(() => {
      results.push('notifier teardown');
    });
    subscriber.signal.addEventListener('abort',
        e => results.push('notifier signal abort'));
  });

  let observerCallbackCalled = false;
  await new Promise(resolve => {
    resolver = resolve;
    const controller = new AbortController();
    source.takeUntil(notifier).subscribe({
      next: () => observerCallbackCalled = true,
      error: () => observerCallbackCalled = true,
      complete: () => observerCallbackCalled = true,
    }, {signal: controller.signal});

    // Asynchronously shut everything down.
    t.step_timeout(() => controller.abort());
  });

  assert_false(observerCallbackCalled);
  assert_array_equals(results, [
    "notifier subscribed",
    "source subscribed",
    "notifier signal abort",
    "notifier teardown",
    "source signal abort",
    "source teardown",
  ]);
}, "takeUntil()'s AbortSignal unsubscribes from notifier & source observable");

promise_test(async () => {
  let sourceSubscribedTo = false;
  const source = new Observable(subscriber => {
    sourceSubscribedTo = true;
  });

  const notifier = new Observable(subscriber => subscriber.next('value'));

  let nextOrErrorCalled = false;
  let completeCalled = false;
  const result = source.takeUntil(notifier).subscribe({
    next: v => nextOrErrorCalled = true,
    error: e => nextOrErrorCalled = true,
    complete: () => completeCalled = true,
  });

  assert_false(sourceSubscribedTo);
  assert_true(completeCalled);
  assert_false(nextOrErrorCalled);
}, "takeUntil: source never subscribed to when notifier synchronously emits a value");

promise_test(async () => {
  let sourceSubscribedTo = false;
  const source = new Observable(subscriber => {
    sourceSubscribedTo = true;
  });

  const notifier = new Observable(subscriber => subscriber.error('error'));

  let nextOrErrorCalled = false;
  let completeCalled = false;
  const result = source.takeUntil(notifier).subscribe({
    next: v => nextOrErrorCalled = true,
    error: e => nextOrErrorCalled = true,
    complete: () => completeCalled = true,
  });

  assert_false(sourceSubscribedTo);
  assert_true(completeCalled);
  assert_false(nextOrErrorCalled);
}, "takeUntil: source never subscribed to when notifier synchronously emits error");

promise_test(async () => {
  const source = new Observable(subscriber => {
    let i = 0;
    const interval = setInterval(() => {
      if (i < 5) {
        subscriber.next(++i);
      } else {
        subscriber.complete();
        clearInterval(interval);
      }
    }, 500);
  });

  const notifier = new Observable(subscriber => subscriber.complete());

  const result = await source.takeUntil(notifier).toArray();
  assert_array_equals(result, [1, 2, 3, 4, 5]);
}, "takeUntil: source is uninterrupted when notifier completes, even synchronously");

promise_test(async () => {
  const results = [];

  let sourceSubscriber;
  let notifierSubscriber;
  const source = new Observable(subscriber => sourceSubscriber = subscriber);
  const notifier = new Observable(subscriber => notifierSubscriber = subscriber);

  source.takeUntil(notifier).subscribe({
    next: v => results.push(v),
    complete: () => results.push("complete"),
  });

  sourceSubscriber.next(1);
  sourceSubscriber.next(2);
  notifierSubscriber.next('notifier value');
  sourceSubscriber.next(3);

  assert_array_equals(results, [1, 2, 'complete']);
}, "takeUntil() mirrors the source Observable until its first next() value");

promise_test(async t => {
  let errorReported = null;

  self.addEventListener("error", e => errorReported = e, { once: true });

  const source = new Observable(() => {});
  const notifier = new Observable(subscriber => {
    t.step_timeout(() => {
      subscriber.error('error 1');
      subscriber.error('error 2');
    });
  });

  let errorCallbackCalled = false;
  await new Promise(resolve => {
    source.takeUntil(notifier).subscribe({
      error: e => errorCallbackCalled = true,
      complete: () => resolve(),
    });
  });

  assert_false(errorCallbackCalled);
  assert_true(errorReported !== null, "Exception was reported to global");
  assert_equals(errorReported.message, "Uncaught error 2", "Error message matches");
  assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0");
  assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0");
  assert_equals(errorReported.error, 'error 2', "Error object is equivalent (just a string)");
}, "takeUntil: notifier calls `Subscriber#error()` twice; second goes to global error handler");