Skip to content

Streamable Snapshot #2358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 57 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
397c1fe
Add Bytes and FromBytes in models
jkrvivian Jul 19, 2022
8a48c2a
Init commit for streamable snapshot
jkrvivian Jul 19, 2022
078fefa
Replace old snapshot functions
jkrvivian Jul 19, 2022
d031784
Refactor and fix bugs
jkrvivian Jul 20, 2022
1b14b0e
Modify LoadSnapshot in notarization manager
jkrvivian Jul 20, 2022
afbfec3
Fix delimiter parsing error
jkrvivian Jul 21, 2022
9fa7c8b
Fix wrong type when marshalling
jkrvivian Jul 21, 2022
b2fc237
Fix typo
jkrvivian Jul 21, 2022
3860bd8
Set ID after deserialization
jkrvivian Jul 21, 2022
793815c
Refactor LoadSnapshot
jkrvivian Jul 21, 2022
7f4c3fd
Use CreateStreamableSnapshot in snapshot plugin
jkrvivian Jul 21, 2022
1e55868
Fix bugs and refactor
jkrvivian Jul 22, 2022
2c202d5
Make use of consumer functions
jkrvivian Jul 22, 2022
3e295f3
Merge branch 'develop' into feat/streamable-snapshot
jkrvivian Jul 22, 2022
71d2357
Disable snapshot webapi plugin
jkrvivian Jul 22, 2022
e3fdb86
Simplify Create and Load snapshot functions
jkrvivian Jul 22, 2022
211318c
Update docker-network snapshot
jkrvivian Jul 22, 2022
703fb06
Rename accepted unspent outputWithMetadata ForEach function
jkrvivian Jul 22, 2022
4ba226c
Panic when loading snapshot
jkrvivian Jul 22, 2022
cbe93e4
Fix unit test
jkrvivian Jul 22, 2022
3639358
Fix CodeQL
jkrvivian Jul 22, 2022
eece9dd
Add comments
jkrvivian Jul 22, 2022
97632db
Add and modify ECRecord Bytes() and FromBytes()
jkrvivian Jul 24, 2022
5dd8603
Rename OutputWithMetadata Producer/Consumer Func
jkrvivian Jul 24, 2022
5e661e9
Make use of ECRecord Bytes()
jkrvivian Jul 24, 2022
c3c78a7
Set ID in ECRecord FromBytes
jkrvivian Jul 25, 2022
a313e76
Add snapshot header
jkrvivian Jul 25, 2022
cdccf94
Minor tweaks
jkrvivian Jul 25, 2022
5da736e
Check returned error
jkrvivian Jul 25, 2022
3555720
Move LoadSnapshot to testutils
jkrvivian Jul 26, 2022
60263e0
Remove Bytes() from OutputWithMetadata
jkrvivian Jul 26, 2022
4d470ef
Rename Load and Create snapshot functions
jkrvivian Jul 26, 2022
f733a66
Fix deadlock in SnapshotEpochDiffs
jkrvivian Jul 26, 2022
385be7c
Rename snapshotread/write.go to read/write.go
jkrvivian Jul 26, 2022
b1b0820
Fixes in producerFromChannels
jkrvivian Jul 26, 2022
0d03659
Return OutputWithMetadata up to last confirmed epoch
jkrvivian Jul 26, 2022
35ce17c
Reset ouptutChunkCounter when it reaches the threshold
jkrvivian Jul 26, 2022
295acbf
Remove producerFromChannels
jkrvivian Jul 26, 2022
1d6e302
Fix comment
jkrvivian Jul 26, 2022
b9f74b6
Rename NotarizationConsumer to HeaderConsumer
jkrvivian Jul 26, 2022
e2cbdd0
Check error from LoadEpochDiffs
jkrvivian Jul 26, 2022
e21a931
Rename ForEachAcceptedUnSpentOutputWithMetadata
jkrvivian Jul 26, 2022
6b8b8de
Fix comment
jkrvivian Jul 26, 2022
0556000
Rename OutputsWithMetadata to FullUTXOStates
jkrvivian Jul 26, 2022
68ad8c4
Remove offsets counter from writeFunc
jkrvivian Jul 26, 2022
9199210
Check returned error
jkrvivian Jul 26, 2022
6ca0aa0
Make utxoStates chunk size a const
jkrvivian Jul 26, 2022
f70d87c
Remove non-streamable snapshot methods
jkrvivian Jul 26, 2022
a1f89aa
Unexport internal functions
jkrvivian Jul 26, 2022
85660f8
Minor tweaks
jkrvivian Jul 26, 2022
f8de807
Fix comment
jkrvivian Jul 26, 2022
f0d4eba
Fix bug and add a unit test
jkrvivian Jul 27, 2022
6f5be35
Refactor snapshot unit test
jkrvivian Jul 27, 2022
26b7fc9
Revert the naming of outputsWithMetadata from snapshot structure
jkrvivian Jul 28, 2022
4edab83
Snapshot ledger states from notarization manager
jkrvivian Jul 28, 2022
27b4a7f
Rename LedgerStates to LedgerState
jkrvivian Aug 2, 2022
1f043ae
Remove unused function
jkrvivian Aug 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions packages/core/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func New(options ...Option) (ledger *Ledger) {
return ledger
}

