Skip to content

Commit 4896deb

Browse files
committed
matching: SollFahrt/IstFahrt merging: use Redis Hashes 🐛✅📝
The previous approach (plain keys + SCAN + MGET) sometimes failed to SCAN to completion, and it scaled badly. Using Redis Hashes seems much more elegant in general.
1 parent 56f18ac commit 4896deb

File tree

4 files changed

+47
-23
lines changed

4 files changed

+47
-23
lines changed

lib/caching.js

+1
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,6 @@ const createCache = async (opt = {}) => {
111111
}
112112

113113
export {
114+
PREFIX,
114115
createCache,
115116
}

lib/merge-vdv-sollfahrts-istfahrts.js

+44-21
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import {
55
} from 'node:assert/strict'
66
import omit from 'lodash/omit.js'
77
import maxBy from 'lodash/maxBy.js'
8+
import {
9+
PREFIX as REDIS_PREFIX,
10+
} from './caching.js'
811
import {
912
mergeButPreferNonNull,
1013
unixTimestampFromIso8601,
1114
} from './util.js'
12-
import {createCache} from './caching.js'
15+
import {connectToRedis} from './redis.js'
1316

1417
// The logic in this file uses three sources of information:
1518
// 1. In almost all cases, a REF-AUS SollFahrt is sent ahead of time.
@@ -20,9 +23,14 @@ import {createCache} from './caching.js'
2023

2124
// ---
2225

26+
const VDV_STORAGE_TTL = process.env.VDV_STORAGE_TTL
27+
? parseInt(process.env.VDV_STORAGE_TTL) * 1000
28+
: 32 * 60 * 60 * 1000 // 32 hours
29+
2330
const KIND_SOLLFAHRT = Symbol('REF-AUS SollFahrt')
2431
const KIND_ISTFAHRT = Symbol('AUS IstFahrt')
2532

33+
const STORAGE_KEY_PREFIX = REDIS_PREFIX + 'vdv:'
2634
const STORAGE_KEY_REF_AUS_SOLLFAHRT = 'ref_aus_soll'
2735
const STORAGE_KEY_AUS_ISTFAHRT_KOMPLETTFAHRT = 'aus_komplett'
2836
const STORAGE_KEY_AUS_ISTFAHRT_PARTIAL = 'aus_partial'
@@ -214,9 +222,12 @@ const createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts = async (cfg) => {
214222
logger,
215223
} = cfg
216224

217-
const storage = await createCache({
218-
prefix: 'vdv:',
219-
})
225+
const storage = await connectToRedis()
226+
// As of ioredis@5.x (exact version lost in this capture), it doesn't support the HSETEX command, so we patch our client.
227+
// https://redis.io/docs/latest/commands/?group=hash
228+
if (typeof storage.hsetex !== 'function') {
229+
storage.addBuiltinCommand('hsetex')
230+
}
220231

