@@ -39,6 +39,8 @@ const isProgrammerError = (err) => {
39
39
return PROGRAMMER_ERRORS . some ( Err => err instanceof Err )
40
40
}
41
41
42
+ const KIND_ISTFAHRT = 'istfahrt'
43
+
42
44
// todo: DRY with OpenDataVBB/nats-consuming-gtfs-rt-server
43
45
const NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME = `AUS_ISTFAHRT_${ MAJOR_VERSION } `
44
46
const NATS_JETSTREAM_GTFSRT_STREAM_NAME = `GTFS_RT_${ MAJOR_VERSION } `
@@ -180,77 +182,31 @@ const runGtfsMatching = async (cfg, opt = {}) => {
180
182
logger,
181
183
} )
182
184
183
- const matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate = async ( vdvAusIstFahrt , msg ) => {
184
- try {
185
- const {
186
- item : gtfsRtTripUpdate ,
187
- isMatched,
188
- isCached,
189
- matchingTime,
190
- } = await matchVdvAusIstFahrtWithGtfs ( vdvAusIstFahrt )
191
-
192
- if ( isMatched || publishUnmatchedTripUpdates ) {
193
- const topic = getNatsTopicFromGtfsRtTripUpdate ( gtfsRtTripUpdate )
194
- const tPublished = Date . now ( )
185
+ const publishGtfsRtTripUpdateToNats = ( gtfsRtTripUpdate , logCtx ) => {
186
+ const topic = getNatsTopicFromGtfsRtTripUpdate ( gtfsRtTripUpdate )
187
+ const tPublished = Date . now ( )
195
188
196
- logger . trace ( {
197
- topic,
198
- isMatched,
199
- isCached,
200
- matchingTime,
201
- gtfsRtTripUpdate,
202
- // todo: log just a slice?
203
- vdvAusIstFahrt,
204
- natsMsgSeq : msg . seq ,
205
- } , 'publishing GTFS-RT TripUpdate' )
206
- natsClient . publish ( topic , natsJson . encode ( gtfsRtTripUpdate ) )
207
-
208
- // update NATS metrics
209
- {
210
- // We slice() to keep the cardinality low in case of a bug.
211
- const topic_root = ( topic . split ( '.' ) [ 0 ] || '' ) . slice ( 0 , 7 )
212
- natsNrOfMessagesSentTotal . inc ( {
213
- topic_root,
214
- } )
215
- natsLatestMessageSentTimestampSeconds . set ( {
216
- topic_root,
217
- } , tPublished / 1000 )
218
- }
219
- }
189
+ logger . trace ( {
190
+ ...logCtx ,
191
+ topic,
192
+ gtfsRtTripUpdate,
193
+ } , 'publishing GTFS-RT TripUpdate' )
194
+ natsClient . publish ( topic , natsJson . encode ( gtfsRtTripUpdate ) )
220
195
221
- if ( isMatched ) {
222
- successesTotal . inc ( {
223
- cached : isCached ? '1' : '0' ,
224
- } )
225
- } else {
226
- failuresTotal . inc ( {
227
- cached : isCached ? '1' : '0' ,
228
- } )
229
- }
230
- matchingTimeSeconds . observe ( {
231
- matched : isMatched ? '1' : '0' ,
232
- cached : isCached ? '1' : '0' ,
233
- } , matchingTime / 1000 )
234
-
235
- if ( ! isMatched ) {
236
- // > Indicate to the JetStream server that processing of the message failed and that the message should not be sent to the consumer again.
237
- // https://nats-io.github.io/nats.js/jetstream/interfaces/JsMsg.html#term
238
- msg . term ( )
239
- }
240
- } catch ( err ) {
241
- if ( isProgrammerError ( err ) ) {
242
- throw err
243
- }
244
- logger . warn ( {
245
- err,
246
- vdvAusIstFahrt,
247
- natsMsgSeq : msg . seq ,
248
- } , `failed to match trip: ${ err . message || ( err + '' ) } ` )
249
- errorsTotal . inc ( )
196
+ // update NATS metrics
197
+ {
198
+ // We slice() to keep the cardinality low in case of a bug.
199
+ const topic_root = ( topic . split ( '.' ) [ 0 ] || '' ) . slice ( 0 , 7 )
200
+ natsNrOfMessagesSentTotal . inc ( {
201
+ topic_root,
202
+ } )
203
+ natsLatestMessageSentTimestampSeconds . set ( {
204
+ topic_root,
205
+ } , tPublished / 1000 )
250
206
}
251
207
}
252
208
253
- const processAusIstFahrtMsg = async ( msg ) => {
209
+ const processVdvNatsMsg = async ( msg , kind , kindTitle , processVdvItem ) => {
254
210
const tReceived = Date . now ( )
255
211
const {
256
212
subject,
@@ -267,7 +223,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
267
223
streamSeq,
268
224
redelivered,
269
225
dataSlice : data . slice ( 0 , 100 ) . toString ( 'utf8' ) ,
270
- } , ' processing AUS IstFahrt msg' )
226
+ } , ` processing ${ kindTitle } msg` )
271
227
272
228
// > Indicate to the JetStream server that processing of the message is on going, and that the ack wait timer for the message should be reset preventing a redelivery.
273
229
// https://nats-io.github.io/nats.js/jetstream/interfaces/JsMsg.html#working
@@ -297,35 +253,67 @@ const runGtfsMatching = async (cfg, opt = {}) => {
297
253
natsMsgSeq . set ( seq )
298
254
}
299
255
300
- let ausIstFahrt = null
256
+ let vdvItem = null
301
257
try {
302
- ausIstFahrt = msg . json ( data )
258
+ vdvItem = msg . json ( data )
303
259
} catch ( err ) {
304
260
serviceLogger . warn ( {
305
261
err,
306
262
subject,
307
263
seq,
308
- } , ' failure decoding AUS IstFahrt msg' )
264
+ } , ` failure decoding ${ kindTitle } msg` )
309
265
// We don't nak() here because we don't want it to be redelivered, the message is invalid anyways.
310
266
return ;
311
267
}
312
268
313
269
try {
314
- // todo: validate against schema, error-log and abort if invalid
315
- await matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate ( ausIstFahrt , msg )
270
+ const {
271
+ isMatched,
272
+ isCached,
273
+ matchingTime,
274
+ } = await processVdvItem ( vdvItem , msg )
275
+
276
+ if ( isMatched ) {
277
+ successesTotal . inc ( {
278
+ cached : isCached ? '1' : '0' ,
279
+ } )
280
+ } else {
281
+ failuresTotal . inc ( {
282
+ cached : isCached ? '1' : '0' ,
283
+ } )
284
+ }
285
+ matchingTimeSeconds . observe ( {
286
+ matched : isMatched ? '1' : '0' ,
287
+ cached : isCached ? '1' : '0' ,
288
+ } , matchingTime / 1000 )
316
289
317
290
serviceLogger . trace ( {
318
291
subject,
319
292
seq,
320
- } , 'successfully processed AUS IstFahrt msg' )
321
- msg . ack ( )
293
+ } , `successfully processed ${ kindTitle } msg` )
294
+ if ( isMatched ) {
295
+ msg . ack ( )
296
+ } else {
297
+ // > Indicate to the JetStream server that processing of the message failed and that the message should not be sent to the consumer again.
298
+ // https://nats-io.github.io/nats.js/jetstream/interfaces/JsMsg.html#term
299
+ msg . term ( )
300
+ }
322
301
} catch ( err ) {
302
+ if ( ! isProgrammerError ( err ) ) {
303
+ logger . warn ( {
304
+ err,
305
+ [ kind ] : vdvItem ,
306
+ natsMsgSeq : msg . seq ,
307
+ } , `failed to match trip: ${ err . message || ( err + '' ) } ` )
308
+ errorsTotal . inc ( )
309
+ }
310
+
323
311
// We catch all non-programmer errors in order not to abort the message processing (see below).
324
312
serviceLogger . warn ( {
325
313
err,
326
314
subject,
327
315
seq,
328
- } , ' failure processing AUS IstFahrt msg' )
316
+ } , ` failure processing ${ kindTitle } msg` )
329
317
// Explicitly signal to NATS JetStream that this message could not be processed.
330
318
msg . nak ( )
331
319
if ( isProgrammerError ( err ) ) {
@@ -334,6 +322,36 @@ const runGtfsMatching = async (cfg, opt = {}) => {
334
322
}
335
323
}
336
324
325
+ const matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate = async ( vdvAusIstFahrt , msg ) => {
326
+ // todo: validate against schema, error-log and abort if invalid
327
+ const {
328
+ item : gtfsRtTripUpdate ,
329
+ isMatched,
330
+ isCached,
331
+ matchingTime,
332
+ } = await matchVdvAusIstFahrtWithGtfs ( vdvAusIstFahrt )
333
+
334
+ if ( isMatched || publishUnmatchedTripUpdates ) {
335
+ publishGtfsRtTripUpdateToNats ( gtfsRtTripUpdate , {
336
+ isMatched,
337
+ isCached,
338
+ matchingTime,
339
+ // todo: log just a slice?
340
+ vdvAusIstFahrt,
341
+ natsMsgSeq : msg . seq ,
342
+ } )
343
+ }
344
+ }
345
+
346
+ const processAusIstFahrtMsg = async ( msg ) => {
347
+ await processVdvNatsMsg (
348
+ msg ,
349
+ KIND_ISTFAHRT ,
350
+ 'AUS IstFahrt' ,
351
+ matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate ,
352
+ )
353
+ }
354
+
337
355
{
338
356
{
339
357
// query details of the NATS JetStream stream for AUS IstFahrts
0 commit comments