// LoadOutputWithMetadatas loads OutputWithMetadatas from a snapshot file to the storage.
func (l *Ledger) LoadOutputWithMetadatas(outputsWithMetadatas []*OutputWithMetadata) {
// LoadFullUTXOStates loads OutputWithMetadatas from a snapshot file to the storage.
func (l *Ledger) LoadFullUTXOStates(outputsWithMetadatas []*OutputWithMetadata) {
for _, outputWithMetadata := range outputsWithMetadatas {
newOutputMetadata := NewOutputMetadata(outputWithMetadata.ID())
newOutputMetadata.SetAccessManaPledgeID(outputWithMetadata.AccessManaPledgeID())
Expand Down Expand Up @@ -126,12 +126,6 @@ func (l *Ledger) LoadEpochDiffs(header *SnapshotHeader, epochDiffs map[epoch.Ind
return nil
}

// LoadSnapshot loads a snapshot of the Ledger from the given snapshot.
func (l *Ledger) LoadSnapshot(snapshot *Snapshot) {
l.LoadOutputWithMetadatas(snapshot.OutputsWithMetadata)
l.LoadEpochDiffs(snapshot.Header, snapshot.EpochDiffs)
}

// TakeSnapshot returns a snapshot of the Ledger state.
func (l *Ledger) TakeSnapshot() (snapshot *Snapshot) {
snapshot = NewSnapshot([]*OutputWithMetadata{})
Expand All @@ -143,7 +137,7 @@ func (l *Ledger) TakeSnapshot() (snapshot *Snapshot) {

l.Storage.CachedOutput(outputMetadata.ID()).Consume(func(output utxo.Output) {
outputWithMetadata := NewOutputWithMetadata(output.ID(), output, outputMetadata.CreationTime(), outputMetadata.ConsensusManaPledgeID(), outputMetadata.AccessManaPledgeID())
snapshot.OutputsWithMetadata = append(snapshot.OutputsWithMetadata, outputWithMetadata)
snapshot.FullUTXOStates = append(snapshot.FullUTXOStates, outputWithMetadata)
})
})

Expand Down Expand Up @@ -192,7 +186,8 @@ func (l *Ledger) PruneTransaction(txID utxo.TransactionID, pruneFutureCone bool)
l.Storage.pruneTransaction(txID, pruneFutureCone)
}

func (l *Ledger) ForEachAcceptedUnSpentOutputWithMetadata(consumer func(*OutputWithMetadata)) {
// ForEachAcceptedUnspentOutputWithMetadata returns the Accepted unspent OutputWithMetadata before a latest confirmed epoch.
func (l *Ledger) ForEachAcceptedUnspentOutputWithMetadata(consumer func(*OutputWithMetadata)) {
l.Storage.outputMetadataStorage.ForEach(func(key []byte, cachedOutputMetadata *objectstorage.CachedObject[*OutputMetadata]) bool {
cachedOutputMetadata.Consume(func(outputMetadata *OutputMetadata) {
if outputMetadata.IsSpent() || !l.Utils.OutputConfirmationState(outputMetadata.ID()).IsAccepted() {
Expand Down
11 changes: 2 additions & 9 deletions packages/core/ledger/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/iotaledger/hive.go/stringify"
"github.com/iotaledger/hive.go/types/confirmation"

"github.com/iotaledger/goshimmer/packages/node/clock"
"github.com/iotaledger/goshimmer/packages/core/ledger/utxo"
"github.com/iotaledger/goshimmer/packages/node/clock"
)

// region TransactionMetadata //////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -575,7 +575,7 @@ func (e *EpochDiff) Spent() []*OutputWithMetadata {

// Created returns the outputs created for this epoch diff.
func (e *EpochDiff) Created() []*OutputWithMetadata {
return e.M.Spent
return e.M.Created
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -635,13 +635,6 @@ func (o *OutputWithMetadata) FromBytes(data []byte) error {
return err
}

// Bytes marshals an OutputWithMetadata to a sequence of bytes.
func (o *OutputWithMetadata) Bytes() (bytes []byte, err error) {
bytes, err = o.Storable.Bytes()

return
}

// Output returns the Output field.
func (o *OutputWithMetadata) Output() (output utxo.Output) {
o.RLock()
Expand Down
14 changes: 7 additions & 7 deletions packages/core/ledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

// Snapshot represents a snapshot of the current ledger state.
type Snapshot struct {
Header *SnapshotHeader `serix:"0"`
OutputsWithMetadata []*OutputWithMetadata `serix:"1,lengthPrefixType=uint32"`
EpochDiffs map[epoch.Index]*EpochDiff `serix:"2,lengthPrefixType=uint32"`
Header *SnapshotHeader `serix:"0"`
FullUTXOStates []*OutputWithMetadata `serix:"1,lengthPrefixType=uint32"`
EpochDiffs map[epoch.Index]*EpochDiff `serix:"2,lengthPrefixType=uint32"`
}

// SnapshotHeader represents the info of a snapshot.
Expand All @@ -24,23 +24,23 @@ type SnapshotHeader struct {
// NewSnapshot creates a new Snapshot from the given details.
func NewSnapshot(outputsWithMetadata []*OutputWithMetadata) (new *Snapshot) {
return &Snapshot{
Header: &SnapshotHeader{OutputWithMetadataCount: uint64(len(outputsWithMetadata))},
OutputsWithMetadata: outputsWithMetadata,
Header: &SnapshotHeader{OutputWithMetadataCount: uint64(len(outputsWithMetadata))},
FullUTXOStates: outputsWithMetadata,
}
}

// String returns a human-readable version of the Snapshot.
func (s *Snapshot) String() (humanReadable string) {
structBuilder := stringify.StructBuilder("Snapshot")
structBuilder.AddField(stringify.StructField("SnapshotHeader", s.Header))
structBuilder.AddField(stringify.StructField("OutputsWithMetadata", s.OutputsWithMetadata))
structBuilder.AddField(stringify.StructField("FullUTXOStates", s.FullUTXOStates))
structBuilder.AddField(stringify.StructField("EpochDiffs", s.EpochDiffs))
return structBuilder.String()
}

// String returns a human-readable version of the Snapshot.
func (h *SnapshotHeader) String() (humanReadable string) {
return stringify.Struct("Snapshot",
return stringify.Struct("SnapshotHeader",
stringify.StructField("OutputWithMetadataCount", h.OutputWithMetadataCount),
stringify.StructField("FullEpochIndex", h.FullEpochIndex),
stringify.StructField("DiffEpochIndex", h.DiffEpochIndex),
Expand Down
24 changes: 9 additions & 15 deletions packages/core/notarization/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func onlyIfBootstrapped[E any](timeManager *tangleold.TimeManager, handler func(
})
}

func (m *Manager) LoadOutputWithMetadatas(outputsWithMetadatas []*ledger.OutputWithMetadata) {
// LoadFullUTXOStates initiates the state and mana trees from a given snapshot.
func (m *Manager) LoadFullUTXOStates(outputsWithMetadatas []*ledger.OutputWithMetadata) {
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()

Expand All @@ -129,7 +130,7 @@ func (m *Manager) LoadOutputWithMetadatas(outputsWithMetadatas []*ledger.OutputW
}
}

// LoadEpochDiffs initiates the state and mana trees from a given snapshot.
// LoadEpochDiffs updates the state tree from a given snapshot.
func (m *Manager) LoadEpochDiffs(header *ledger.SnapshotHeader, epochDiffs map[epoch.Index]*ledger.EpochDiff) {
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()
Expand Down Expand Up @@ -159,7 +160,7 @@ func (m *Manager) LoadEpochDiffs(header *ledger.SnapshotHeader, epochDiffs map[e
return
}

// LoadEpochDiffs initiates the state and mana trees from a given snapshot.
// LoadECandEIs initiates the ECRecord, latest committable EI, last confirmed EI and acceptance EI from a given snapshot.
func (m *Manager) LoadECandEIs(header *ledger.SnapshotHeader) {
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()
Expand All @@ -182,27 +183,20 @@ func (m *Manager) LoadECandEIs(header *ledger.SnapshotHeader) {
m.epochCommitmentFactory.storage.ecRecordStorage.Store(header.LatestECRecord).Release()
}

// LoadSnapshot initiates the state and mana trees from a given snapshot.
func (m *Manager) LoadSnapshot(s *ledger.Snapshot) {
m.LoadOutputWithMetadatas(s.OutputsWithMetadata)
m.LoadEpochDiffs(s.Header, s.EpochDiffs)
m.LoadECandEIs(s.Header)
}

// SnapshotEpochDiffs returns the EpochDiffs when a snapshot is created.
func (m *Manager) SnapshotEpochDiffs() (map[epoch.Index]*ledger.EpochDiff, error) {
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()

start, err := m.LatestConfirmedEpochIndex()
if err != nil {
return nil, err
}

ec, err := m.GetLatestEC()
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()

end, err := m.epochCommitmentFactory.storage.latestCommittableEpochIndex()
if err != nil {
return nil, err
}
end := ec.EI()

epochDiffsMap := make(map[epoch.Index]*ledger.EpochDiff)
for ei := start + 1; ei <= end; ei++ {
Expand Down
8 changes: 5 additions & 3 deletions packages/core/notarization/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,10 +1082,10 @@ func loadSnapshot(m *Manager, testFramework *tangleold.BlockTestFramework) {
header.FullEpochIndex = epoch.Index(0)

var createMetadata []*ledger.OutputWithMetadata
for _, metadata := range snapshot.OutputsWithMetadata {
for _, metadata := range snapshot.FullUTXOStates {
createMetadata = append(createMetadata, metadata)
}
header.OutputWithMetadataCount = uint64(len(snapshot.OutputsWithMetadata))
header.OutputWithMetadataCount = uint64(len(snapshot.FullUTXOStates))
snapshot.EpochDiffs = make(map[epoch.Index]*ledger.EpochDiff)
snapshot.EpochDiffs[epoch.Index(0)] = ledger.NewEpochDiff([]*ledger.OutputWithMetadata{}, createMetadata)

Expand All @@ -1095,7 +1095,9 @@ func loadSnapshot(m *Manager, testFramework *tangleold.BlockTestFramework) {
header.LatestECRecord = ecRecord
snapshot.Header = header

m.LoadSnapshot(snapshot)
m.LoadFullUTXOStates(snapshot.FullUTXOStates)
m.LoadEpochDiffs(snapshot.Header, snapshot.EpochDiffs)
m.LoadECandEIs(snapshot.Header)
}

func registerToTangleEvents(sfg *acceptance.Gadget, testTangle *tangleold.Tangle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
"github.com/iotaledger/hive.go/serix"
)

// StreamSnapshotDataFrom consumes a snapshot from the given reader.
func StreamSnapshotDataFrom(
// streamSnapshotDataFrom consumes a snapshot from the given reader.
func streamSnapshotDataFrom(
reader io.ReadSeeker,
outputConsumer OutputWithMetadataConsumerFunc,
epochDiffsConsumer EpochDiffsConsumerFunc,
notarizationConsumer NotarizationConsumerFunc) error {
headerConsumer HeaderConsumerFunc,
outputConsumer UTXOStatesConsumerFunc,
epochDiffsConsumer EpochDiffsConsumerFunc) error {

header, err := ReadSnapshotHeader(reader)
header, err := readSnapshotHeader(reader)
if err != nil {
return err
}
Expand All @@ -30,16 +30,16 @@ func StreamSnapshotDataFrom(
scanner.Split(scanDelimiter)

// read latest ECRecord
ecRecord, err := ReadECRecord(scanner)
ecRecord, err := readECRecord(scanner)
if err != nil {
return err
}
header.LatestECRecord = ecRecord
notarizationConsumer(header)
headerConsumer(header)

// read outputWithMetadata
for i := 0; uint64(i) < header.OutputWithMetadataCount; {
outputs, err := ReadOutputWithMetadata(scanner)
outputs, err := readOutputWithMetadata(scanner)
if err != nil {
return err
}
Expand All @@ -48,7 +48,7 @@ func StreamSnapshotDataFrom(
outputConsumer(outputs)
}

epochDiffs, err := ReadEpochDiffs(scanner)
epochDiffs, err := readEpochDiffs(scanner)
if err != nil {
return errors.Errorf("failed to parse epochDiffs from bytes: %w", err)
}
Expand All @@ -57,7 +57,7 @@ func StreamSnapshotDataFrom(
return nil
}

func ReadSnapshotHeader(reader io.ReadSeeker) (*ledger.SnapshotHeader, error) {
func readSnapshotHeader(reader io.ReadSeeker) (*ledger.SnapshotHeader, error) {
header := &ledger.SnapshotHeader{}

if err := binary.Read(reader, binary.LittleEndian, &header.OutputWithMetadataCount); err != nil {
Expand All @@ -78,15 +78,14 @@ func ReadSnapshotHeader(reader io.ReadSeeker) (*ledger.SnapshotHeader, error) {
return header, nil
}

// ReadOutputWithMetadata consumes a slice of OutputWithMetadata from the given reader.
func ReadOutputWithMetadata(scanner *bufio.Scanner) (outputMetadatas []*ledger.OutputWithMetadata, err error) {
// readOutputWithMetadata consumes a slice of OutputWithMetadata from the given reader.
func readOutputWithMetadata(scanner *bufio.Scanner) (outputMetadatas []*ledger.OutputWithMetadata, err error) {
scanner.Scan()
data := scanner.Bytes()

if len(data) > 0 {
typeSet := new(serix.TypeSettings)
outputMetadatas = make([]*ledger.OutputWithMetadata, 0)
_, err = serix.DefaultAPI.Decode(context.Background(), data, &outputMetadatas, serix.WithTypeSettings(typeSet.WithLengthPrefixType(serix.LengthPrefixTypeAsUint32)))
_, err = serix.DefaultAPI.Decode(context.Background(), data, &outputMetadatas, serix.WithValidation())
if err != nil {
return nil, err
}
Expand All @@ -100,15 +99,14 @@ func ReadOutputWithMetadata(scanner *bufio.Scanner) (outputMetadatas []*ledger.O
return
}

// ReadEpochDiffs consumes a map of EpochDiff from the given reader.
func ReadEpochDiffs(scanner *bufio.Scanner) (epochDiffs map[epoch.Index]*ledger.EpochDiff, err error) {
// readEpochDiffs consumes a map of EpochDiff from the given reader.
func readEpochDiffs(scanner *bufio.Scanner) (epochDiffs map[epoch.Index]*ledger.EpochDiff, err error) {
epochDiffs = make(map[epoch.Index]*ledger.EpochDiff)

scanner.Scan()
data := scanner.Bytes()
if len(data) > 0 {
typeSet := new(serix.TypeSettings)
_, err = serix.DefaultAPI.Decode(context.Background(), data, &epochDiffs, serix.WithTypeSettings(typeSet.WithLengthPrefixType(serix.LengthPrefixTypeAsUint32)))
_, err = serix.DefaultAPI.Decode(context.Background(), data, &epochDiffs, serix.WithValidation())
if err != nil {
return nil, errors.Errorf("failed to parse epochDiffs from bytes: %w", err)
}
Expand All @@ -128,8 +126,8 @@ func ReadEpochDiffs(scanner *bufio.Scanner) (epochDiffs map[epoch.Index]*ledger.
return
}

// ReadECRecord consumes the latest ECRecord from the given reader.
func ReadECRecord(scanner *bufio.Scanner) (ecRecord *epoch.ECRecord, err error) {
// readECRecord consumes the latest ECRecord from the given reader.
func readECRecord(scanner *bufio.Scanner) (ecRecord *epoch.ECRecord, err error) {
scanner.Scan()

ecRecord = &epoch.ECRecord{}
Expand Down
Loading