Source code

Revision control

Copy as Markdown

Other Tools

Test Info: Warnings

// Because we test that the global error handler is called at various times.
setup({allow_uncaught_exception: true});
promise_test(async () => {
const source = new Observable(subscriber => {
let i = 0;
const interval = setInterval(() => {
if (i < 5) {
subscriber.next(++i);
} else {
subscriber.complete();
clearInterval(interval);
}
}, 0);
});
const result = await source.takeUntil(new Observable(() => {})).toArray();
assert_array_equals(result, [1, 2, 3, 4, 5]);
}, "takeUntil subscribes to source Observable and mirrors it uninterrupted");
promise_test(async () => {
const source = new Observable(() => {});
let notifierSubscribedTo = false;
const notifier = new Observable(() => notifierSubscribedTo = true);
source.takeUntil(notifier).subscribe();
assert_true(notifierSubscribedTo);
}, "takeUntil subscribes to notifier");
// This test is important because ordinarily, calling `subscriber.next()` does
// not cancel a subscription associated with `subscriber`. However, for the
// `takeUntil()` operator, the spec responds to `notifier`'s `next()` by
// unsubscribing from `notifier`, which is what this test asserts.
promise_test(async () => {
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});
const notifier = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('notifier teardown'));
results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
results.push(`notifer active before next(): ${subscriber.active}`);
subscriber.next('value');
results.push(`notifer active after next(): ${subscriber.active}`);
});
source.takeUntil(notifier).subscribe({
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before next(): true',
'notifier teardown',
'takeUntil() complete callback',
'notifer active after next(): false',
]);
}, "takeUntil: notifier next() unsubscribes from notifier");
// This test is identical to the one above, with the exception being that the
// `notifier` calls `subscriber.error()` instead `subscriber.next()`.
promise_test(async () => {
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});
const notifier = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('notifier teardown'));
results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
results.push(`notifer active before error(): ${subscriber.active}`);
subscriber.error('error');
results.push(`notifer active after error(): ${subscriber.active}`);
});
source.takeUntil(notifier).subscribe({
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before error(): true',
'notifier teardown',
'takeUntil() complete callback',
'notifer active after error(): false',
]);
}, "takeUntil: notifier error() unsubscribes from notifier");
// This test is identical to the above except it `throw`s instead of calling
// `Subscriber#error()`.
promise_test(async () => {
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});
const notifier = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('notifier teardown'));
results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
results.push(`notifer active before throw: ${subscriber.active}`);
throw new Error('custom error');
// Won't run:
results.push(`notifer active after throw: ${subscriber.active}`);
});
source.takeUntil(notifier).subscribe({
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before throw: true',
'notifier teardown',
'takeUntil() complete callback',
]);
}, "takeUntil: notifier throw Error unsubscribes from notifier");
// Test that `notifier` unsubscribes from source Observable.
promise_test(async t => {
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribed');
subscriber.addTeardown(() => results.push('source teardown'));
subscriber.signal.addEventListener('abort',
e => results.push('source signal abort'));
});
let notifierTeardownCalled = false;
const notifier = new Observable(subscriber => {
results.push('notifier subscribed');
subscriber.addTeardown(() => {
results.push('notifier teardown');
notifierTeardownCalled = true;
});
subscriber.signal.addEventListener('abort',
e => results.push('notifier signal abort'));
// Asynchronously shut everything down.
t.step_timeout(() => subscriber.next('value'));
});
let nextOrErrorCalled = false;
let notifierTeardownCalledBeforeCompleteCallback;
await new Promise(resolve => {
source.takeUntil(notifier).subscribe({
next: () => {nextOrErrorCalled = true; results.push('next callback');},
error: () => {nextOrErrorCalled = true; results.push('error callback');},
complete: () => {
results.push('complete callback');
notifierTeardownCalledBeforeCompleteCallback = notifierTeardownCalled;
resolve();
},
});
});
// The outer `Observer#complete()` callback is called before any teardowns are
// invoked.
assert_false(nextOrErrorCalled);
// The notifier/source teardowns are not called by the time the outer
// `Observer#complete()` callback is invoked, but they are all run *after*
// (i.e., before `notifier`'s `subscriber.next()` returns internally).
assert_true(notifierTeardownCalledBeforeCompleteCallback);
assert_true(notifierTeardownCalled);
assert_array_equals(results, [
"notifier subscribed",
"source subscribed",
"notifier signal abort",
"notifier teardown",
"source signal abort",
"source teardown",
"complete callback",
]);
}, "takeUntil: notifier next() unsubscribes from notifier & source observable");
// This test is almost identical to the above test, however instead of the
// `notifier` Observable being the thing that causes the unsubscription from
// `notifier` and `source`, it is the outer composite Observable's
// `SubscribeOptions#signal` being aborted that does this.
promise_test(async t => {
const results = [];
// This will get populated later with a function that resolves a promise.
let resolver;
const source = new Observable(subscriber => {
results.push('source subscribed');
subscriber.addTeardown(() => results.push('source teardown'));
subscriber.signal.addEventListener('abort', e => {
results.push('source signal abort');
// This should be the last thing run in the whole teardown sequence. After
// this, we can resolve the promise that this test is waiting on, via
// `resolver`. That'll wrap things up and move us on to the assertions.
resolver();
});
});
const notifier = new Observable(subscriber => {
results.push('notifier subscribed');
subscriber.addTeardown(() => {
results.push('notifier teardown');
});
subscriber.signal.addEventListener('abort',
e => results.push('notifier signal abort'));
});
let observerCallbackCalled = false;
await new Promise(resolve => {
resolver = resolve;
const controller = new AbortController();
source.takeUntil(notifier).subscribe({
next: () => observerCallbackCalled = true,
error: () => observerCallbackCalled = true,
complete: () => observerCallbackCalled = true,
}, {signal: controller.signal});
// Asynchronously shut everything down.
t.step_timeout(() => controller.abort());
});
assert_false(observerCallbackCalled);
assert_array_equals(results, [
"notifier subscribed",
"source subscribed",
"notifier signal abort",
"notifier teardown",
"source signal abort",
"source teardown",
]);
}, "takeUntil()'s AbortSignal unsubscribes from notifier & source observable");
promise_test(async () => {
let sourceSubscribedTo = false;
const source = new Observable(subscriber => {
sourceSubscribedTo = true;
});
const notifier = new Observable(subscriber => subscriber.next('value'));
let nextOrErrorCalled = false;
let completeCalled = false;
const result = source.takeUntil(notifier).subscribe({
next: v => nextOrErrorCalled = true,
error: e => nextOrErrorCalled = true,
complete: () => completeCalled = true,
});
assert_false(sourceSubscribedTo);
assert_true(completeCalled);
assert_false(nextOrErrorCalled);
}, "takeUntil: source never subscribed to when notifier synchronously emits a value");
promise_test(async () => {
let sourceSubscribedTo = false;
const source = new Observable(subscriber => {
sourceSubscribedTo = true;
});
const notifier = new Observable(subscriber => subscriber.error('error'));
let nextOrErrorCalled = false;
let completeCalled = false;
const result = source.takeUntil(notifier).subscribe({
next: v => nextOrErrorCalled = true,
error: e => nextOrErrorCalled = true,
complete: () => completeCalled = true,
});
assert_false(sourceSubscribedTo);
assert_true(completeCalled);
assert_false(nextOrErrorCalled);
}, "takeUntil: source never subscribed to when notifier synchronously emits error");
promise_test(async () => {
const source = new Observable(subscriber => {
let i = 0;
const interval = setInterval(() => {
if (i < 5) {
subscriber.next(++i);
} else {
subscriber.complete();
clearInterval(interval);
}
}, 500);
});
const notifier = new Observable(subscriber => subscriber.complete());
const result = await source.takeUntil(notifier).toArray();
assert_array_equals(result, [1, 2, 3, 4, 5]);
}, "takeUntil: source is uninterrupted when notifier completes, even synchronously");
promise_test(async () => {
const results = [];
let sourceSubscriber;
let notifierSubscriber;
const source = new Observable(subscriber => sourceSubscriber = subscriber);
const notifier = new Observable(subscriber => notifierSubscriber = subscriber);
source.takeUntil(notifier).subscribe({
next: v => results.push(v),
complete: () => results.push("complete"),
});
sourceSubscriber.next(1);
sourceSubscriber.next(2);
notifierSubscriber.next('notifier value');
sourceSubscriber.next(3);
assert_array_equals(results, [1, 2, 'complete']);
}, "takeUntil() mirrors the source Observable until its first next() value");
promise_test(async t => {
let errorReported = null;
self.addEventListener("error", e => errorReported = e, { once: true });
const source = new Observable(() => {});
const notifier = new Observable(subscriber => {
t.step_timeout(() => {
subscriber.error('error 1');
subscriber.error('error 2');
});
});
let errorCallbackCalled = false;
await new Promise(resolve => {
source.takeUntil(notifier).subscribe({
error: e => errorCallbackCalled = true,
complete: () => resolve(),
});
});
assert_false(errorCallbackCalled);
assert_true(errorReported !== null, "Exception was reported to global");
assert_equals(errorReported.message, "Uncaught error 2", "Error message matches");
assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0");
assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0");
assert_equals(errorReported.error, 'error 2', "Error object is equivalent (just a string)");
}, "takeUntil: notifier calls `Subscriber#error()` twice; second goes to global error handler");