core/filtermaps: hashdb safe delete range #31525

Merged · 6 commits · Mar 31, 2025
210 changes: 135 additions & 75 deletions core/filtermaps/filtermaps.go
@@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
"github.com/ethereum/go-ethereum/log"
)

@@ -59,6 +58,7 @@ type FilterMaps struct {
closeCh chan struct{}
closeWg sync.WaitGroup
history uint64
hashScheme bool // use hashdb-safe delete range method
exportFileName string
Params

@@ -67,10 +67,11 @@
// fields written by the indexer and read by matcher backend. Indexer can
// read them without a lock and write them under indexLock write lock.
// Matcher backend can read them under indexLock read lock.
indexLock sync.RWMutex
indexedRange filterMapsRange
indexedView *ChainView // always consistent with the log index
hasTempRange bool
indexLock sync.RWMutex
indexedRange filterMapsRange
cleanedEpochsBefore uint32 // all unindexed data cleaned before this point
indexedView *ChainView // always consistent with the log index
hasTempRange bool

// also accessed by indexer and matcher backend but no locking needed.
filterMapCache *lru.Cache[uint32, filterMap]
@@ -180,6 +181,10 @@ type Config struct {
// This option enables the checkpoint JSON file generator.
// If set, the given file will be updated with checkpoint information.
ExportFileName string

// Expect trie nodes of the hash based state scheme in the filtermaps key
// range; use the safe, iterator based implementation of DeleteRange that
// skips them.
HashScheme bool
}
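
The problem this flag addresses: under the hash based state scheme, trie nodes are stored under their raw 32-byte hashes and can therefore land anywhere in the key space, including inside the filtermaps key range, so a blind DeleteRange over that range could destroy state data. Below is a minimal sketch of the iterator based workaround, assuming the stop-callback contract visible in this PR; the package, helper name, and sentinel error are illustrative stand-ins, not the actual rawdb.SafeDeleteRange implementation (which, among other things, batches its deletes):

package sketch

import (
    "bytes"
    "errors"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
)

// errInterrupted stands in for rawdb.ErrDeleteRangeInterrupted.
var errInterrupted = errors.New("delete range interrupted")

// safeDeleteRangeSketch deletes all keys in [start, end) one by one. When
// hashScheme is true, raw 32-byte keys are assumed to be trie nodes of the
// hash based state scheme and are skipped. stopCb is invoked every batchSize
// visited keys with a flag telling whether anything was deleted since the
// last call; returning true aborts the operation.
func safeDeleteRangeSketch(db ethdb.KeyValueStore, start, end []byte, hashScheme bool, stopCb func(bool) bool) error {
    const batchSize = 1024
    it := db.NewIterator(nil, start)
    defer it.Release()

    var (
        visited int
        deleted bool
    )
    for it.Next() {
        if bytes.Compare(it.Key(), end) >= 0 {
            break // past the end of the range
        }
        if hashScheme && len(it.Key()) == common.HashLength {
            continue // leave suspected trie nodes untouched
        }
        if err := db.Delete(bytes.Clone(it.Key())); err != nil {
            return err
        }
        deleted = true
        if visited++; visited%batchSize == 0 {
            if stopCb(deleted) {
                return errInterrupted
            }
            deleted = false
        }
    }
    return it.Error()
}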

// NewFilterMaps creates a new FilterMaps and starts the indexer.
@@ -197,6 +202,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
blockProcessingCh: make(chan bool, 1),
history: config.History,
disabled: config.Disabled,
hashScheme: config.HashScheme,
disabledCh: make(chan struct{}),
exportFileName: config.ExportFileName,
Params: params,
@@ -208,15 +214,17 @@
maps: common.NewRange(rs.MapsFirst, rs.MapsAfterLast-rs.MapsFirst),
tailPartialEpoch: rs.TailPartialEpoch,
},
historyCutoff: historyCutoff,
finalBlock: finalBlock,
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers),
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
// deleting last unindexed epoch might have been interrupted by shutdown
cleanedEpochsBefore: max(rs.MapsFirst>>params.logMapsPerEpoch, 1) - 1,
historyCutoff: historyCutoff,
finalBlock: finalBlock,
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers),
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
}

// Set initial indexer target.
@@ -301,14 +309,24 @@ func (f *FilterMaps) reset() {
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
f.safeDeleteWithLogs(rawdb.DeleteFilterMapsDb, "Resetting log index database", f.isShuttingDown)
}

// isShuttingDown returns true if FilterMaps is shutting down.
func (f *FilterMaps) isShuttingDown() bool {
select {
case <-f.closeCh:
return true
default:
return false
}
}

// init initializes an empty log index according to the current targetView.
func (f *FilterMaps) init() error {
// ensure that there is no remaining data in the filter maps key range
if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") {
return errors.New("could not reset log index database")
if err := f.safeDeleteWithLogs(rawdb.DeleteFilterMapsDb, "Resetting log index database", f.isShuttingDown); err != nil {
return err
}

f.indexLock.Lock()
@@ -358,38 +376,37 @@ func (f *FilterMaps) init() error {

// removeBloomBits removes old bloom bits data from the database.
func (f *FilterMaps) removeBloomBits() {
f.safeDeleteRange(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database")
f.safeDeleteWithLogs(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database", f.isShuttingDown)
f.closeWg.Done()
}

// safeDeleteRange calls the specified database range deleter function
// repeatedly as long as it returns leveldb.ErrTooManyKeys.
// This wrapper is necessary because of the leveldb fallback implementation
// of DeleteRange.
func (f *FilterMaps) safeDeleteRange(removeFn func(ethdb.KeyValueRangeDeleter) error, action string) bool {
start := time.Now()
var retry bool
for {
err := removeFn(f.db)
if err == nil {
if retry {
log.Info(action+" finished", "elapsed", time.Since(start))
}
return true
}
if err != leveldb.ErrTooManyKeys {
log.Error(action+" failed", "error", err)
return false
// safeDeleteWithLogs is a wrapper for a function that performs a safe range
// delete operation using rawdb.SafeDeleteRange. It emits progress log
// messages if the process runs long enough for the stop callback to be
// invoked.
func (f *FilterMaps) safeDeleteWithLogs(deleteFn func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error, action string, stopCb func() bool) error {
var (
start = time.Now()
logPrinted bool
lastLogPrinted = start
)
switch err := deleteFn(f.db, f.hashScheme, func(deleted bool) bool {
if deleted && !logPrinted || time.Since(lastLogPrinted) > time.Second*10 {
log.Info(action+" in progress...", "elapsed", common.PrettyDuration(time.Since(start)))
logPrinted, lastLogPrinted = true, time.Now()
}
select {
case <-f.closeCh:
return false
default:
}
if !retry {
log.Info(action+" in progress...", "elapsed", time.Since(start))
retry = true
return stopCb()
}); {
case err == nil:
if logPrinted {
log.Info(action+" finished", "elapsed", common.PrettyDuration(time.Since(start)))
}
return nil
case errors.Is(err, rawdb.ErrDeleteRangeInterrupted):
log.Warn(action+" interrupted", "elapsed", common.PrettyDuration(time.Since(start)))
return err
default:
log.Error(action+" failed", "error", err)
return err
}
}
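
A usage sketch of the wrapper, mirroring the removeBloomBits call above; the method name is hypothetical, and the explicit closure is only spelled out to make the plumbing of the two callbacks visible (rawdb.DeleteBloomBitsDb already matches the expected signature and could be passed directly):

// Hypothetical caller: the outer callback decides whether to abort, while
// the inner closure inside safeDeleteWithLogs consumes the deleted flag for
// log throttling.
func (f *FilterMaps) removeBloomBitsSketch() {
    err := f.safeDeleteWithLogs(func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error {
        return rawdb.DeleteBloomBitsDb(db, hashScheme, stopCb)
    }, "Removing old bloom bits database", f.isShuttingDown)
    if errors.Is(err, rawdb.ErrDeleteRangeInterrupted) {
        log.Warn("Bloom bits removal interrupted; will resume at next startup")
    }
}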

@@ -658,54 +675,97 @@ func (f *FilterMaps) deleteLastBlockOfMap(batch ethdb.Batch, mapIndex uint32) {
rawdb.DeleteFilterMapLastBlock(batch, mapIndex)
}

// deleteTailEpoch deletes index data from the earliest, either fully or partially
// indexed epoch. The last block pointer for the last map of the epoch and the
// corresponding block log value pointer are retained as these are always assumed
// to be available for each epoch.
func (f *FilterMaps) deleteTailEpoch(epoch uint32) error {
// deleteTailEpoch deletes index data from the specified epoch. The last block
// pointer for the last map of the epoch and the corresponding block log value
// pointer are retained as these are always assumed to be available for each
// epoch as boundary markers.
// The function returns true if all index data related to the epoch (except for
// the boundary markers) has been fully removed.
func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
f.indexLock.Lock()
defer f.indexLock.Unlock()

// determine epoch boundaries
firstMap := epoch << f.logMapsPerEpoch
lastBlock, _, err := f.getLastBlockOfMap(firstMap + f.mapsPerEpoch - 1)
if err != nil {
return fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err)
return false, fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err)
}
var firstBlock uint64
if epoch > 0 {
firstBlock, _, err = f.getLastBlockOfMap(firstMap - 1)
if err != nil {
return fmt.Errorf("failed to retrieve last block before deleted epoch %d: %v", epoch, err)
return false, fmt.Errorf("failed to retrieve last block before deleted epoch %d: %v", epoch, err)
}
firstBlock++
}
fmr := f.indexedRange
if f.indexedRange.maps.First() == firstMap &&
f.indexedRange.maps.AfterLast() > firstMap+f.mapsPerEpoch &&
f.indexedRange.tailPartialEpoch == 0 {
fmr.maps.SetFirst(firstMap + f.mapsPerEpoch)
fmr.blocks.SetFirst(lastBlock + 1)
} else if f.indexedRange.maps.First() == firstMap+f.mapsPerEpoch {
// update rendered range if necessary
var (
fmr = f.indexedRange
firstEpoch = f.indexedRange.maps.First() >> f.logMapsPerEpoch
afterLastEpoch = (f.indexedRange.maps.AfterLast() + f.mapsPerEpoch - 1) >> f.logMapsPerEpoch
)
if f.indexedRange.tailPartialEpoch != 0 && firstEpoch > 0 {
firstEpoch--
}
switch {
case epoch < firstEpoch:
// cleanup of already unindexed epoch; range not affected
case epoch == firstEpoch && epoch+1 < afterLastEpoch:
// first fully or partially rendered epoch and there is at least one
// rendered map in the next epoch; remove from indexed range
fmr.tailPartialEpoch = 0
fmr.maps.SetFirst((epoch + 1) << f.logMapsPerEpoch)
fmr.blocks.SetFirst(lastBlock + 1)
f.setRange(f.db, f.indexedView, fmr, false)
default:
// cannot be cleaned or unindexed; return with error
return false, errors.New("invalid tail epoch number")
}
// remove index data
if err := f.safeDeleteWithLogs(func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error {
first := f.mapRowIndex(firstMap, 0)
count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
if err := rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count), hashScheme, stopCb); err != nil {
return err
}
for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch; mapIndex++ {
f.filterMapCache.Remove(mapIndex)
}
delMapRange := common.NewRange(firstMap, f.mapsPerEpoch-1) // keep last entry
if err := rawdb.DeleteFilterMapLastBlocks(f.db, delMapRange, hashScheme, stopCb); err != nil {
return err

Review thread:

Member: why is the counter f.mapsPerEpoch-1? Shouldn't it be f.mapsPerEpoch?

Contributor: the comment says // keep last enrty

Contributor: if this is intended, it might be easier to see if the delete range were assigned to a variable on its own line

Contributor (Author): Yes, it is intentional; the last entry of each epoch should always be available. It is even added for past epochs based on the checkpoints.

}
for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch-1; mapIndex++ {
f.lastBlockCache.Remove(mapIndex)
}
delBlockRange := common.NewRange(firstBlock, lastBlock-firstBlock) // keep last entry
if err := rawdb.DeleteBlockLvPointers(f.db, delBlockRange, hashScheme, stopCb); err != nil {
return err
}
for blockNumber := firstBlock; blockNumber < lastBlock; blockNumber++ {
f.lvPointerCache.Remove(blockNumber)
}
return nil
}, fmt.Sprintf("Deleting tail epoch #%d", epoch), func() bool {
f.processEvents()
return f.stop || !f.targetHeadIndexed()
}); err == nil {
// everything removed; mark as cleaned and report success
if f.cleanedEpochsBefore == epoch {
f.cleanedEpochsBefore = epoch + 1
}
return true, nil
} else {
return errors.New("invalid tail epoch number")
}
f.setRange(f.db, f.indexedView, fmr, false)
first := f.mapRowIndex(firstMap, 0)
count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count))
for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch; mapIndex++ {
f.filterMapCache.Remove(mapIndex)
}
rawdb.DeleteFilterMapLastBlocks(f.db, common.NewRange(firstMap, f.mapsPerEpoch-1)) // keep last enrty
for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch-1; mapIndex++ {
f.lastBlockCache.Remove(mapIndex)
}
rawdb.DeleteBlockLvPointers(f.db, common.NewRange(firstBlock, lastBlock-firstBlock)) // keep last enrty
for blockNumber := firstBlock; blockNumber < lastBlock; blockNumber++ {
f.lvPointerCache.Remove(blockNumber)
// more data left in epoch range; mark as dirty and report unfinished
if f.cleanedEpochsBefore > epoch {
f.cleanedEpochsBefore = epoch
}
if errors.Is(err, rawdb.ErrDeleteRangeInterrupted) {
return false, nil
}
return false, err
}
return nil
}
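
To make the boundary arithmetic concrete, here is a worked example with an assumed logMapsPerEpoch of 10 (the real value comes from Params):

package sketch

import "fmt"

// Epoch boundary arithmetic from deleteTailEpoch, with the assumed parameter
// logMapsPerEpoch = 10, i.e. mapsPerEpoch = 1024.
func ExampleEpochBounds() {
    const (
        logMapsPerEpoch = 10
        mapsPerEpoch    = uint32(1) << logMapsPerEpoch
    )
    epoch := uint32(3)
    firstMap := epoch << logMapsPerEpoch
    fmt.Println(firstMap, firstMap+mapsPerEpoch-1) // 3072 4095: the maps of epoch 3

    // Filter rows of all 1024 maps are removed, but only 1023 last-block
    // entries: the range [3072, 3072+1023) leaves the entry of map 4095 in
    // place as the epoch boundary marker (see the review thread above).
    fmt.Println(firstMap, mapsPerEpoch-1) // 3072 1023

    // Startup resume logic from NewFilterMaps: with rs.MapsFirst = 3072 the
    // tail epoch is 3, so cleanedEpochsBefore = max(3, 1)-1 = 2 and epoch 2,
    // whose deletion may have been interrupted by a shutdown, is swept again.
    mapsFirst := uint32(3072)
    fmt.Println(max(mapsFirst>>logMapsPerEpoch, 1) - 1) // 2
}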

// exportCheckpoints exports epoch checkpoints in the format used by checkpoints.go.
32 changes: 17 additions & 15 deletions core/filtermaps/indexer.go
@@ -67,14 +67,17 @@ func (f *FilterMaps) indexerLoop() {
}
f.lastFinal = f.finalBlock
}
if done, err := f.tryIndexTail(); err != nil {
f.disableForError("tail rendering", err)
// always attempt unindexing before indexing the tail in order to
// ensure that a potentially dirty previously unindexed epoch is
// always cleaned up before any new maps are rendered.
if done, err := f.tryUnindexTail(); err != nil {
f.disableForError("tail unindexing", err)
return
} else if !done {
continue
}
if done, err := f.tryUnindexTail(); err != nil {
f.disableForError("tail unindexing", err)
if done, err := f.tryIndexTail(); err != nil {
f.disableForError("tail rendering", err)
return
} else if !done {
continue
@@ -349,25 +352,24 @@ func (f *FilterMaps) tryIndexTail() (bool, error) {
// Note that unindexing is very quick as it only removes continuous ranges of
// data from the database and is also called while running head indexing.
func (f *FilterMaps) tryUnindexTail() (bool, error) {
for {
firstEpoch := (f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch) >> f.logMapsPerEpoch
if f.needTailEpoch(firstEpoch) {
break
}
f.processEvents()
if f.stop {
return false, nil
}
firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
if f.indexedRange.tailPartialEpoch > 0 && firstEpoch > 0 {
firstEpoch--
}
for epoch := min(firstEpoch, f.cleanedEpochsBefore); !f.needTailEpoch(epoch); epoch++ {
if !f.startedTailUnindex {
f.startedTailUnindexAt = time.Now()
f.startedTailUnindex = true
f.ptrTailUnindexMap = f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch
f.ptrTailUnindexBlock = f.indexedRange.blocks.First() - f.tailPartialBlocks()
}
if err := f.deleteTailEpoch(firstEpoch); err != nil {
log.Error("Log index tail epoch unindexing failed", "error", err)
if done, err := f.deleteTailEpoch(epoch); !done {
return false, err
}
f.processEvents()
if f.stop || !f.targetHeadIndexed() {
return false, nil
}
}
if f.startedTailUnindex && f.indexedRange.hasIndexedBlocks() {
log.Info("Log index tail unindexing finished",
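
Taken together, the cleanedEpochsBefore bookkeeping and the (bool, error) return of deleteTailEpoch make tail unindexing restartable: an interruption (shutdown, or a new head that needs indexing first) surfaces as done=false with a nil error, and the next pass resumes from the first possibly-dirty epoch. A condensed sketch of that control flow, using the PR's names but omitting the timing and pointer bookkeeping shown in the diff:

// Sketch of the restartable unindexing loop in tryUnindexTail.
func (f *FilterMaps) tryUnindexTailSketch() (bool, error) {
    firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
    if f.indexedRange.tailPartialEpoch > 0 && firstEpoch > 0 {
        firstEpoch-- // the partially indexed epoch before the full range counts too
    }
    // Start at the first possibly-dirty epoch, which may lie before the
    // indexed tail if a previous deletion was interrupted.
    for epoch := min(firstEpoch, f.cleanedEpochsBefore); !f.needTailEpoch(epoch); epoch++ {
        if done, err := f.deleteTailEpoch(epoch); !done {
            // err == nil means merely interrupted; the indexer loop retries
            // later and resumes from cleanedEpochsBefore.
            return false, err
        }
    }
    return true, nil
}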