Skip to content

Commit d096756

Browse files
committed
eth/filters: safe chain view update
1 parent 03e88a8 commit d096756

File tree

10 files changed

+230
-157
lines changed

10 files changed

+230
-157
lines changed

core/filtermaps/matcher.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type MatcherBackend interface {
5757
// all states of the chain since the previous SyncLogIndex or the creation of
5858
// the matcher backend.
5959
type SyncRange struct {
60-
HeadNumber uint64
60+
IndexedView *ChainView
6161
// block range where the index has not changed since the last matcher sync
6262
// and therefore the set of matches found in this region is guaranteed to
6363
// be valid and complete.

core/filtermaps/matcher_backend.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (fm *FilterMapsMatcherBackend) synced() {
128128
indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block
129129
}
130130
fm.syncCh <- SyncRange{
131-
HeadNumber: fm.f.targetView.HeadNumber(),
131+
IndexedView: fm.f.indexedView,
132132
ValidBlocks: fm.validBlocks,
133133
IndexedBlocks: indexedBlocks,
134134
}
@@ -154,15 +154,15 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
154154
case <-ctx.Done():
155155
return SyncRange{}, ctx.Err()
156156
case <-fm.f.disabledCh:
157-
return SyncRange{HeadNumber: fm.f.targetView.HeadNumber()}, nil
157+
return SyncRange{IndexedView: fm.f.indexedView}, nil
158158
}
159159
select {
160160
case vr := <-syncCh:
161161
return vr, nil
162162
case <-ctx.Done():
163163
return SyncRange{}, ctx.Err()
164164
case <-fm.f.disabledCh:
165-
return SyncRange{HeadNumber: fm.f.targetView.HeadNumber()}, nil
165+
return SyncRange{IndexedView: fm.f.indexedView}, nil
166166
}
167167
}
168168

eth/api_backend.go

+8
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,14 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 {
443443
return b.eth.config.RPCTxFeeCap
444444
}
445445

446+
func (b *EthAPIBackend) CurrentView() *filtermaps.ChainView {
447+
head := b.eth.blockchain.CurrentBlock()
448+
if head == nil {
449+
return nil
450+
}
451+
return filtermaps.NewChainView(b.eth.blockchain, head.Number.Uint64(), head.Hash())
452+
}
453+
446454
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
447455
return b.eth.filterMaps.NewMatcherBackend()
448456
}

eth/filters/filter.go

+118-105
Original file line numberDiff line numberDiff line change
@@ -146,25 +146,29 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
146146
}
147147

148148
const (
149-
rangeLogsTestSync = iota
150-
rangeLogsTestTrimmed
151-
rangeLogsTestIndexed
152-
rangeLogsTestUnindexed
153-
rangeLogsTestDone
149+
rangeLogsTestDone = iota // zero range
150+
rangeLogsTestSync // before sync; zero range
151+
rangeLogsTestSynced // after sync; valid blocks range
152+
rangeLogsTestIndexed // individual search range
153+
rangeLogsTestUnindexed // individual search range
154+
rangeLogsTestResults // results range after search iteration
155+
rangeLogsTestReorg // results range trimmed by reorg
154156
)
155157

156158
type rangeLogsTestEvent struct {
157-
event int
158-
begin, end uint64
159+
event int
160+
blocks common.Range[uint64]
159161
}
160162

