Skip to content

Commit eb1c170

Browse files
committed
matching: add more NATS metrics
- nats_nr_of_msgs_{received,sent}_total - nats_latest_msg_{received,sent}_timestamp_seconds
1 parent b46616c commit eb1c170

File tree

1 file changed

+83
-1
lines changed

1 file changed

+83
-1
lines changed

lib/match.js

+83-1
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,56 @@ const runGtfsMatching = async (cfg, opt = {}) => {
9191
}
9292
ok(Number.isInteger(natsAckWait), 'opt.natsAckWait must be an integer')
9393

94+
// NATS-related metrics
95+
// Note: We mirror OpenDataVBB/gtfs-rt-feed's metrics here.
96+
const natsNrOfMessagesReceivedTotal = new Counter({
97+
name: 'nats_nr_of_msgs_received_total',
98+
help: 'number of messages received from NATS',
99+
registers: [register],
100+
labelNames: [
101+
'stream', // name of the JetStream stream
102+
'consumer', // name of the JetStream consumer
103+
'topic_root', // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
104+
'redelivered', // 1/0
105+
],
106+
})
107+
const natsLatestMessageReceivedTimestampSeconds = new Gauge({
108+
name: 'nats_latest_msg_received_timestamp_seconds',
109+
help: 'when the latest message has been received from NATS',
110+
registers: [register],
111+
labelNames: [
112+
'stream', // name of the JetStream stream
113+
'consumer', // name of the JetStream consumer
114+
'topic_root', // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
115+
'redelivered', // 1/0
116+
],
117+
})
118+
// todo: track redeliveries as `Summary` using `msg.info.redeliveryCount`
119+
const natsNrOfMessagesSentTotal = new Counter({
120+
name: 'nats_nr_of_msgs_sent_total',
121+
help: 'number of messages sent to NATS',
122+
registers: [register],
123+
labelNames: [
124+
'topic_root', // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
125+
],
126+
})
127+
const natsLatestMessageSentTimestampSeconds = new Gauge({
128+
name: 'nats_latest_msg_sent_timestamp_seconds',
129+
help: 'when the latest message has been sent to NATS',
130+
registers: [register],
131+
labelNames: [
132+
'topic_root', // first "segment" of the topic, e.g. `AUS` with `aus.istfahrt.foo.bar`
133+
],
134+
})
135+
// NATS gives separate sequence numbers to both a) messages in a stream and b) messages as (re-)received by a consumer.
136+
// We currently use `msg.seq`. – todo: what is that?
137+
// todo [breaking]: change ot use the stream sequence or remove this metric, as it's misleading!
94138
const natsMsgSeq = new Gauge({
95139
name: 'nats_msg_seq',
96140
help: 'sequence number of the latest NATS message being processed',
97141
registers: [register],
98142
})
143+
99144
const successesTotal = new Counter({
100145
name: 'matching_successes_total',
101146
help: 'number of successfully matched movements/trips',
@@ -143,6 +188,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
143188
} = await matchVdvAusIstFahrtWithGtfs(vdvAusIstFahrt)
144189

145190
const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate)
191+
const tPublished = Date.now()
146192

147193
logger.trace({
148194
topic,
@@ -156,6 +202,18 @@ const runGtfsMatching = async (cfg, opt = {}) => {
156202
}, 'publishing GTFS-RT TripUpdate')
157203
natsClient.publish(topic, natsJson.encode(gtfsRtTripUpdate))
158204

205+
// update NATS metrics
206+
{
207+
// We slice() to keep the cardinality low in case of a bug.
208+
const topic_root = (topic.split('.')[0] || '').slice(0, 7)
209+
natsNrOfMessagesSentTotal.inc({
210+
topic_root,
211+
})
212+
natsLatestMessageSentTimestampSeconds.set({
213+
topic_root,
214+
}, tPublished / 1000)
215+
}
216+
159217
if (isMatched) {
160218
successesTotal.inc({
161219
cached: isCached ? '1' : '0',
@@ -181,6 +239,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
181239
}
182240

183241
const processAusIstFahrtMsg = async (msg) => {
242+
const tReceived = Date.now()
184243
const {
185244
subject,
186245
seq,
@@ -193,7 +252,30 @@ const runGtfsMatching = async (cfg, opt = {}) => {
193252
redelivered,
194253
dataSlice: data.slice(0, 100).toString('utf8'),
195254
}, 'processing AUS IstFahrt msg')
196-
natsMsgSeq.set(seq) // todo: is `seq` an integer?
255+
256+
// update NATS metrics
257+
{
258+
const {
259+
stream,
260+
consumer,
261+
} = msg.info
262+
// We slice() to keep the cardinality low in case of a bug.
263+
const topic_root = (subject.split('.')[0] || '').slice(0, 7)
264+
const redelivered = msg.info.redelivered ? '1' : '0'
265+
natsNrOfMessagesReceivedTotal.inc({
266+
stream, // name of the JetStream stream
267+
consumer, // name of the JetStream consumer
268+
topic_root,
269+
redelivered,
270+
})
271+
natsLatestMessageReceivedTimestampSeconds.set({
272+
stream, // name of the JetStream stream
273+
consumer, // name of the JetStream consumer
274+
topic_root,
275+
redelivered,
276+
}, tReceived / 1000)
277+
natsMsgSeq.set(seq)
278+
}
197279

198280
let ausIstFahrt = null
199281
try {

0 commit comments

Comments
 (0)