Skip to content

Commit 33fb92a

Browse files
committed
CSC POC ontop of Parser
1 parent 4708736 commit 33fb92a

16 files changed

+273
-54
lines changed

Diff for: packages/client/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import RedisSentinel from './lib/sentinel';
2020
export { RedisSentinelOptions, RedisSentinelType } from './lib/sentinel/types';
2121
export const createSentinel = RedisSentinel.create;
2222

23+
import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
24+
export { BasicClientSideCache, BasicPooledClientSideCache };
25+
2326
// export { GeoReplyWith } from './lib/commands/generic-transformers';
2427

2528
// export { SetOptions } from './lib/commands/SET';

Diff for: packages/client/lib/client/commands-queue.ts

+20-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
5656
return this.#pubSub.isActive;
5757
}
5858

59+
#invalidateCallback?: (key: RedisArgument | null) => unknown;
60+
5961
constructor(
6062
respVersion: RespVersions,
6163
maxLength: number | null | undefined,
@@ -109,13 +111,30 @@ export default class RedisCommandsQueue {
109111
onErrorReply: err => this.#onErrorReply(err),
110112
onPush: push => {
111113
if (!this.#onPush(push)) {
112-
114+
switch (push[0].toString()) {
115+
case "invalidate": {
116+
if (this.#invalidateCallback) {
117+
if (push[1] !== null) {
118+
for (const key of push[1]) {
119+
this.#invalidateCallback(key);
120+
}
121+
} else {
122+
this.#invalidateCallback(null);
123+
}
124+
}
125+
break;
126+
}
127+
}
113128
}
114129
},
115130
getTypeMapping: () => this.#getTypeMapping()
116131
});
117132
}
118133

134+
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
135+
this.#invalidateCallback = callback;
136+
}
137+
119138
addCommand<T>(
120139
args: ReadonlyArray<RedisArgument>,
121140
options?: CommandOptions

Diff for: packages/client/lib/client/index.ts

+50-12
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
1515
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
1616
import { RedisPoolOptions, RedisClientPool } from './pool';
1717
import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers';
18+
import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache';
1819
import { BasicCommandParser, CommandParser } from './parser';
1920

2021
export interface RedisClientOptions<
@@ -72,6 +73,10 @@ export interface RedisClientOptions<
7273
* TODO
7374
*/
7475
commandOptions?: CommandOptions<TYPE_MAPPING>;
76+
/**
77+
* TODO
78+
*/
79+
clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig;
7580
}
7681

7782
type WithCommands<
@@ -280,10 +285,12 @@ export default class RedisClient<
280285
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
281286
private _self = this;
282287
private _commandOptions?: CommandOptions<TYPE_MAPPING>;
288+
283289
#dirtyWatch?: string;
284-
#epoch: number;
285290
#watchEpoch?: number;
286291

292+
#clientSideCache?: ClientSideCacheProvider;
293+
287294
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
288295
return this._self.#options;
289296
}
@@ -300,6 +307,11 @@ export default class RedisClient<
300307
return this._self.#queue.isPubSubActive;
301308
}
302309

310+
get socketEpoch() {
311+
return this._self.#socket.socketEpoch;
312+
}
313+
314+
303315
get isWatching() {
304316
return this._self.#watchEpoch !== undefined;
305317
}
@@ -310,10 +322,20 @@ export default class RedisClient<
310322

311323
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
312324
super();
325+
313326
this.#options = this.#initiateOptions(options);
314327
this.#queue = this.#initiateQueue();
315328
this.#socket = this.#initiateSocket();
316-
this.#epoch = 0;
329+
330+
if (options?.clientSideCache) {
331+
if (options.clientSideCache instanceof ClientSideCacheProvider) {
332+
this.#clientSideCache = options.clientSideCache;
333+
} else {
334+
const cscConfig = options.clientSideCache;
335+
this.#clientSideCache = new BasicClientSideCache(cscConfig);
336+
}
337+
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
338+
}
317339
}
318340

319341
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
@@ -347,7 +369,6 @@ export default class RedisClient<
347369

348370
#handshake(selectedDB: number) {
349371
const commands = [];
350-
351372
if (this.#options?.RESP) {
352373
const hello: HelloOptions = {};
353374

@@ -392,6 +413,13 @@ export default class RedisClient<
392413
);
393414
}
394415

416+
if (this.#clientSideCache) {
417+
const tracking = this.#clientSideCache.trackingOn();
418+
if (tracking) {
419+
commands.push(tracking);
420+
}
421+
}
422+
395423
return commands;
396424
}
397425

