@@ -2,8 +2,10 @@ package rueidis
2
2
3
3
import (
4
4
"context"
5
+ "runtime"
5
6
"sync"
6
7
"time"
8
+ "unsafe"
7
9
)
8
10
9
11
// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
@@ -178,3 +180,230 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
178
180
return a .val , a .err
179
181
}
180
182
}
183
+
184
+ type flatentry struct {
185
+ ovfl * flatentry
186
+ next unsafe.Pointer
187
+ prev unsafe.Pointer
188
+ cmd string
189
+ key string
190
+ val []byte
191
+ ttl int64
192
+ siz int64
193
+ mu sync.Mutex
194
+ mark int64
195
+ }
196
+
197
+ func (f * flatentry ) insert (e * flatentry ) {
198
+ f .siz += e .siz
199
+ f .mu .Lock ()
200
+ defer f .mu .Unlock ()
201
+ e .ovfl = f .ovfl
202
+ f .ovfl = e
203
+ }
204
+
205
+ func (f * flatentry ) find (cmd string , ts int64 ) (ret RedisMessage , expired bool ) {
206
+ if f == nil {
207
+ return
208
+ }
209
+ if ts >= f .ttl {
210
+ expired = true
211
+ return
212
+ }
213
+ if cmd == f .cmd {
214
+ _ = ret .CacheUnmarshalView (f .val )
215
+ return
216
+ }
217
+ f .mu .Lock ()
218
+ ovfl := f .ovfl
219
+ f .mu .Unlock ()
220
+ return ovfl .find (cmd , ts )
221
+ }
222
+
223
+ const lrBatchSize = 64
224
+
225
+ type lrBatch struct {
226
+ m map [* flatentry ]struct {}
227
+ }
228
+
229
+ func NewFlattenCache (limit int64 ) CacheStore {
230
+ f := & flatten {
231
+ flights : make (map [string ]* adapterEntry ),
232
+ cache : make (map [string ]* flatentry ),
233
+ head : & flatentry {},
234
+ tail : & flatentry {},
235
+ size : 0 ,
236
+ limit : limit ,
237
+ }
238
+ f .head .next = unsafe .Pointer (f .tail )
239
+ f .tail .prev = unsafe .Pointer (f .head )
240
+ f .lrup = sync.Pool {New : func () any {
241
+ b := & lrBatch {m : make (map [* flatentry ]struct {}, lrBatchSize )}
242
+ runtime .SetFinalizer (b , func () {
243
+ f .llTailBatch (b )
244
+ })
245
+ return b
246
+ }}
247
+ return f
248
+ }
249
+
250
+ type flatten struct {
251
+ flights map [string ]* adapterEntry
252
+ cache map [string ]* flatentry
253
+ head * flatentry
254
+ tail * flatentry
255
+ lrup sync.Pool
256
+ mark int64
257
+ size int64
258
+ limit int64
259
+ mu sync.RWMutex
260
+ }
261
+
262
+ func (f * flatten ) llAdd (e * flatentry ) {
263
+ e .mark = f .mark
264
+ e .prev = f .tail .prev
265
+ e .next = unsafe .Pointer (f .tail )
266
+ f .tail .prev = unsafe .Pointer (e )
267
+ (* flatentry )(e .prev ).next = unsafe .Pointer (e )
268
+ }
269
+
270
+ func (f * flatten ) llDel (e * flatentry ) {
271
+ (* flatentry )(e .prev ).next = e .next
272
+ (* flatentry )(e .next ).prev = e .prev
273
+ e .mark = 0
274
+ }
275
+
276
+ func (f * flatten ) llTail (e * flatentry ) {
277
+ if e .mark == f .mark {
278
+ f .llDel (e )
279
+ f .llAdd (e )
280
+ }
281
+ }
282
+
283
+ func (f * flatten ) llTailBatch (b * lrBatch ) {
284
+ f .mu .Lock ()
285
+ for e := range b .m {
286
+ f .llTail (e )
287
+ }
288
+ f .mu .Unlock ()
289
+ clear (b .m )
290
+ }
291
+
292
+ func (f * flatten ) remove (e * flatentry ) {
293
+ f .size -= e .siz
294
+ f .llDel (e )
295
+ delete (f .cache , e .key )
296
+ }
297
+
298
+ func (f * flatten ) Flight (key , cmd string , ttl time.Duration , now time.Time ) (RedisMessage , CacheEntry ) {
299
+ f .mu .RLock ()
300
+ e := f .cache [key ]
301
+ f .mu .RUnlock ()
302
+ ts := now .UnixMilli ()
303
+ if v , _ := e .find (cmd , ts ); v .typ != 0 {
304
+ batch := f .lrup .Get ().(* lrBatch )
305
+ batch .m [e ] = struct {}{}
306
+ if len (batch .m ) == lrBatchSize {
307
+ f .llTailBatch (batch )
308
+ }
309
+ f .lrup .Put (batch )
310
+ return v , nil
311
+ }
312
+ fk := key + cmd
313
+ f .mu .RLock ()
314
+ af := f .flights [fk ]
315
+ f .mu .RUnlock ()
316
+ if af != nil {
317
+ return RedisMessage {}, af
318
+ }
319
+ f .mu .Lock ()
320
+ defer f .mu .Unlock ()
321
+ e = f .cache [key ]
322
+ v , expired := e .find (cmd , ts )
323
+ if v .typ != 0 {
324
+ f .llTail (e )
325
+ return v , nil
326
+ }
327
+ if expired {
328
+ f .remove (e )
329
+ }
330
+ if af = f .flights [fk ]; af != nil {
331
+ return RedisMessage {}, af
332
+ }
333
+ f .flights [fk ] = & adapterEntry {ch : make (chan struct {}), xat : ts + ttl .Milliseconds ()}
334
+ return RedisMessage {}, nil
335
+ }
336
+
337
+ func (f * flatten ) Update (key , cmd string , val RedisMessage ) int64 {
338
+ fk := key + cmd
339
+ bs := val .CacheMarshal (nil )
340
+ fe := & flatentry {cmd : cmd , val : bs , ttl : val .CachePXAT (), siz : int64 (len (bs )+ len (key )+ len (cmd )) + int64 (unsafe .Sizeof (flatentry {}))}
341
+ f .mu .Lock ()
342
+ af := f .flights [fk ]
343
+ if af != nil {
344
+ delete (f .flights , fk )
345
+ if af .xat < fe .ttl {
346
+ fe .ttl = af .xat
347
+ }
348
+ }
349
+ f .size += fe .siz
350
+ for ep := f .head .next ; f .size > f .limit && ep != unsafe .Pointer (f .tail ); {
351
+ e := (* flatentry )(ep )
352
+ f .remove (e )
353
+ ep = e .next
354
+ }
355
+ if e := f .cache [key ]; e == nil {
356
+ fe .key = key
357
+ f .cache [key ] = fe
358
+ f .llAdd (fe )
359
+ } else {
360
+ e .insert (fe )
361
+ }
362
+ f .mu .Unlock ()
363
+ if af != nil {
364
+ af .set (val , nil )
365
+ }
366
+ return fe .ttl
367
+ }
368
+
369
+ func (f * flatten ) Cancel (key , cmd string , err error ) {
370
+ fk := key + cmd
371
+ f .mu .Lock ()
372
+ defer f .mu .Unlock ()
373
+ if af := f .flights [fk ]; af != nil {
374
+ delete (f .flights , fk )
375
+ af .set (RedisMessage {}, err )
376
+ }
377
+ }
378
+
379
+ func (f * flatten ) Delete (keys []RedisMessage ) {
380
+ f .mu .Lock ()
381
+ defer f .mu .Unlock ()
382
+ if keys == nil {
383
+ f .cache = make (map [string ]* flatentry , len (f .cache ))
384
+ f .head .next = unsafe .Pointer (f .tail )
385
+ f .tail .prev = unsafe .Pointer (f .head )
386
+ f .mark ++
387
+ f .size = 0
388
+ } else {
389
+ for _ , k := range keys {
390
+ if e := f .cache [k .string ]; e != nil {
391
+ f .remove (e )
392
+ }
393
+ }
394
+ }
395
+ }
396
+
397
+ func (f * flatten ) Close (err error ) {
398
+ f .mu .Lock ()
399
+ flights := f .flights
400
+ f .flights = nil
401
+ f .cache = nil
402
+ f .tail = nil
403
+ f .head = nil
404
+ f .mark ++
405
+ f .mu .Unlock ()
406
+ for _ , entry := range flights {
407
+ entry .set (RedisMessage {}, err )
408
+ }
409
+ }
0 commit comments