Skip to content

Commit 9c01908

Browse files
committed
implement mapAsyncIterator with repeaters
Implementation adapted from: repeaterjs/repeater#48 (comment) so that all payloads will be delivered in the original order
1 parent 041479e commit 9c01908

File tree

2 files changed

+58
-45
lines changed

2 files changed

+58
-45
lines changed

packages/utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
},
2727
"dependencies": {
2828
"@ardatan/aggregate-error": "0.0.6",
29+
"@repeaterjs/repeater": "^3.0.4",
2930
"camel-case": "4.1.2",
3031
"tslib": "~2.2.0"
3132
},
Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,71 @@
11
/**
2-
* Given an AsyncIterable and a callback function, return an AsyncIterator
2+
* Given an AsyncIterator and a callback function, return an AsyncIterator
33
* which produces values mapped via calling the callback function.
4+
*
5+
* Implementation adapted from:
6+
* https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039
7+
* so that all payloads will be delivered in the original order
48
*/
9+
10+
import { Push, Stop, Repeater } from '@repeaterjs/repeater';
11+
512
export function mapAsyncIterator<T, U>(
613
iterator: AsyncIterator<T>,
7-
callback: (value: T) => Promise<U> | U,
8-
rejectCallback?: any
14+
mapValue: (value: T) => Promise<U> | U,
915
): AsyncIterableIterator<U> {
10-
let $return: any;
11-
let abruptClose: any;
12-
13-
if (typeof iterator.return === 'function') {
14-
$return = iterator.return;
15-
abruptClose = (error: any) => {
16-
const rethrow = () => Promise.reject(error);
17-
return $return.call(iterator).then(rethrow, rethrow);
18-
};
19-
}
16+
const returner = iterator.return?.bind(iterator) ?? (() => {});
2017

21-
function mapResult(result: any) {
22-
return result.done ? result : asyncMapValue(result.value, callback).then(iteratorResult, abruptClose);
23-
}
18+
return new Repeater(async (push, stop) => {
19+
let earlyReturn: any;
20+
stop.then(() => {
21+
earlyReturn = returner();
22+
});
2423

25-
let mapReject: any;
26-
if (rejectCallback) {
27-
// Capture rejectCallback to ensure it cannot be null.
28-
const reject = rejectCallback;
29-
mapReject = (error: any) => asyncMapValue(error, reject).then(iteratorResult, abruptClose);
30-
}
24+
await loop(push, stop, earlyReturn, iterator, mapValue);
3125

32-
return {
33-
next() {
34-
return iterator.next().then(mapResult, mapReject);
35-
},
36-
return() {
37-
return $return
38-
? $return.call(iterator).then(mapResult, mapReject)
39-
: Promise.resolve({ value: undefined, done: true });
40-
},
41-
throw(error: any) {
42-
if (typeof iterator.throw === 'function') {
43-
return iterator.throw(error).then(mapResult, mapReject);
44-
}
45-
return Promise.reject(error).catch(abruptClose);
46-
},
47-
[Symbol.asyncIterator]() {
48-
return this;
49-
},
50-
} as any;
26+
await earlyReturn;
27+
});
5128
}
5229

53-
function asyncMapValue<T, U>(value: T, callback: (value: T) => Promise<U> | U): Promise<U> {
54-
return new Promise(resolve => resolve(callback(value)));
30+
async function loop<T, U>(
31+
push: Push<U>,
32+
stop: Stop,
33+
earlyReturn: Promise<any> | any,
34+
iterator: AsyncIterator<T>,
35+
mapValue: (value: T) => Promise<U> | U,
36+
): Promise<void> {
37+
/* eslint-disable no-unmodified-loop-condition */
38+
while (!earlyReturn) {
39+
const iteration = await next(iterator, mapValue);
40+
41+
if (iteration.done) {
42+
if (iteration.value !== undefined) {
43+
await push(iteration.value);
44+
}
45+
stop();
46+
return;
47+
}
48+
49+
await push(iteration.value);
50+
}
51+
/* eslint-enable no-unmodified-loop-condition */
5552
}
5653

57-
function iteratorResult<T>(value: T): IteratorResult<T> {
58-
return { value, done: false };
54+
async function next<T, U>(
55+
iterator: AsyncIterator<T>,
56+
mapValue: (value: T) => Promise<U> | U,
57+
): Promise<IteratorResult<U>> {
58+
const iterationCandidate = await iterator.next();
59+
60+
const value = iterationCandidate.value;
61+
if (value === undefined) {
62+
return iterationCandidate as IteratorResult<U>;
63+
}
64+
65+
const newValue = await mapValue(iterationCandidate.value);
66+
67+
return {
68+
...iterationCandidate,
69+
value: newValue,
70+
};
5971
}

0 commit comments

Comments
 (0)