221232
const _storeVdvFahrt = async (vdvFahrt, kind) => {
222233
if (kind !== KIND_SOLLFAHRT && kind !== KIND_ISTFAHRT) {
@@ -231,10 +242,11 @@ const createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts = async (cfg) => {
231242
if (!fahrtId) {
232243
return null
233244
}
245+
const storageKey = STORAGE_KEY_PREFIX + fahrtId
234246

235247
const isKomplettfahrt = vdvFahrt.Komplettfahrt === 'true'
236248
if (kind === KIND_ISTFAHRT && !isKomplettfahrt) {
237-
const entries = halts.map((istHalt, i) => {
249+
const fieldsArgs = halts.flatMap((istHalt, i) => {
238250
ok(istHalt.HaltID, `vdvFahrt.${haltsKey}[${i}].HaltID must not be missing`)
239251
let depOrArrPrefix = null
240252
let depOrArr = null
@@ -255,32 +267,41 @@ const createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts = async (cfg) => {
255267

256268
// todo: what if there are >1 IstHalts stopping at the same HaltID within the same minute?
257269
// todo: what if an IstHalt is first seen only with an Ankunftszeit and later with an Abfahrtszeit? – it will be stored & read twice!
258-
const storageKey = [
259-
fahrtId,
270+
const hashField = [
260271
STORAGE_KEY_AUS_ISTFAHRT_PARTIAL,
261272
istHalt.HaltID,
262273
depOrArrPrefix,
263274
depOrArr,
264275
].join(':')
265276
return [
266-
storageKey,
267-
{
277+
hashField,
278+
JSON.stringify({
268279
...istHalt,
269280
IstFahrt: sparseIstFahrt,
270-
},
281+
}),
271282
]
272283
})
273-
await storage.putMany(entries)
284+
285+
// todo: support Valkey once they have a "Hash set + expiration" command
286+
await storage.hsetex(
287+
storageKey, // key of Redis Hash
288+
'PX', VDV_STORAGE_TTL, // expiration time in milliseconds
289+
'FIELDS', fieldsArgs.length / 2, // number of Hash fields to set
290+
...fieldsArgs,
291+
)
274292
} else {
275-
const storageKeySuffix = kind === KIND_SOLLFAHRT
293+
const field = kind === KIND_SOLLFAHRT
276294
? STORAGE_KEY_REF_AUS_SOLLFAHRT
277295
: STORAGE_KEY_AUS_ISTFAHRT_KOMPLETTFAHRT
278-
const storageKey = [
279-
fahrtId,
280-
storageKeySuffix,
281-
].join(':')
282296

283-
await storage.put(storageKey, vdvFahrt)
297+
// todo: support Valkey once they have a "Hash set + expiration" command
298+
await storage.hsetex(
299+
storageKey, // key of Redis Hash
300+
'PX', VDV_STORAGE_TTL, // expiration time in milliseconds
301+
'FIELDS', 1, // number of Hash fields to set
302+
field,
303+
JSON.stringify(vdvFahrt), // value
304+
)
284305
}
285306
}
286307
const storeRefAusSollFahrt = async (sollFahrt) => {
@@ -300,7 +321,7 @@ const createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts = async (cfg) => {
300321
if (!fahrtId) {
301322
return []
302323
}
303-
const storageKeyPrefix = fahrtId + ':'
324+
const storageKey = STORAGE_KEY_PREFIX + fahrtId
304325

305326
const res = {
306327
refAusSollFahrt: null,
@@ -309,8 +330,10 @@ const createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts = async (cfg) => {
309330
}
310331

311332
// load from storage
312-
for (const [key, item] of await storage.getMany(storageKeyPrefix)) {
313-
const [kindPart] = key.slice(storageKeyPrefix.length).split(':')
333+
const hash = await storage.hgetall(storageKey)
334+
for (const [key, val] of Object.entries(hash)) {
335+
const [kindPart] = key.split(':')
336+
const item = JSON.parse(val)
314337
if (kindPart === STORAGE_KEY_REF_AUS_SOLLFAHRT) {
315338
res.refAusSollFahrt = item
316339
} else if (kindPart === STORAGE_KEY_AUS_ISTFAHRT_KOMPLETTFAHRT) {
@@ -430,7 +453,7 @@ const createMergeVdvFahrtWithRefAusSollFahrtAndAusIstFahrts = async (cfg) => {
430453
}
431454

432455
const stop = async () => {
433-
await storage.stop()
456+
await storage.quit()
434457
}
435458

436459
const res = {

readme.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ cd postgis-gtfs-importer && npm install --omit dev
187187

188188
- a [NATS message queue](https://docs.nats.io) with [JetStream](https://docs.nats.io/nats-concepts/jetstream) enabled
189189
- a [PostgreSQL database server](https://postgresql.org), with the permission to dynamically create new databases (see [postgis-gtfs-importer](https://github.com/mobidata-bw/postgis-gtfs-importer)'s readme)
190-
- a [Redis in-memory cache](https://redis.io/docs/latest/)
190+
- a [Redis in-memory cache](https://redis.io/docs/latest/), version 8.0.0 or later (Valkey currently doesn't support the `HSETEX` command)
191191

192192
#### configure access to PostgreSQL
193193

test/vdv-merging.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const {
2929
})
3030
})
3131
beforeEach(async () => {
32-
await _vdvMergingStorage.redis.flushdb()
32+
await _vdvMergingStorage.flushdb()
3333
})
3434
after(async () => {
3535
await stopVdvMerging()

0 commit comments

Comments
(0)