Skip to content

introduce concurrent map for managing block hash to block node index #1405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions collections/concurrent_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ func NewConcurrentMap[Key comparable, Value any]() *ConcurrentMap[Key, Value] {
}
}

func NewConcurrentMapFromMap[Key comparable, Value any](input map[Key]Value) *ConcurrentMap[Key, Value] {
return &ConcurrentMap[Key, Value]{
m: input,
}
}

func (cm *ConcurrentMap[Key, Value]) Set(key Key, val Value) {
cm.mtx.Lock()
defer cm.mtx.Unlock()
Expand Down Expand Up @@ -78,3 +84,12 @@ func (cm *ConcurrentMap[Key, Value]) Count() int {

return len(cm.m)
}

func (cm *ConcurrentMap[Key, Value]) Iterate(fn func(Key, Value)) {
cm.mtx.RLock()
defer cm.mtx.RUnlock()

for key, val := range cm.m {
fn(key, val)
}
}
5 changes: 5 additions & 0 deletions lib/block_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ func setupTestDeSoEncoder(t *testing.T) {
{
for ii := 0; ii < testDeSoEncoderRetries; ii++ {
newVersionByte := encoder.GetVersionByte(blockHeight)
// If the version byte changes, we can't compare the encoding as we know that a
// fork height was changed underneath us.
if newVersionByte != versionByte {
continue
}
reEncodingBytes := encodeToBytes(blockHeight, encoder, skipMetadata...)
if !bytes.Equal(encodingBytes, reEncodingBytes) {
t.Fatalf("EncodeToBytes: Found non-deterministic encoding for a DeSoEncoder. Attempted "+
Expand Down
68 changes: 38 additions & 30 deletions lib/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ type Blockchain struct {
//
// An in-memory index of the "tree" of blocks we are currently aware of.
// This index includes forks and side-chains.
blockIndexByHash map[BlockHash]*BlockNode
blockIndexByHash *collections.ConcurrentMap[BlockHash, *BlockNode]
// blockIndexByHeight is an in-memory map of block height to block nodes. This is
// used to quickly find the safe blocks from which the chain can be extended for PoS
blockIndexByHeight map[uint64]map[BlockHash]*BlockNode
Expand Down Expand Up @@ -705,36 +705,39 @@ func getCheckpointBlockInfoFromProviderHelper(provider string) *CheckpointBlockI
}

func (bc *Blockchain) addNewBlockNodeToBlockIndex(blockNode *BlockNode) {
bc.blockIndexByHash[*blockNode.Hash] = blockNode
bc.blockIndexByHash.Set(*blockNode.Hash, blockNode)
if _, exists := bc.blockIndexByHeight[uint64(blockNode.Height)]; !exists {
bc.blockIndexByHeight[uint64(blockNode.Height)] = make(map[BlockHash]*BlockNode)
}
bc.blockIndexByHeight[uint64(blockNode.Height)][*blockNode.Hash] = blockNode
}

func (bc *Blockchain) CopyBlockIndexes() (_blockIndexByHash map[BlockHash]*BlockNode, _blockIndexByHeight map[uint64]map[BlockHash]*BlockNode) {
newBlockIndexByHash := make(map[BlockHash]*BlockNode)
func (bc *Blockchain) CopyBlockIndexes() (
_blockIndexByHash *collections.ConcurrentMap[BlockHash, *BlockNode],
_blockIndexByHeight map[uint64]map[BlockHash]*BlockNode,
) {
newBlockIndexByHash := collections.NewConcurrentMap[BlockHash, *BlockNode]()
newBlockIndexByHeight := make(map[uint64]map[BlockHash]*BlockNode)
for kk, vv := range bc.blockIndexByHash {
newBlockIndexByHash[kk] = vv
bc.blockIndexByHash.Iterate(func(kk BlockHash, vv *BlockNode) {
newBlockIndexByHash.Set(kk, vv)
blockHeight := uint64(vv.Height)
if _, exists := newBlockIndexByHeight[blockHeight]; !exists {
newBlockIndexByHeight[blockHeight] = make(map[BlockHash]*BlockNode)
}
newBlockIndexByHeight[blockHeight][kk] = vv
}
})
return newBlockIndexByHash, newBlockIndexByHeight
}

func (bc *Blockchain) constructBlockIndexByHeight() map[uint64]map[BlockHash]*BlockNode {
newBlockIndex := make(map[uint64]map[BlockHash]*BlockNode)
for _, blockNode := range bc.blockIndexByHash {
bc.blockIndexByHash.Iterate(func(_ BlockHash, blockNode *BlockNode) {
blockHeight := uint64(blockNode.Height)
if _, exists := newBlockIndex[blockHeight]; !exists {
newBlockIndex[blockHeight] = make(map[BlockHash]*BlockNode)
}
newBlockIndex[blockHeight][*blockNode.Hash] = blockNode
}
})
return newBlockIndex
}

Expand Down Expand Up @@ -852,14 +855,14 @@ func (bc *Blockchain) _initChain() error {
// nodes pointing to valid parent nodes.
{
// Find the tip node with the best node hash.
tipNode := bc.blockIndexByHash[*bestBlockHash]
if tipNode == nil {
tipNode, exists := bc.blockIndexByHash.Get(*bestBlockHash)
if !exists {
return fmt.Errorf("_initChain(block): Best hash (%#v) not found in block index", bestBlockHash)
}

// Walk back from the best node to the genesis block and store them all
// in bestChain.
bc.bestChain, err = GetBestChain(tipNode, bc.blockIndexByHash)
bc.bestChain, err = GetBestChain(tipNode)
if err != nil {
return errors.Wrapf(err, "_initChain(block): Problem reading best chain from db")
}
Expand All @@ -871,14 +874,14 @@ func (bc *Blockchain) _initChain() error {
// TODO: This code is a bit repetitive but this seemed clearer than factoring it out.
{
// Find the tip node with the best node hash.
tipNode := bc.blockIndexByHash[*bestHeaderHash]
if tipNode == nil {
tipNode, exists := bc.blockIndexByHash.Get(*bestHeaderHash)
if !exists {
return fmt.Errorf("_initChain(header): Best hash (%#v) not found in block index", bestHeaderHash)
}

// Walk back from the best node to the genesis block and store them all
// in bestChain.
bc.bestHeaderChain, err = GetBestChain(tipNode, bc.blockIndexByHash)
bc.bestHeaderChain, err = GetBestChain(tipNode)
if err != nil {
return errors.Wrapf(err, "_initChain(header): Problem reading best chain from db")
}
Expand Down Expand Up @@ -989,7 +992,7 @@ func NewBlockchain(
eventManager: eventManager,
archivalMode: archivalMode,

blockIndexByHash: make(map[BlockHash]*BlockNode),
blockIndexByHash: collections.NewConcurrentMap[BlockHash, *BlockNode](),
blockIndexByHeight: make(map[uint64]map[BlockHash]*BlockNode),
bestChainMap: make(map[BlockHash]*BlockNode),

Expand Down Expand Up @@ -1062,12 +1065,12 @@ func fastLog2Floor(n uint32) uint8 {
//
// This function MUST be called with the chain state lock held (for reads).
func locateInventory(locator []*BlockHash, stopHash *BlockHash, maxEntries uint32,
blockIndex map[BlockHash]*BlockNode, bestChainList []*BlockNode,
blockIndex *collections.ConcurrentMap[BlockHash, *BlockNode], bestChainList []*BlockNode,
bestChainMap map[BlockHash]*BlockNode) (*BlockNode, uint32) {

// There are no block locators so a specific block is being requested
// as identified by the stop hash.
stopNode, stopNodeExists := blockIndex[*stopHash]
stopNode, stopNodeExists := blockIndex.Get(*stopHash)
if len(locator) == 0 {
if !stopNodeExists {
// No blocks with the stop hash were found so there is
Expand Down Expand Up @@ -1123,7 +1126,7 @@ func locateInventory(locator []*BlockHash, stopHash *BlockHash, maxEntries uint3
//
// This function MUST be called with the ChainLock held (for reads).
func locateHeaders(locator []*BlockHash, stopHash *BlockHash, maxHeaders uint32,
blockIndex map[BlockHash]*BlockNode, bestChainList []*BlockNode,
blockIndex *collections.ConcurrentMap[BlockHash, *BlockNode], bestChainList []*BlockNode,
bestChainMap map[BlockHash]*BlockNode) []*MsgDeSoHeader {

// Find the node after the first known block in the locator and the
Expand Down Expand Up @@ -1253,7 +1256,7 @@ func (bc *Blockchain) LatestLocator(tip *BlockNode) []*BlockHash {
}

func (bc *Blockchain) HeaderLocatorWithNodeHash(blockHash *BlockHash) ([]*BlockHash, error) {
node, exists := bc.blockIndexByHash[*blockHash]
node, exists := bc.blockIndexByHash.Get(*blockHash)
if !exists {
return nil, fmt.Errorf("Blockchain.HeaderLocatorWithNodeHash: Node for hash %v is not in our blockIndexByHash", blockHash)
}
Expand Down Expand Up @@ -1334,7 +1337,7 @@ func (bc *Blockchain) GetBlockNodesToFetch(
}

func (bc *Blockchain) HasHeader(headerHash *BlockHash) bool {
_, exists := bc.blockIndexByHash[*headerHash]
_, exists := bc.blockIndexByHash.Get(*headerHash)
return exists
}

Expand All @@ -1347,7 +1350,7 @@ func (bc *Blockchain) HeaderAtHeight(blockHeight uint32) *BlockNode {
}

func (bc *Blockchain) HasBlock(blockHash *BlockHash) bool {
node, nodeExists := bc.blockIndexByHash[*blockHash]
node, nodeExists := bc.blockIndexByHash.Get(*blockHash)
if !nodeExists {
glog.V(2).Infof("Blockchain.HasBlock: Node with hash %v does not exist in node index", blockHash)
return false
Expand All @@ -1366,7 +1369,7 @@ func (bc *Blockchain) HasBlockInBlockIndex(blockHash *BlockHash) bool {
bc.ChainLock.RLock()
defer bc.ChainLock.RUnlock()

_, exists := bc.blockIndexByHash[*blockHash]
_, exists := bc.blockIndexByHash.Get(*blockHash)
return exists
}

Expand All @@ -1376,7 +1379,7 @@ func (bc *Blockchain) GetBlockHeaderFromIndex(blockHash *BlockHash) *MsgDeSoHead
bc.ChainLock.RLock()
defer bc.ChainLock.RUnlock()

block, blockExists := bc.blockIndexByHash[*blockHash]
block, blockExists := bc.blockIndexByHash.Get(*blockHash)
if !blockExists {
return nil
}
Expand Down Expand Up @@ -1684,7 +1687,12 @@ func (bc *Blockchain) SetBestChain(bestChain []*BlockNode) {
bc.bestChain = bestChain
}

func (bc *Blockchain) SetBestChainMap(bestChain []*BlockNode, bestChainMap map[BlockHash]*BlockNode, blockIndexByHash map[BlockHash]*BlockNode, blockIndexByHeight map[uint64]map[BlockHash]*BlockNode) {
func (bc *Blockchain) SetBestChainMap(
bestChain []*BlockNode,
bestChainMap map[BlockHash]*BlockNode,
blockIndexByHash *collections.ConcurrentMap[BlockHash, *BlockNode],
blockIndexByHeight map[uint64]map[BlockHash]*BlockNode,
) {
bc.bestChain = bestChain
bc.bestChainMap = bestChainMap
bc.blockIndexByHash = blockIndexByHash
Expand Down Expand Up @@ -2017,7 +2025,7 @@ func (bc *Blockchain) processHeaderPoW(blockHeader *MsgDeSoHeader, headerHash *B
// index. If it does, then return an error. We should generally
// expect that processHeaderPoW will only be called on headers we
// haven't seen before.
_, nodeExists := bc.blockIndexByHash[*headerHash]
_, nodeExists := bc.blockIndexByHash.Get(*headerHash)
if nodeExists {
return false, false, HeaderErrorDuplicateHeader
}
Expand All @@ -2042,7 +2050,7 @@ func (bc *Blockchain) processHeaderPoW(blockHeader *MsgDeSoHeader, headerHash *B
if blockHeader.PrevBlockHash == nil {
return false, false, HeaderErrorNilPrevHash
}
parentNode, parentNodeExists := bc.blockIndexByHash[*blockHeader.PrevBlockHash]
parentNode, parentNodeExists := bc.blockIndexByHash.Get(*blockHeader.PrevBlockHash)
if !parentNodeExists {
// This block is an orphan if its parent doesn't exist and we don't
// process unconnectedTxns.
Expand Down Expand Up @@ -2304,7 +2312,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
bc.timer.Start("Blockchain.ProcessBlock: BlockNode")

// See if a node for the block exists in our node index.
nodeToValidate, nodeExists := bc.blockIndexByHash[*blockHash]
nodeToValidate, nodeExists := bc.blockIndexByHash.Get(*blockHash)
// If no node exists for this block at all, then process the header
// first before we do anything. This should create a node and set
// the header validation status for it.
Expand All @@ -2325,7 +2333,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures

// Reset the pointers after having presumably added the header to the
// block index.
nodeToValidate, nodeExists = bc.blockIndexByHash[*blockHash]
nodeToValidate, nodeExists = bc.blockIndexByHash.Get(*blockHash)
}
// At this point if the node still doesn't exist or if the header's validation
// failed then we should return an error for the block. Note that at this point
Expand All @@ -2344,7 +2352,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
// In this case go ahead and return early. If its parents are truly legitimate then we
// should re-request it and its parents from a node and reprocess it
// once it is no longer an orphan.
parentNode, parentNodeExists := bc.blockIndexByHash[*blockHeader.PrevBlockHash]
parentNode, parentNodeExists := bc.blockIndexByHash.Get(*blockHeader.PrevBlockHash)
if !parentNodeExists || (parentNode.Status&StatusBlockProcessed) == 0 {
return false, true, nil
}
Expand Down
14 changes: 9 additions & 5 deletions lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/deso-protocol/core/collections"
"io"
"log"
"math"
Expand Down Expand Up @@ -5470,8 +5471,11 @@ func GetBlockTipHeight(handle *badger.DB, bitcoinNodes bool) (uint64, error) {
return blockHeight, err
}

func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (map[BlockHash]*BlockNode, error) {
blockIndex := make(map[BlockHash]*BlockNode)
func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (
*collections.ConcurrentMap[BlockHash, *BlockNode],
error,
) {
blockIndex := collections.NewConcurrentMap[BlockHash, *BlockNode]()

prefix := _heightHashToNodeIndexPrefix(bitcoinNodes)

Expand Down Expand Up @@ -5503,7 +5507,7 @@ func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (ma

// If we got here it means we read a blockNode successfully. Store it
// into our node index.
blockIndex[*blockNode.Hash] = blockNode
blockIndex.Set(*blockNode.Hash, blockNode)

// Find the parent of this block, which should already have been read
// in and connect it. Skip the genesis block, which has height 0. Also
Expand All @@ -5517,7 +5521,7 @@ func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (ma
if blockNode.Height == 0 || (*blockNode.Header.PrevBlockHash == BlockHash{}) {
continue
}
if parent, ok := blockIndex[*blockNode.Header.PrevBlockHash]; ok {
if parent, ok := blockIndex.Get(*blockNode.Header.PrevBlockHash); ok {
// We found the parent node so connect it.
blockNode.Parent = parent
} else {
Expand All @@ -5540,7 +5544,7 @@ func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (ma
return blockIndex, nil
}

func GetBestChain(tipNode *BlockNode, blockIndex map[BlockHash]*BlockNode) ([]*BlockNode, error) {
func GetBestChain(tipNode *BlockNode) ([]*BlockNode, error) {
reversedBestChain := []*BlockNode{}
for tipNode != nil {
if (tipNode.Status&StatusBlockValidated) == 0 &&
Expand Down
18 changes: 9 additions & 9 deletions lib/db_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,17 @@ func TestBlockNodePutGet(t *testing.T) {
blockIndex, err := GetBlockIndex(db, false /*bitcoinNodes*/, &DeSoTestnetParams)
require.NoError(err)

require.Len(blockIndex, 4)
b1Ret, exists := blockIndex[*b1.Hash]
require.Equal(blockIndex.Count(), 4)
b1Ret, exists := blockIndex.Get(*b1.Hash)
require.True(exists, "b1 not found")

b2Ret, exists := blockIndex[*b2.Hash]
b2Ret, exists := blockIndex.Get(*b2.Hash)
require.True(exists, "b2 not found")

b3Ret, exists := blockIndex[*b3.Hash]
b3Ret, exists := blockIndex.Get(*b3.Hash)
require.True(exists, "b3 not found")

b4Ret, exists := blockIndex[*b4.Hash]
b4Ret, exists := blockIndex.Get(*b4.Hash)
require.True(exists, "b4 not found")

// Make sure the hashes all line up.
Expand All @@ -201,7 +201,7 @@ func TestBlockNodePutGet(t *testing.T) {

// Check that getting the best chain works.
{
bestChain, err := GetBestChain(b3Ret, blockIndex)
bestChain, err := GetBestChain(b3Ret)
require.NoError(err)
require.Len(bestChain, 3)
require.Equal(b1Ret, bestChain[0])
Expand All @@ -226,15 +226,15 @@ func TestInitDbWithGenesisBlock(t *testing.T) {
// Check the block index.
blockIndex, err := GetBlockIndex(db, false /*bitcoinNodes*/, &DeSoTestnetParams)
require.NoError(err)
require.Len(blockIndex, 1)
require.Equal(blockIndex.Count(), 1)
genesisHash := *MustDecodeHexBlockHash(DeSoTestnetParams.GenesisBlockHashHex)
genesis, exists := blockIndex[genesisHash]
genesis, exists := blockIndex.Get(genesisHash)
require.True(exists, "genesis block not found in index")
require.NotNil(genesis)
require.Equal(&genesisHash, genesis.Hash)

// Check the bestChain.
bestChain, err := GetBestChain(genesis, blockIndex)
bestChain, err := GetBestChain(genesis)
require.NoError(err)
require.Len(bestChain, 1)
require.Equal(genesis, bestChain[0])
Expand Down
Loading
Loading