@@ -2,6 +2,7 @@ package blockstore
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
"sync/atomic"
6
7
"time"
7
8
@@ -19,82 +20,95 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (
19
20
if err != nil {
20
21
return nil , err
21
22
}
22
- bc := & bloomcache {blockstore : bs , bloom : bl }
23
- bc .hits = metrics .NewCtx (ctx , "bloom.hits_total" ,
24
- "Number of cache hits in bloom cache" ).Counter ()
25
- bc .total = metrics .NewCtx (ctx , "bloom_total" ,
26
- "Total number of requests to bloom cache" ).Counter ()
27
-
28
- bc .Invalidate ()
29
- go bc .Rebuild (ctx )
30
- if metrics .Active () {
31
- go func () {
23
+ bc := & bloomcache {
24
+ blockstore : bs ,
25
+ bloom : bl ,
26
+ hits : metrics .NewCtx (ctx , "bloom.hits_total" ,
27
+ "Number of cache hits in bloom cache" ).Counter (),
28
+ total : metrics .NewCtx (ctx , "bloom_total" ,
29
+ "Total number of requests to bloom cache" ).Counter (),
30
+ buildChan : make (chan struct {}),
31
+ }
32
+ go func () {
33
+ err := bc .build (ctx )
34
+ if err != nil {
35
+ select {
36
+ case <- ctx .Done ():
37
+ log .Warning ("Cache rebuild closed by context finishing: " , err )
38
+ default :
39
+ log .Error (err )
40
+ }
41
+ return
42
+ }
43
+ if metrics .Active () {
32
44
fill := metrics .NewCtx (ctx , "bloom_fill_ratio" ,
33
45
"Ratio of bloom filter fullnes, (updated once a minute)" ).Gauge ()
34
46
35
- <- bc .rebuildChan
36
47
t := time .NewTicker (1 * time .Minute )
48
+ defer t .Stop ()
37
49
for {
38
50
select {
39
51
case <- ctx .Done ():
40
- t .Stop ()
41
52
return
42
53
case <- t .C :
43
54
fill .Set (bc .bloom .FillRatio ())
44
55
}
45
56
}
46
- }()
47
- }
57
+ }
58
+ }()
48
59
return bc , nil
49
60
}
50
61
51
62
type bloomcache struct {
52
- bloom * bloom.Bloom
53
63
active int32
54
64
55
- // This chan is only used for testing to wait for bloom to enable
56
- rebuildChan chan struct {}
57
- blockstore Blockstore
65
+ bloom * bloom.Bloom
66
+ buildErr error
67
+
68
+ buildChan chan struct {}
69
+ blockstore Blockstore
58
70
59
71
// Statistics
60
72
hits metrics.Counter
61
73
total metrics.Counter
62
74
}
63
75
64
- func (b * bloomcache ) Invalidate () {
65
- b .rebuildChan = make (chan struct {})
66
- atomic .StoreInt32 (& b .active , 0 )
67
- }
68
-
69
76
func (b * bloomcache ) BloomActive () bool {
70
77
return atomic .LoadInt32 (& b .active ) != 0
71
78
}
72
79
73
- func (b * bloomcache ) Rebuild (ctx context.Context ) {
74
- evt := log .EventBegin (ctx , "bloomcache.Rebuild" )
80
+ func (b * bloomcache ) Wait (ctx context.Context ) error {
81
+ select {
82
+ case <- ctx .Done ():
83
+ return ctx .Err ()
84
+ case <- b .buildChan :
85
+ return b .buildErr
86
+ }
87
+ }
88
+
89
+ func (b * bloomcache ) build (ctx context.Context ) error {
90
+ evt := log .EventBegin (ctx , "bloomcache.build" )
75
91
defer evt .Done ()
92
+ defer close (b .buildChan )
76
93
77
94
ch , err := b .blockstore .AllKeysChan (ctx )
78
95
if err != nil {
79
- log .Errorf ("AllKeysChan failed in bloomcache rebuild with: %v" , err )
80
- return
96
+ b . buildErr = fmt .Errorf ("AllKeysChan failed in bloomcache rebuild with: %v" , err )
97
+ return b . buildErr
81
98
}
82
- finish := false
83
- for ! finish {
99
+ for {
84
100
select {
85
101
case key , ok := <- ch :
86
- if ok {
87
- b .bloom .AddTS (key .Bytes ()) // Use binary key, the more compact the better
88
- } else {
89
- finish = true
102
+ if ! ok {
103
+ atomic .StoreInt32 (& b .active , 1 )
104
+ return nil
90
105
}
106
+ b .bloom .AddTS (key .Bytes ()) // Use binary key, the more compact the better
91
107
case <- ctx .Done ():
92
- log . Warning ( "Cache rebuild closed by context finishing." )
93
- return
108
+ b . buildErr = ctx . Err ( )
109
+ return b . buildErr
94
110
}
95
111
}
96
- close (b .rebuildChan )
97
- atomic .StoreInt32 (& b .active , 1 )
98
112
}
99
113
100
114
func (b * bloomcache ) DeleteBlock (k cid.Cid ) error {
0 commit comments