Source code

Revision control

Copy as Markdown

Other Tools

Test Info: Warnings

test(() => {
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
let projectionCalls = 0;
const results = [];
const flattened = source.flatMap(value => {
projectionCalls++;
return new Observable((subscriber) => {
subscriber.next(value * 10);
subscriber.next(value * 100);
subscriber.complete();
});
});
assert_true(flattened instanceof Observable, "flatMap() returns an Observable");
assert_equals(projectionCalls, 0,
"Projection is not called until subscription starts");
flattened.subscribe({
next: v => results.push(v),
error: () => results.push("error"),
complete: () => results.push("complete"),
});
assert_equals(projectionCalls, 3,
"Mapper is called three times, once for each source Observable value");
assert_array_equals(results, [10, 100, 20, 200, 30, 300, "complete"],
"flatMap() results are correct");
}, "flatMap(): Flattens simple source Observable properly");
test(() => {
const error = new Error("error");
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(error);
subscriber.next(3);
});
const flattened = source.flatMap(value => {
return new Observable(subscriber => {
subscriber.next(value * 10);
subscriber.next(value * 100);
subscriber.complete();
});
});
const results = [];
flattened.subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push("complete"),
});
assert_array_equals(results, [10, 100, 20, 200, error],
"Source error is passed through to the flatMap() Observable");
}, "flatMap(): Returned Observable passes through source Observable errors");
test(() => {
const results = [];
const error = new Error("error");
const source = new Observable(subscriber => {
subscriber.next(1);
results.push(subscriber.active ? "active" : "inactive");
subscriber.next(2);
results.push(subscriber.active ? "active" : "inactive");
subscriber.next(3);
subscriber.complete();
});
const flattened = source.flatMap((value) => {
return new Observable((subscriber) => {
subscriber.next(value * 10);
subscriber.next(value * 100);
if (value === 2) {
subscriber.error(error);
} else {
subscriber.complete();
}
});
});
flattened.subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push("complete"),
});
assert_array_equals(results, [10, 100, "active", 20, 200, error, "inactive"],
"Inner subscription error gets surfaced");
}, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
"'inner' Observable emits an error");
test(() => {
const results = [];
const error = new Error("error");
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
results.push(subscriber.active ? "active" : "inactive");
subscriber.complete();
});
const flattened = source.flatMap(value => {
if (value === 3) {
throw error;
}
return new Observable(subscriber => {
subscriber.next(value * 10);
subscriber.next(value * 100);
subscriber.complete();
});
});
flattened.subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push("complete"),
});
assert_array_equals(results, [10, 100, 20, 200, error, "inactive"],
"Inner subscriber thrown error gets surfaced");
}, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
"'inner' Observable throws an error");
test(() => {
const source = createTestSubject();
const inner1 = createTestSubject();
const inner2 = createTestSubject();
const flattened = source.flatMap(value => {
if (value === 1) {
return inner1;
}
return inner2;
});
const results = [];
flattened.subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push("complete"),
});
assert_array_equals(results, []);
source.next(1);
assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");
source.next(2);
assert_equals(inner2.subscriberCount(), 0,
"inner2 is queued, not subscribed to until inner1 completes");
assert_array_equals(results, []);
inner1.next(100);
inner1.next(101);
assert_array_equals(results, [100, 101]);
inner1.complete();
assert_equals(inner1.subscriberCount(), 0,
"inner1 becomes inactive once it completes");
assert_equals(inner2.subscriberCount(), 1,
"inner2 gets un-queued and subscribed to once inner1 completes");
inner2.next(200);
inner2.next(201);
assert_array_equals(results, [100, 101, 200, 201]);
inner2.complete();
assert_equals(inner2.subscriberCount(), 0,
"inner2 becomes inactive once it completes");
assert_equals(source.subscriberCount(), 1,
"source is not unsubscribed from yet, since it has not completed");
assert_array_equals(results, [100, 101, 200, 201]);
source.complete();
assert_equals(source.subscriberCount(), 0,
"source unsubscribed from after it completes");
assert_array_equals(results, [100, 101, 200, 201, "complete"]);
}, "flatMap(): result Observable does not complete until source and inner " +
"Observables all complete");
test(() => {
const source = createTestSubject();
const inner1 = createTestSubject();
const inner2 = createTestSubject();
const flattened = source.flatMap(value => {
if (value === 1) {
return inner1;
}
return inner2;
});
const results = [];
flattened.subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push("complete"),
});
assert_array_equals(results, []);
source.next(1);
source.next(2);
assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");
assert_equals(inner2.subscriberCount(), 0,
"inner2 is queued, not subscribed to until inner1 completes");
assert_array_equals(results, []);
// Before `inner1` pushes any values, we first complete the source Observable.
// This will not fire completion of the Observable returned from `flatMap()`,
// because there are two values (corresponding to inner Observables) that are
// queued to the inner queue that need to be processed first. Once the last
// one of *those* completes (i.e., `inner2.complete()` further down), then the
// returned Observable can finally complete.
source.complete();
assert_equals(source.subscriberCount(), 0,
"source becomes inactive once it completes");
inner1.next(100);
inner1.next(101);
assert_array_equals(results, [100, 101]);
inner1.complete();
assert_array_equals(results, [100, 101],
"Outer completion not triggered after inner1 completes");
assert_equals(inner2.subscriberCount(), 1,
"inner2 gets un-queued and subscribed after inner1 completes");
inner2.next(200);
inner2.next(201);
assert_array_equals(results, [100, 101, 200, 201]);
inner2.complete();
assert_equals(inner2.subscriberCount(), 0,
"inner2 becomes inactive once it completes");
assert_array_equals(results, [100, 101, 200, 201, "complete"]);
}, "flatMap(): result Observable does not complete after source Observable " +
"completes while there are still queued inner Observables to process " +
"Observables all complete");
test(() => {
const source = createTestSubject();
const inner = createTestSubject();
const result = source.flatMap(() => inner);
const ac = new AbortController();
result.subscribe({}, { signal: ac.signal, });
source.next(1);
assert_equals(inner.subscriberCount(), 1,
"inner Observable subscribed to once source emits it");
ac.abort();
assert_equals(source.subscriberCount(), 0,
"source unsubscribed from, once outer signal is aborted");
assert_equals(inner.subscriberCount(), 0,
"inner Observable unsubscribed from once the outer Observable is " +
"subscribed from, as a result of the outer signal being aborted");
}, "flatMap(): source and inner active Observables are both unsubscribed " +
"from once the outer subscription signal is aborted");
// A helper function to create an Observable that can be externally controlled
// and examined for testing purposes.
function createTestSubject() {
const subscribers = new Set();
const subject = new Observable(subscriber => {
subscribers.add(subscriber);
subscriber.addTeardown(() => subscribers.delete(subscriber));
});
subject.next = value => {
for (const subscriber of Array.from(subscribers)) {
subscriber.next(value);
}
};
subject.error = error => {
for (const subscriber of Array.from(subscribers)) {
subscriber.error(error);
}
};
subject.complete = () => {
for (const subscriber of Array.from(subscribers)) {
subscriber.complete();
}
};
subject.subscriberCount = () => {
return subscribers.size;
};
return subject;
}