@@ -445,6 +473,7 @@ export default class RedisClient<
445473
})
446474
.on('error', err => {
447475
this.emit('error', err);
476+
this.#clientSideCache?.onError();
448477
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
449478
this.#queue.flushWaitingForReply(err);
450479
} else {
@@ -453,7 +482,6 @@ export default class RedisClient<
453482
})
454483
.on('connect', () => this.emit('connect'))
455484
.on('ready', () => {
456-
this.#epoch++;
457485
this.emit('ready');
458486
this.#setPingTimer();
459487
this.#maybeScheduleWrite();
@@ -581,13 +609,21 @@ export default class RedisClient<
581609
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
582610
transformReply: TransformReply | undefined,
583611
) {
584-
const reply = await this.sendCommand(parser.redisArgs, commandOptions);
612+
const csc = this._self.#clientSideCache;
613+
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
585614

586-
if (transformReply) {
587-
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
588-
}
615+
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
616+
617+
if (csc && command.CACHEABLE && defaultTypeMapping) {
618+
return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping);
619+
} else {
620+
const reply = await fn();
589621

590-
return reply;
622+
if (transformReply) {
623+
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
624+
}
625+
return reply;
626+
}
591627
}
592628

593629
/**
@@ -752,7 +788,7 @@ export default class RedisClient<
752788
const reply = await this._self.sendCommand(
753789
pushVariadicArguments(['WATCH'], key)
754790
);
755-
this._self.#watchEpoch ??= this._self.#epoch;
791+
this._self.#watchEpoch ??= this._self.socketEpoch;
756792
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
757793
}
758794

@@ -819,7 +855,7 @@ export default class RedisClient<
819855
}
820856

821857
const chainId = Symbol('Pipeline Chain'),
822-
promise = Promise.all(
858+
promise = Promise.allSettled(
823859
commands.map(({ args }) => this._self.#queue.addCommand(args, {
824860
chainId,
825861
typeMapping: this._commandOptions?.typeMapping
@@ -855,7 +891,7 @@ export default class RedisClient<
855891
throw new WatchError(dirtyWatch);
856892
}
857893

858-
if (watchEpoch && watchEpoch !== this._self.#epoch) {
894+
if (watchEpoch && watchEpoch !== this._self.socketEpoch) {
859895
throw new WatchError('Client reconnected after WATCH');
860896
}
861897

@@ -1075,6 +1111,7 @@ export default class RedisClient<
10751111
return new Promise<void>(resolve => {
10761112
clearTimeout(this._self.#pingTimer);
10771113
this._self.#socket.close();
1114+
this._self.#clientSideCache?.onClose();
10781115

10791116
if (this._self.#queue.isEmpty()) {
10801117
this._self.#socket.destroySocket();
@@ -1099,6 +1136,7 @@ export default class RedisClient<
10991136
clearTimeout(this._self.#pingTimer);
11001137
this._self.#queue.flushAll(new DisconnectsClientError());
11011138
this._self.#socket.destroy();
1139+
this._self.#clientSideCache?.onClose();
11021140
}
11031141

11041142
ref() {

Diff for: packages/client/lib/client/linked-list.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export class DoublyLinkedList<T> {
114114
export interface SinglyLinkedNode<T> {
115115
value: T;
116116
next: SinglyLinkedNode<T> | undefined;
117+
removed: boolean;
117118
}
118119

119120
export class SinglyLinkedList<T> {
@@ -140,7 +141,8 @@ export class SinglyLinkedList<T> {
140141

141142
const node = {
142143
value,
143-
next: undefined
144+
next: undefined,
145+
removed: false
144146
};
145147

146148
if (this.#head === undefined) {
@@ -151,6 +153,9 @@ export class SinglyLinkedList<T> {
151153
}
152154

153155
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
156+
if (node.removed) {
157+
throw new Error("node already removed");
158+
}
154159
--this.#length;
155160

156161
if (this.#head === node) {
@@ -165,6 +170,8 @@ export class SinglyLinkedList<T> {
165170
} else {
166171
parent!.next = node.next;
167172
}
173+
174+
node.removed = true;
168175
}
169176

170177
shift() {
@@ -177,6 +184,7 @@ export class SinglyLinkedList<T> {
177184
this.#head = node.next;
178185
}
179186

187+
node.removed = true;
180188
return node.value;
181189
}
182190

Diff for: packages/client/lib/client/parser.ts

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser {
3333
return this.#keys[0];
3434
}
3535

36+
get cacheKey() {
37+
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
38+
return cacheKey + '_' + this.#redisArgs.join('_');
39+
}
40+
3641
push(...arg: Array<RedisArgument>) {
3742
this.#redisArgs.push(...arg);
3843
};

0 commit comments

Comments
 (0)