@@ -141,13 +141,17 @@ const runGtfsMatching = async (cfg, opt = {}) => {
141
141
name : 'nats_msg_seq' ,
142
142
help : 'sequence number of the latest NATS message being processed' ,
143
143
registers : [ register ] ,
144
+ labelNames : [
145
+ 'topic_root' , // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
146
+ ] ,
144
147
} )
145
148
146
149
const successesTotal = new Counter ( {
147
150
name : 'matching_successes_total' ,
148
151
help : 'number of successfully matched movements/trips' ,
149
152
registers : [ register ] ,
150
153
labelNames : [
154
+ 'kind' , // sollfahrt, istfahrt
151
155
'cached' ,
152
156
] ,
153
157
} )
@@ -156,20 +160,24 @@ const runGtfsMatching = async (cfg, opt = {}) => {
156
160
help : 'number of matching failures' ,
157
161
registers : [ register ] ,
158
162
labelNames : [
163
+ 'kind' , // sollfahrt, istfahrt
159
164
'cached' ,
160
165
] ,
161
166
} )
162
167
const errorsTotal = new Counter ( {
163
168
name : 'matching_errors_total' ,
164
169
help : 'number of errors that have occured while matching' ,
165
170
registers : [ register ] ,
166
- labelNames : [ ] ,
171
+ labelNames : [
172
+ 'kind' , // sollfahrt, istfahrt
173
+ ] ,
167
174
} )
168
175
const matchingTimeSeconds = new Summary ( {
169
176
name : 'matching_time_seconds' ,
170
177
help : 'seconds trips need to be matched' ,
171
178
registers : [ register ] ,
172
179
labelNames : [
180
+ 'kind' , // sollfahrt, istfahrt
173
181
'matched' ,
174
182
'cached' ,
175
183
] ,
@@ -250,7 +258,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
250
258
topic_root,
251
259
redelivered,
252
260
} , tReceived / 1000 )
253
- natsMsgSeq . set ( seq )
261
+ natsMsgSeq . set ( seq , { topic_root } )
254
262
}
255
263
256
264
let vdvItem = null
@@ -275,14 +283,17 @@ const runGtfsMatching = async (cfg, opt = {}) => {
275
283
276
284
if ( isMatched ) {
277
285
successesTotal . inc ( {
286
+ kind,
278
287
cached : isCached ? '1' : '0' ,
279
288
} )
280
289
} else {
281
290
failuresTotal . inc ( {
291
+ kind,
282
292
cached : isCached ? '1' : '0' ,
283
293
} )
284
294
}
285
295
matchingTimeSeconds . observe ( {
296
+ kind,
286
297
matched : isMatched ? '1' : '0' ,
287
298
cached : isCached ? '1' : '0' ,
288
299
} , matchingTime / 1000 )
@@ -305,7 +316,9 @@ const runGtfsMatching = async (cfg, opt = {}) => {
305
316
[ kind ] : vdvItem ,
306
317
natsMsgSeq : msg . seq ,
307
318
} , `failed to match trip: ${ err . message || ( err + '' ) } ` )
308
- errorsTotal . inc ( )
319
+ errorsTotal . inc ( {
320
+ kind,
321
+ } )
309
322
}
310
323
311
324
// We catch all non-programmer errors in order not to abort the message processing (see below).
@@ -371,8 +384,9 @@ const runGtfsMatching = async (cfg, opt = {}) => {
371
384
// query details of the (externally created) NATS JetStream consumer
372
385
const consumerInfo = await istFahrtsConsumer . info ( )
373
386
serviceLogger . debug ( {
387
+ stream : NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME ,
374
388
consumerInfo,
375
- } , 'using NATS JetStream consumer' )
389
+ } , 'using NATS JetStream consumer for AUS IstFahrts ' )
376
390
}
377
391
378
392
const istFahrtsSub = await istFahrtsConsumer . consume ( )
0 commit comments