Skip to content

Commit 8a8b031

Browse files
committed
1 parent c7ca6c8 commit 8a8b031

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed

packages/pubsub/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
export * from './in-memory-channel';
22
export * from './in-memory-pubsub';
3-
3+
export * from './split';
44
export * from './types';

packages/pubsub/src/split.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// adapted from: https://stackoverflow.com/questions/63543455/how-to-multicast-an-async-iterable
2+
// and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d
3+
// and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039
4+
5+
import { Push, Repeater } from '@repeaterjs/repeater';
6+
7+
import { Splitter } from './types';
8+
9+
export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<IteratorResult<T>>) {
10+
const iterator = asyncIterable[Symbol.asyncIterator]();
11+
const returner = iterator.return ?? undefined;
12+
13+
const buffers: Array<Array<IteratorResult<T>>> = Array(n).fill([]);
14+
15+
if (returner) {
16+
const set: Set<number> = new Set();
17+
return buffers.map((buffer, index) => {
18+
set.add(index);
19+
return new Repeater(async (push, stop) => {
20+
let earlyReturn: any;
21+
stop.then(() => {
22+
set.delete(index);
23+
if (!set.size) {
24+
earlyReturn = returner();
25+
}
26+
});
27+
28+
await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);
29+
30+
await earlyReturn;
31+
});
32+
});
33+
}
34+
35+
return buffers.map(
36+
(buffer, index) =>
37+
new Repeater(async (push, stop) => {
38+
let earlyReturn: any;
39+
stop.then(() => {
40+
earlyReturn = returner ? returner() : true;
41+
});
42+
43+
await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);
44+
45+
await earlyReturn;
46+
})
47+
);
48+
}
49+
50+
async function loop<T>(
51+
push: Push<T>,
52+
earlyReturn: Promise<any> | any,
53+
buffer: Array<IteratorResult<T>>,
54+
index: number,
55+
buffers: Array<Array<IteratorResult<T>>>,
56+
iterator: AsyncIterator<T>,
57+
splitter: Splitter<IteratorResult<T>>
58+
): Promise<void> {
59+
/* eslint-disable no-unmodified-loop-condition */
60+
while (!earlyReturn) {
61+
const iteration = await next(buffer, index, buffers, iterator, splitter);
62+
63+
if (iteration === undefined) {
64+
continue;
65+
}
66+
67+
if (iteration.done) {
68+
stop();
69+
return iteration.value;
70+
}
71+
72+
await push(iteration.value);
73+
}
74+
/* eslint-enable no-unmodified-loop-condition */
75+
}
76+
77+
async function next<T>(
78+
buffer: Array<IteratorResult<T>>,
79+
index: number,
80+
buffers: Array<Array<IteratorResult<T>>>,
81+
iterator: AsyncIterator<T>,
82+
splitter: Splitter<IteratorResult<T>>
83+
): Promise<IteratorResult<T> | undefined> {
84+
let iteration: IteratorResult<T>;
85+
86+
if (0 in buffer) {
87+
return buffer.shift();
88+
}
89+
90+
const iterationCandidate = await iterator.next();
91+
92+
const value = iterationCandidate.value;
93+
if (value) {
94+
const [iterationIndex, newValue] = splitter(value);
95+
if (index === iterationIndex) {
96+
return newValue;
97+
}
98+
99+
buffers[iterationIndex].push(iteration);
100+
return undefined;
101+
}
102+
103+
for (const buffer of buffers) {
104+
buffer.push(iteration);
105+
}
106+
return iterationCandidate;
107+
}

packages/pubsub/src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ export interface PubSub<T> {
1212
subscribe(topic: string, buffer?: RepeaterBuffer): AsyncIterableIterator<T>;
1313
close(reason?: any): Promise<unknown> | unknown;
1414
}
15+
16+
export type Splitter<T> = (item: T) => [number, T];

0 commit comments

Comments
 (0)