@@ -8,26 +8,26 @@ import (
8
8
)
9
9
10
10
const (
11
- prefetchEvictionInterval = 5 * time .Second
12
- prefetchSegmentIdleTTL = 10 * time .Second // remove stale segments if no reads in the past 30s
13
11
preemptiveFetchThresholdBytes = 16 * 1024 * 1024 // if the next segment is within 16MB of where we are reading, start fetching it
14
12
)
15
13
16
14
type PrefetchManager struct {
17
- ctx context.Context
18
- config BlobCacheConfig
19
- buffers sync.Map
20
- client * BlobCacheClient
21
- currentPrefetchSizeBytes uint64
15
+ ctx context.Context
16
+ config BlobCacheConfig
17
+ buffers sync.Map
18
+ client * BlobCacheClient
19
+ segmentIdleTTL time.Duration
20
+ evictionInterval time.Duration
22
21
}
23
22
24
23
func NewPrefetchManager (ctx context.Context , config BlobCacheConfig , client * BlobCacheClient ) * PrefetchManager {
25
24
return & PrefetchManager {
26
- ctx : ctx ,
27
- config : config ,
28
- buffers : sync.Map {},
29
- client : client ,
30
- currentPrefetchSizeBytes : 0 ,
25
+ ctx : ctx ,
26
+ config : config ,
27
+ buffers : sync.Map {},
28
+ client : client ,
29
+ segmentIdleTTL : time .Duration (config .BlobFs .Prefetch .IdleTtlS ) * time .Second ,
30
+ evictionInterval : time .Duration (config .BlobFs .Prefetch .EvictionIntervalS ) * time .Second ,
31
31
}
32
32
}
33
33
@@ -62,15 +62,15 @@ func (pm *PrefetchManager) evictIdleBuffers() {
62
62
select {
63
63
case <- pm .ctx .Done ():
64
64
return
65
- case <- time .After (prefetchEvictionInterval ):
65
+ case <- time .After (pm . evictionInterval ):
66
66
pm .buffers .Range (func (key , value any ) bool {
67
67
buffer := value .(* PrefetchBuffer )
68
68
69
- // If no reads have happened in any segments in the buffer
69
+ // If no reads have happened in any windows in the buffer
70
70
// stop any fetch operations and clear the buffer so it can
71
71
// be garbage collected
72
- unused := buffer .evictIdle ()
73
- if unused {
72
+ idle := buffer .IsIdle ()
73
+ if idle {
74
74
buffer .Clear ()
75
75
pm .buffers .Delete (key )
76
76
}
@@ -149,12 +149,23 @@ func (pb *PrefetchBuffer) fetch(offset uint64, bufferSize uint64) {
149
149
}
150
150
}
151
151
152
- w := & window {
153
- index : bufferIndex ,
154
- data : make ([]byte , 0 , bufferSize ),
155
- readLength : 0 ,
156
- lastRead : time .Now (),
157
- fetching : true ,
152
+ existingWindow := pb .prevWindow
153
+ var w * window
154
+ if existingWindow != nil {
155
+ w = existingWindow
156
+ w .index = bufferIndex
157
+ w .readLength = 0
158
+ w .data = make ([]byte , 0 , bufferSize )
159
+ w .lastRead = time .Now ()
160
+ w .fetching = true
161
+ } else {
162
+ w = & window {
163
+ index : bufferIndex ,
164
+ data : make ([]byte , 0 , bufferSize ),
165
+ readLength : 0 ,
166
+ lastRead : time .Now (),
167
+ fetching : true ,
168
+ }
158
169
}
159
170
160
171
// Slide windows
@@ -192,24 +203,21 @@ func (pb *PrefetchBuffer) fetch(offset uint64, bufferSize uint64) {
192
203
}
193
204
}
194
205
195
- func (pb * PrefetchBuffer ) evictIdle () bool {
196
- unused := true
206
+ func (pb * PrefetchBuffer ) IsIdle () bool {
207
+ idle := true
197
208
198
209
pb .mu .Lock ()
199
210
windows := []* window {pb .prevWindow , pb .currentWindow , pb .nextWindow }
200
- for i , w := range windows {
201
- if w != nil && time .Since (w .lastRead ) > prefetchSegmentIdleTTL && ! w .fetching {
202
- Logger .Debugf ("Evicting segment %s-%d" , pb .hash , w .index )
203
- w .data = nil
204
- windows [i ] = nil
211
+ for _ , w := range windows {
212
+ if w != nil && time .Since (w .lastRead ) > pb .manager .segmentIdleTTL && ! w .fetching {
213
+ continue
205
214
} else {
206
- unused = false
215
+ idle = false
207
216
}
208
217
}
209
- pb .prevWindow , pb .currentWindow , pb .nextWindow = windows [0 ], windows [1 ], windows [2 ]
210
218
pb .mu .Unlock ()
211
219
212
- return unused
220
+ return idle
213
221
}
214
222
215
223
func (pb * PrefetchBuffer ) Clear () {
@@ -218,6 +226,8 @@ func (pb *PrefetchBuffer) Clear() {
218
226
pb .mu .Lock ()
219
227
defer pb .mu .Unlock ()
220
228
229
+ Logger .Infof ("Evicting idle prefetch buffer - %s" , pb .hash )
230
+
221
231
// Clear all window data
222
232
windows := []* window {pb .prevWindow , pb .currentWindow , pb .nextWindow }
223
233
for _ , window := range windows {
0 commit comments