-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathbuffercountortime.ts
63 lines (57 loc) · 1.96 KB
/
buffercountortime.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import { OperatorAsyncFunction } from '../../interfaces.js';
import { AsyncIterableX, interval, concat, of } from '../index.js';
import { map } from './map.js';
import { merge } from '../merge.js';
import { wrapWithAbort } from './withabort.js';
const timerEvent = {};
const ended = {};
class BufferCountOrTime<TSource> extends AsyncIterableX<TSource[]> {
constructor(
private readonly source: AsyncIterable<TSource>,
private readonly bufferSize: number,
private readonly maxWaitTime: number
) {
super();
}
async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffer: TSource[] = [];
const timer = interval(this.maxWaitTime, true).pipe(map(() => timerEvent));
const source = concat(this.source, of(ended));
const merged = merge(source, timer);
for await (const item of wrapWithAbort(merged, signal)) {
if (item === ended) {
break;
}
if (item !== timerEvent) {
buffer.push(item as TSource);
}
if (buffer.length >= this.bufferSize || (buffer.length && item === timerEvent)) {
yield buffer.slice();
buffer.length = 0;
}
}
if (buffer.length) {
yield buffer;
}
}
}
/**
* Projects each element of an async-iterable sequence into consecutive buffers
* which are emitted when either the threshold count or time is met.
*
* @template TSource The type of elements in the source sequence.
* @param {number} count The size of the buffer.
* @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer
* @returns {OperatorAsyncFunction<TSource, TSource[]>} An operator which returns an async-iterable sequence
* of buffers
*/
export function bufferCountOrTime<TSource>(
count: number,
time: number
): OperatorAsyncFunction<TSource, TSource[]> {
return function bufferOperatorFunction(
source: AsyncIterable<TSource>
): AsyncIterableX<TSource[]> {
return new BufferCountOrTime<TSource>(source, count, time);
};
}