161163
// searchSession represents a single search session.
162164
type searchSession struct {
163-
ctx context.Context
164-
filter *Filter
165-
mb filtermaps.MatcherBackend
166-
syncRange filtermaps.SyncRange // latest synchronized state with the matcher
167-
firstBlock, lastBlock uint64 // specified search range; each can be MaxUint64
165+
ctx context.Context
166+
filter *Filter
167+
mb filtermaps.MatcherBackend
168+
syncRange filtermaps.SyncRange // latest synchronized state with the matcher
169+
chainView *filtermaps.ChainView // can be more recent than the indexed view in syncRange
170+
// block ranges always refer to the current chainView
171+
firstBlock, lastBlock uint64 // specified search range; MaxUint64 means latest block
168172
searchRange common.Range[uint64] // actual search range; end trimmed to latest head
169173
matchRange common.Range[uint64] // range in which we have results (subset of searchRange)
170174
matches []*types.Log // valid set of matches in matchRange
@@ -182,140 +186,136 @@ func newSearchSession(ctx context.Context, filter *Filter, mb filtermaps.Matcher
182186
}
183187
// enforce a consistent state before starting the search in order to be able
184188
// to determine valid range later
185-
if err := s.syncMatcher(0); err != nil {
189+
var err error
190+
s.syncRange, err = s.mb.SyncLogIndex(s.ctx)
191+
if err != nil {
192+
return nil, err
193+
}
194+
if err := s.updateChainView(); err != nil {
186195
return nil, err
187196
}
188197
return s, nil
189198
}
190199

191-
// syncMatcher performs a synchronization step with the matcher. The resulting
192-
// syncRange structure holds information about the latest range of indexed blocks
193-
// and the guaranteed valid blocks whose log index have not been changed since
194-
// the previous synchronization.
195-
// The function also performs trimming of the match set in order to always keep
196-
// it consistent with the synced matcher state.
197-
// Tail trimming is only performed if the first block of the valid log index range
198-
// is higher than trimTailThreshold. This is useful because unindexed log search
199-
// is not affected by the valid tail (on the other hand, valid head is taken into
200-
// account in order to provide reorg safety, even though the log index is not used).
201-
// In case of indexed search the tail is only trimmed if the first part of the
202-
// recently obtained results might be invalid. If guaranteed valid new results
203-
// have been added at the head of previously validated results then there is no
204-
// need to discard those even if the index tail have been unindexed since that.
205-
func (s *searchSession) syncMatcher(trimTailThreshold uint64) error {
206-
if s.filter.rangeLogsTestHook != nil && !s.matchRange.IsEmpty() {
207-
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: s.matchRange.First(), end: s.matchRange.Last()}
200+
// updateChainView updates to the latest view of the underlying chain and sets
201+
// searchRange by replacing MaxUint64 (meaning latest block) with actual head
202+
// number in the specified search range.
203+
// If the session already had an existing chain view and set of matches then
204+
// it also trims part of the match set that a chain reorg might have invalidated.
205+
func (s *searchSession) updateChainView() error {
206+
// update chain view based on current chain head (might be more recent than
207+
// the indexed view of syncRange as the indexer updates it asynchronously
208+
// with some delay
209+
newChainView := s.filter.sys.backend.CurrentView()
210+
if newChainView == nil {
211+
return errors.New("head block not available")
212+
}
213+
head := newChainView.HeadNumber()
214+
// update actual search range based on current head number
215+
firstBlock, lastBlock := s.firstBlock, s.lastBlock
216+
if firstBlock == math.MaxUint64 {
217+
firstBlock = head
208218
}
209-
var err error
210-
s.syncRange, err = s.mb.SyncLogIndex(s.ctx)
211-
if err != nil {
212-
return err
219+
if lastBlock == math.MaxUint64 {
220+
lastBlock = head
213221
}
214-
// update actual search range based on current head number
215-
first := min(s.firstBlock, s.syncRange.HeadNumber)
216-
last := min(s.lastBlock, s.syncRange.HeadNumber)
217-
s.searchRange = common.NewRange(first, last+1-first)
218-
// discard everything that is not needed or might be invalid
219-
trimRange := s.syncRange.ValidBlocks
220-
if trimRange.First() <= trimTailThreshold {
221-
// everything before this point is already known to be valid; if this is
222-
// valid then keep everything before
223-
trimRange.SetFirst(0)
224-
}
225-
trimRange = trimRange.Intersection(s.searchRange)
226-
s.trimMatches(trimRange)
227-
if s.filter.rangeLogsTestHook != nil {
228-
if !s.matchRange.IsEmpty() {
229-
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: s.matchRange.First(), end: s.matchRange.Last()}
230-
} else {
231-
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: 0, end: 0}
232-
}
222+
if firstBlock > lastBlock || lastBlock > head {
223+
return errInvalidBlockRange
224+
}
225+
s.searchRange = common.NewRange(firstBlock, lastBlock+1-firstBlock)
226+
if !s.matchRange.IsEmpty() {
227+
// trim existing match set in case a reorg may have invalidated some results
228+
trimRange := newChainView.SharedRange(s.chainView).Intersection(s.searchRange)
229+
s.matchRange, s.matches = s.trimMatches(trimRange, s.matchRange, s.matches)
233230
}
231+
s.chainView = newChainView
234232
return nil
235233
}
236234

237-
// trimMatches removes any entries from the current set of matches that is outside
238-
// the given range.
239-
func (s *searchSession) trimMatches(trimRange common.Range[uint64]) {
240-
s.matchRange = s.matchRange.Intersection(trimRange)
241-
if s.matchRange.IsEmpty() {
242-
s.matches = nil
243-
return
235+
// trimMatches removes any entries from the specified set of matches that is
236+
// outside the given range.
237+
func (s *searchSession) trimMatches(trimRange, matchRange common.Range[uint64], matches []*types.Log) (common.Range[uint64], []*types.Log) {
238+
newRange := matchRange.Intersection(trimRange)
239+
if newRange == matchRange {
240+
return matchRange, matches
244241
}
245-
for len(s.matches) > 0 && s.matches[0].BlockNumber < s.matchRange.First() {
246-
s.matches = s.matches[1:]
242+
if newRange.IsEmpty() {
243+
return newRange, nil
247244
}
248-
for len(s.matches) > 0 && s.matches[len(s.matches)-1].BlockNumber > s.matchRange.Last() {
249-
s.matches = s.matches[:len(s.matches)-1]
245+
for len(matches) > 0 && matches[0].BlockNumber < newRange.First() {
246+
matches = matches[1:]
250247
}
248+
for len(matches) > 0 && matches[len(matches)-1].BlockNumber > newRange.Last() {
249+
matches = matches[:len(matches)-1]
250+
}
251+
return newRange, matches
251252
}
252253

253254
// searchInRange performs a single range search, either indexed or unindexed.
254-
func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) ([]*types.Log, error) {
255-
first, last := r.First(), r.Last()
255+
func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) (common.Range[uint64], []*types.Log, error) {
256256
if indexed {
257257
if s.filter.rangeLogsTestHook != nil {
258-
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, first, last}
258+
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, r}
259259
}
260-
results, err := s.filter.indexedLogs(s.ctx, s.mb, first, last)
260+
results, err := s.filter.indexedLogs(s.ctx, s.mb, r.First(), r.Last())
261261
if err != filtermaps.ErrMatchAll {
262-
return results, err
262+
// sync with filtermaps matcher
263+
if s.filter.rangeLogsTestHook != nil {
264+
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestSync, common.Range[uint64]{}}
265+
}
266+
var syncErr error
267+
if s.syncRange, syncErr = s.mb.SyncLogIndex(s.ctx); syncErr != nil {
268+
return common.Range[uint64]{}, nil, syncErr
269+
}
270+
if s.filter.rangeLogsTestHook != nil {
271+
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestSynced, s.syncRange.ValidBlocks}
272+
}
273+
// discard everything that might be invalid
274+
trimRange := s.syncRange.ValidBlocks.Intersection(s.chainView.SharedRange(s.syncRange.IndexedView))
275+
matchRange, matches := s.trimMatches(trimRange, r, results)
276+
return matchRange, matches, err
263277
}
264278
// "match all" filters are not supported by filtermaps; fall back to
265279
// unindexed search which is the most efficient in this case
266280
s.forceUnindexed = true
267281
// fall through to unindexed case
268282
}
269283
if s.filter.rangeLogsTestHook != nil {
270-
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, first, last}
284+
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, r}
271285
}
272-
return s.filter.unindexedLogs(s.ctx, first, last)
286+
matches, err := s.filter.unindexedLogs(s.ctx, s.chainView, r.First(), r.Last())
287+
return r, matches, err
273288
}
274289

