Source code

Revision control

Copy as Markdown

Other Tools

Test Info: Warnings

promise_test(async (t) => {
const source = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
const results = [];
const completion = source.forEach((value) => {
results.push(value);
});
assert_array_equals(results, [1, 2, 3]);
await completion;
}, "forEach(): Visitor callback called synchronously for each value");
promise_test(async (t) => {
const error = new Error("error");
const source = new Observable((subscriber) => {
throw error;
});
try {
await source.forEach(() => {
assert_unreached("Visitor callback is not invoked when Observable errors");
});
assert_unreached("forEach() promise does not resolve when Observable errors");
} catch (e) {
assert_equals(e, error);
}
}, "Errors thrown by Observable reject the returned promise");
promise_test(async (t) => {
const error = new Error("error");
const source = new Observable((subscriber) => {
subscriber.error(error);
});
try {
await source.forEach(() => {
assert_unreached("Visitor callback is not invoked when Observable errors");
});
assert_unreached("forEach() promise does not resolve when Observable errors");
} catch (reason) {
assert_equals(reason, error);
}
}, "Errors pushed by Observable reject the returned promise");
promise_test(async (t) => {
// This will be assigned when `source`'s teardown is called during
// unsubscription.
let abortReason = null;
const error = new Error("error");
const source = new Observable((subscriber) => {
// Should be called from within the second `next()` call below, when the
// `forEach()` visitor callback throws an error, because that triggers
// unsubscription from `source`.
subscriber.addTeardown(() => abortReason = subscriber.signal.reason);
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
const results = [];
const completion = source.forEach((value) => {
results.push(value);
if (value === 2) {
throw error;
}
});
assert_array_equals(results, [1, 2]);
assert_equals(abortReason, error,
"forEach() visitor callback throwing an error triggers unsubscription " +
"from the source observable, with the correct abort reason");
try {
await completion;
assert_unreached("forEach() promise does not resolve when visitor throws");
} catch (e) {
assert_equals(e, error);
}
}, "Errors thrown in the visitor callback reject the promise and " +
"unsubscribe from the source");
// See https://github.com/WICG/observable/issues/96 for discussion about the
// timing of Observable AbortSignal `abort` firing and promise rejection.
promise_test(async t => {
const error = new Error('custom error');
let rejectionError = null;
let outerAbortEventMicrotaskRun = false,
forEachPromiseRejectionMicrotaskRun = false,
innerAbortEventMicrotaskRun = false;
const source = new Observable(subscriber => {
subscriber.signal.addEventListener('abort', () => {
queueMicrotask(() => {
assert_true(outerAbortEventMicrotaskRun,
"Inner abort: outer abort microtask has fired");
assert_true(forEachPromiseRejectionMicrotaskRun,
"Inner abort: forEach rejection microtask has fired");
assert_false(innerAbortEventMicrotaskRun,
"Inner abort: inner abort microtask has not fired");
innerAbortEventMicrotaskRun = true;
});
});
});
const controller = new AbortController();
controller.signal.addEventListener('abort', () => {
queueMicrotask(() => {
assert_false(outerAbortEventMicrotaskRun,
"Outer abort: outer abort microtask has not fired");
assert_false(forEachPromiseRejectionMicrotaskRun,
"Outer abort: forEach rejection microtask has not fired");
assert_false(innerAbortEventMicrotaskRun,
"Outer abort: inner abort microtask has not fired");
outerAbortEventMicrotaskRun = true;
});
});
const promise = source.forEach(() => {}, {signal: controller.signal}).catch(e => {
rejectionError = e;
assert_true(outerAbortEventMicrotaskRun,
"Promise rejection: outer abort microtask has fired");
assert_false(forEachPromiseRejectionMicrotaskRun,
"Promise rejection: forEach rejection microtask has not fired");
assert_false(innerAbortEventMicrotaskRun,
"Promise rejection: inner abort microtask has not fired");
forEachPromiseRejectionMicrotaskRun = true;
});
// This should trigger the following, in this order:
// 1. Fire the `abort` event at the outer AbortSignal, whose handler
// manually queues a microtask.
// 2. Calls "signal abort" on the outer signal's dependent signals. This
// queues a microtask to reject the `forEach()` promise.
// 3. Fire the `abort` event at the inner AbortSignal, whose handler
// manually queues a microtask.
controller.abort(error);
// After a single task, assert that everything has happened correctly (and
// incrementally in the right order);
await new Promise(resolve => {
t.step_timeout(resolve);
});
assert_true(outerAbortEventMicrotaskRun,
"Final: outer abort microtask has fired");
assert_true(forEachPromiseRejectionMicrotaskRun,
"Final: forEach rejection microtask has fired");
assert_true(innerAbortEventMicrotaskRun,
"Final: inner abort microtask has fired");
assert_equals(rejectionError, error, "Promise is rejected with the right " +
"value");
}, "forEach visitor callback rejection microtask ordering");
promise_test(async (t) => {
const source = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
const results = [];
const completion = source.forEach((value) => {
results.push(value);
});
assert_array_equals(results, [1, 2, 3]);
const completionValue = await completion;
assert_equals(completionValue, undefined, "Promise resolves with undefined");
}, "forEach() promise resolves with undefined");