@@ -16,6 +16,9 @@ import {register} from './metrics.js'
16
16
import {
17
17
jsonCodec as natsJson ,
18
18
} from './nats.js'
19
+ import {
20
+ createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts ,
21
+ } from './merge-vdv-sollfahrts-istfahrts.js'
19
22
import {
20
23
createMatchWithGtfs ,
21
24
} from './raw-match.js'
@@ -39,12 +42,17 @@ const isProgrammerError = (err) => {
39
42
return PROGRAMMER_ERRORS . some ( Err => err instanceof Err )
40
43
}
41
44
45
+ const KIND_SOLLFAHRT = 'sollfahrt'
42
46
const KIND_ISTFAHRT = 'istfahrt'
43
47
44
48
// todo: DRY with OpenDataVBB/nats-consuming-gtfs-rt-server
49
+ const NATS_JETSTREAM_REF_AUS_SOLLFAHRT_STREAM_NAME = `REF_AUS_SOLLFAHRT_${ MAJOR_VERSION } `
45
50
const NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME = `AUS_ISTFAHRT_${ MAJOR_VERSION } `
46
51
const NATS_JETSTREAM_GTFSRT_STREAM_NAME = `GTFS_RT_${ MAJOR_VERSION } `
47
52
53
+ const vdvMergingLogger = createLogger ( 'vdv-merging' , {
54
+ level : ( process . env . LOG_LEVEL_VDV_MERGING || 'WARN' ) . toLowerCase ( ) ,
55
+ } )
48
56
const logger = createLogger ( 'matching' , {
49
57
level : ( process . env . LOG_LEVEL_MATCHING || 'warn' ) . toLowerCase ( ) ,
50
58
} )
@@ -184,13 +192,21 @@ const runGtfsMatching = async (cfg, opt = {}) => {
184
192
} )
185
193
const vdvFahrtHaltsTotal = new Summary ( {
186
194
name : 'vdv_fahrt_halts_total' ,
187
- help : 'number of IstHalts per AUS IstFahrt' ,
195
+ help : 'number of SollHalts/ IstHalts per REF-AUS SollFahrt/ AUS IstFahrt' ,
188
196
registers : [ register ] ,
189
197
labelNames : [
190
198
'kind' , // sollfahrt, istfahrt
191
199
] ,
192
200
} )
193
201
202
+ const {
203
+ storeRefAusSollFahrt : storeVdvRefAusSollFahrtForLaterMerging ,
204
+ storeAusIstFahrt : storeVdvAusIstFahrtForLaterMerging ,
205
+ mergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts,
206
+ } = await createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts ( {
207
+ logger : vdvMergingLogger ,
208
+ } )
209
+
194
210
const {
195
211
matchVdvAusIstFahrtWithGtfs,
196
212
stop : stopMatching ,
@@ -222,7 +238,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
222
238
}
223
239
}
224
240
225
- const processVdvNatsMsg = async ( msg , kind , kindTitle , haltsField ) => {
241
+ const processVdvNatsMsg = async ( msg , kind , kindTitle , haltsField , storeVdvFahrt ) => {
226
242
const tReceived = Date . now ( )
227
243
const {
228
244
subject,
@@ -293,20 +309,24 @@ const runGtfsMatching = async (cfg, opt = {}) => {
293
309
kind,
294
310
} , vdvFahrt [ haltsField ] . length )
295
311
312
+ await storeVdvFahrt ( vdvFahrt )
313
+ const mergedVdvFahrt = await mergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts ( vdvFahrt )
314
+ // todo: trace-log?
315
+
296
316
const {
297
317
item : gtfsRtTripUpdate ,
298
318
isMatched,
299
319
isCached,
300
320
matchingTime,
301
- } = await matchVdvAusIstFahrtWithGtfs ( vdvFahrt )
321
+ } = await matchVdvAusIstFahrtWithGtfs ( mergedVdvFahrt )
302
322
303
323
if ( isMatched || publishUnmatchedTripUpdates ) {
304
324
publishGtfsRtTripUpdateToNats ( gtfsRtTripUpdate , {
305
325
isMatched,
306
326
isCached,
307
327
matchingTime,
308
328
// todo: log just a slice?
309
- [ kind ] : vdvFahrt ,
329
+ [ kind ] : mergedVdvFahrt ,
310
330
natsMsgSeq : msg . seq ,
311
331
} )
312
332
}
@@ -365,14 +385,70 @@ const runGtfsMatching = async (cfg, opt = {}) => {
365
385
}
366
386
}
367
387
388
+ const processRefAusSollFahrtMsg = async ( msg ) => {
389
+ await processVdvNatsMsg (
390
+ msg ,
391
+ KIND_SOLLFAHRT ,
392
+ 'REF-AUS SollFahrt' ,
393
+ 'SollHalts' ,
394
+ storeVdvRefAusSollFahrtForLaterMerging ,
395
+ )
396
+ }
397
+
398
+ // subscribe to REF-AUS SollFahrt messages
399
+ {
400
+ {
401
+ // query details of the NATS JetStream stream for AUS IstFahrts
402
+ const stream = await natsJetstreamClient . streams . get ( NATS_JETSTREAM_REF_AUS_SOLLFAHRT_STREAM_NAME )
403
+ const streamInfo = await stream . info ( )
404
+ serviceLogger . debug ( {
405
+ streamInfo,
406
+ } , 'using NATS JetStream stream for REF-AUS SollFahrts' )
407
+ // todo: assert some properties?
408
+ // strictEqual(streamInfo.config.discard, 'old', `NATS JetStream's discard must be "old"`)
409
+ }
410
+
411
+ const sollFahrtsConsumer = await natsJetstreamClient . consumers . get (
412
+ NATS_JETSTREAM_REF_AUS_SOLLFAHRT_STREAM_NAME ,
413
+ natsConsumerName ,
414
+ )
415
+
416
+ {
417
+ // query details of the (externally created) NATS JetStream consumer
418
+ const consumerInfo = await sollFahrtsConsumer . info ( )
419
+ serviceLogger . debug ( {
420
+ stream : NATS_JETSTREAM_REF_AUS_SOLLFAHRT_STREAM_NAME ,
421
+ consumerInfo,
422
+ } , 'using NATS JetStream consumer for REF-AUS SollFahrts' )
423
+ // todo: assert some properties?
424
+ // strictEqual(consumerInfo.config.deliver_policy, 'new', `REF-AUS JetStream consumer's deliver_policy must be "new"`)
425
+ // strictEqual(consumerInfo.config.ack_policy, 'explicit', `REF-AUS JetStream consumer's ack_policy must be "explicit"`)
426
+ }
427
+
428
+ const sollFahrtsSub = await sollFahrtsConsumer . consume ( )
429
+ // We're not interested in the values, processRefAusSollFahrtMsg() publishes by itself.
430
+ asyncConsume (
431
+ mapConcurrently (
432
+ sollFahrtsSub [ Symbol . asyncIterator ] ( ) ,
433
+ matchConcurrency ,
434
+ processRefAusSollFahrtMsg ,
435
+ ) ,
436
+ ) . catch ( abortWithError )
437
+
438
+ // todo: support SollUmlauf β would require adapting vdv-453-nats-adapter
439
+ }
440
+
368
441
const processAusIstFahrtMsg = async ( msg ) => {
369
442
await processVdvNatsMsg (
370
443
msg ,
371
444
KIND_ISTFAHRT ,
372
445
'AUS IstFahrt' ,
446
+ 'IstHalts' ,
447
+ storeVdvAusIstFahrtForLaterMerging ,
373
448
)
374
449
}
375
450
451
+ // subscribe to AUS IstFahrt messages
376
452
{
377
453
{
378
454
// query details of the NATS JetStream stream for AUS IstFahrts
0 commit comments