275290
// doSearchIteration performs a search on a range missing from an incomplete set
276291
// of results, adds the new section and removes invalidated entries.
277292
func (s *searchSession) doSearchIteration() error {
278293
switch {
279-
case s.syncRange.IndexedBlocks.IsEmpty():
280-
// indexer is not ready; fallback to completely unindexed search, do not check valid range
281-
var err error
282-
s.matchRange = s.searchRange
283-
s.matches, err = s.searchInRange(s.searchRange, false)
284-
return err
285-
286294
case s.matchRange.IsEmpty():
287295
// no results yet; try search in entire range
288296
indexedSearchRange := s.searchRange.Intersection(s.syncRange.IndexedBlocks)
289297
var err error
290298
if s.forceUnindexed = indexedSearchRange.IsEmpty(); !s.forceUnindexed {
291299
// indexed search on the intersection of indexed and searched range
292-
s.matchRange = indexedSearchRange
293-
s.matches, err = s.searchInRange(indexedSearchRange, true)
294-
if err != nil {
295-
return err
296-
}
297-
return s.syncMatcher(0) // trim everything that the matcher considers potentially invalid
300+
s.matchRange, s.matches, err = s.searchInRange(indexedSearchRange, true)
301+
return err
298302
} else {
299303
// no intersection of indexed and searched range; unindexed search on the whole searched range
300-
s.matchRange = s.searchRange
301-
s.matches, err = s.searchInRange(s.searchRange, false)
302-
if err != nil {
303-
return err
304-
}
305-
return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range
304+
s.matchRange, s.matches, err = s.searchInRange(s.searchRange, false)
305+
return err
306306
}
307307

308308
case !s.matchRange.IsEmpty() && s.matchRange.First() > s.searchRange.First():
309309
// we have results but tail section is missing; do unindexed search for
310310
// the tail part but still allow indexed search for missing head section
311311
tailRange := common.NewRange(s.searchRange.First(), s.matchRange.First()-s.searchRange.First())
312-
tailMatches, err := s.searchInRange(tailRange, false)
312+
_, tailMatches, err := s.searchInRange(tailRange, false)
313313
if err != nil {
314314
return err
315315
}
316316
s.matches = append(tailMatches, s.matches...)
317317
s.matchRange = tailRange.Union(s.matchRange)
318-
return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range
318+
return nil
319319

320320
case !s.matchRange.IsEmpty() && s.matchRange.First() == s.searchRange.First() && s.searchRange.AfterLast() > s.matchRange.AfterLast():
321321
// we have results but head section is missing
@@ -329,17 +329,15 @@ func (s *searchSession) doSearchIteration() error {
329329
s.forceUnindexed = true
330330
}
331331
}
332-
headMatches, err := s.searchInRange(headRange, !s.forceUnindexed)
333-
if err != nil {
332+
headMatchRange, headMatches, err := s.searchInRange(headRange, !s.forceUnindexed)
333+
if headMatchRange.First() != s.matchRange.AfterLast() {
334+
// improbable corner case, first part of new head range invalidated by tail unindexing
335+
s.matches, s.matchRange = headMatches, headMatchRange
334336
return err
335337
}
336338
s.matches = append(s.matches, headMatches...)
337-
s.matchRange = s.matchRange.Union(headRange)
338-
if s.forceUnindexed {
339-
return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range
340-
} else {
341-
return s.syncMatcher(headRange.First()) // trim if the tail of latest head search results might be invalid
342-
}
339+
s.matchRange = s.matchRange.Union(headMatchRange)
340+
return err
343341

344342
default:
345343
panic("invalid search session state")
@@ -349,7 +347,7 @@ func (s *searchSession) doSearchIteration() error {
349347
func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) {
350348
if f.rangeLogsTestHook != nil {
351349
defer func() {
352-
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0}
350+
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, common.Range[uint64]{}}
353351
close(f.rangeLogsTestHook)
354352
}()
355353
}
@@ -368,6 +366,16 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
368366
if err := session.doSearchIteration(); err != nil {
369367
return session.matches, err
370368
}
369+
if f.rangeLogsTestHook != nil {
370+
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestResults, session.matchRange}
371+
}
372+
mr := session.matchRange
373+
if err := session.updateChainView(); err != nil {
374+
return session.matches, err
375+
}
376+
if f.rangeLogsTestHook != nil && session.matchRange != mr {
377+
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestReorg, session.matchRange}
378+
}
371379
}
372380
return session.matches, nil
373381
}
@@ -382,7 +390,7 @@ func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend,
382390

383391
// unindexedLogs returns the logs matching the filter criteria based on raw block
384392
// iteration and bloom matching.
385-
func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
393+
func (f *Filter) unindexedLogs(ctx context.Context, chainView *filtermaps.ChainView, begin, end uint64) ([]*types.Log, error) {
386394
start := time.Now()
387395
log.Debug("Performing unindexed log search", "begin", begin, "end", end)
388396
var matches []*types.Log
@@ -392,9 +400,14 @@ func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types
392400
return matches, ctx.Err()
393401
default:
394402
}
395-
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber))
396-
if header == nil || err != nil {
397-
return matches, err
403+
if blockNumber > chainView.HeadNumber() {
404+
// check here so that we can return matches up until head along with
405+
// the error
406+
return matches, errInvalidBlockRange
407+
}
408+
header := chainView.Header(blockNumber)
409+
if header == nil {
410+
return matches, errors.New("header not found")
398411
}
399412
found, err := f.blockLogs(ctx, header)
400413
if err != nil {

eth/filters/filter_system.go

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type Backend interface {
7171
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
7272
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
7373

74+
CurrentView() *filtermaps.ChainView
7475
NewMatcherBackend() filtermaps.MatcherBackend
7576
}
7677

0 commit comments

Comments
 (0)