Skip to content

Commit 1716c74

Browse files
committed
matching: process more NATS messages while some still get processed 🐛
iter-tools' asyncBuffer() retains the input order, causing slow matchings to "block" later messages from getting processed, reducing the throughput significantly! async-iterator-concurrent-map's mapConcurrent() doesn't do this.
1 parent 74a0a41 commit 1716c74

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

lib/match.js

+10-13
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import {
77
} from 'prom-client'
88
import {
99
asyncConsume,
10-
execPipe,
11-
asyncMap,
12-
asyncBuffer,
1310
} from 'iter-tools'
11+
import {
12+
mapConcurrent as mapConcurrently,
13+
} from 'async-iterator-concurrent-map'
1414
import {createLogger} from './logger.js'
1515
import {register} from './metrics.js'
1616
import {
@@ -368,16 +368,13 @@ const runGtfsMatching = async (cfg, opt = {}) => {
368368
}
369369

370370
const istFahrtsSub = await istFahrtsConsumer.consume()
371-
execPipe(
372-
istFahrtsSub,
373-
374-
// asyncBuffer workaround
375-
// see also https://github.com/iter-tools/iter-tools/issues/425#issuecomment-882875848
376-
asyncMap(msg => [processAusIstFahrtMsg(msg)]),
377-
asyncBuffer(matchConcurrency),
378-
asyncMap(([task]) => task),
379-
380-
asyncConsume,
371+
// We're not interested in the values, processAusIstFahrtMsg() publishes by itself.
372+
asyncConsume(
373+
mapConcurrently(
374+
istFahrtsSub[Symbol.asyncIterator](),
375+
matchConcurrency,
376+
processAusIstFahrtMsg,
377+
),
381378
).catch(abortWithError)
382379

383380
// todo: support IstUmlauf – would require adapting vdv-453-nats-adapter

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"node": ">=22"
2121
},
2222
"dependencies": {
23+
"async-iterator-concurrent-map": "^1.0.2",
2324
"ioredis": "^5.4.1",
2425
"iter-tools": "^7.5.3",
2526
"lodash": "^4.17.21",

0 commit comments

Comments
 (0)