Skip to content

Commit 4d6f399

Browse files
committed
add basic support in sdk for block-stm
1 parent e7bcfc2 commit 4d6f399

File tree

10 files changed

+195
-52
lines changed

10 files changed

+195
-52
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
6060
* x/distribution can now utilize an externally managed community pool. NOTE: this will make the message handlers for FundCommunityPool and CommunityPoolSpend error, as well as the query handler for CommunityPool.
6161
* (client) [#18101](https://github.com/cosmos/cosmos-sdk/pull/18101) Add a `keyring-default-keyname` in `client.toml` for specifying a default key name, and skip the need to use the `--from` flag when signing transactions.
6262
* (x/gov) [#24355](https://github.com/cosmos/cosmos-sdk/pull/24355) Allow users to set a custom CalculateVoteResultsAndVotingPower function to be used in govkeeper.Tally.
63+
* (baseapp) [#24458](https://github.com/cosmos/cosmos-sdk/pull/24458) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support parallel execution, introduce incarnation cache for performance optimisation
6364

6465
### Improvements
6566

baseapp/abci.go

+49-29
Original file line numberDiff line numberDiff line change
@@ -791,41 +791,21 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
791791

792792
// Reset the gas meter so that the AnteHandlers aren't required to
793793
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
794-
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
794+
app.finalizeBlockState.SetContext(
795+
app.finalizeBlockState.Context().
796+
WithBlockGasMeter(gasMeter).
797+
WithTxCount(len(req.Txs)),
798+
)
795799

796800
// Iterate over all raw transactions in the proposal and attempt to execute
797801
// them, gathering the execution results.
798802
//
799803
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
800804
// vote extensions, so skip those.
801-
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
802-
for _, rawTx := range req.Txs {
803-
var response *abci.ExecTxResult
804-
805-
if _, err := app.txDecoder(rawTx); err == nil {
806-
response = app.deliverTx(rawTx)
807-
} else {
808-
// In the case where a transaction included in a block proposal is malformed,
809-
// we still want to return a default response to comet. This is because comet
810-
// expects a response for each transaction included in a block proposal.
811-
response = sdkerrors.ResponseExecTxResultWithEvents(
812-
sdkerrors.ErrTxDecode,
813-
0,
814-
0,
815-
nil,
816-
false,
817-
)
818-
}
819-
820-
// check after every tx if we should abort
821-
select {
822-
case <-ctx.Done():
823-
return nil, ctx.Err()
824-
default:
825-
// continue
826-
}
827-
828-
txResults = append(txResults, response)
805+
txResults, err := app.executeTxsWithExecutor(ctx, req.Txs)
806+
if err != nil {
807+
// usually due to canceled
808+
return nil, err
829809
}
830810

831811
if app.finalizeBlockState.ms.TracingEnabled() {
@@ -856,6 +836,46 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
856836
}, nil
857837
}
858838

839+
func (app *BaseApp) executeTxsWithExecutor(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
840+
if app.txExecutor != nil {
841+
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
842+
return app.deliverTxWithMultiStore(txs[i], i, ms, incarnationCache)
843+
})
844+
}
845+
846+
// Fallback to the default execution logic
847+
txResults := make([]*abci.ExecTxResult, 0, len(txs))
848+
for i, rawTx := range txs {
849+
var response *abci.ExecTxResult
850+
851+
if _, err := app.txDecoder(rawTx); err == nil {
852+
response = app.deliverTx(rawTx, i)
853+
} else {
854+
// In the case where a transaction included in a block proposal is malformed,
855+
// we still want to return a default response to comet. This is because comet
856+
// expects a response for each transaction included in a block proposal.
857+
response = sdkerrors.ResponseExecTxResultWithEvents(
858+
sdkerrors.ErrTxDecode,
859+
0,
860+
0,
861+
nil,
862+
false,
863+
)
864+
}
865+
866+
// check after every tx if we should abort
867+
select {
868+
case <-ctx.Done():
869+
return nil, ctx.Err()
870+
default:
871+
// continue
872+
}
873+
874+
txResults = append(txResults, response)
875+
}
876+
return txResults, nil
877+
}
878+
859879
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
860880
// Specifically, it will execute an application's BeginBlock (if defined), followed
861881
// by the transactions in the proposal, finally followed by the application's

baseapp/baseapp.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ type BaseApp struct {
198198
//
199199
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
200200
disableBlockGasMeter bool
201+
202+
// Optional alternative tx executor, used for block-stm parallel transaction execution. If nil, default executor is used.
203+
txExecutor TxExecutor
201204
}
202205

203206
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
@@ -674,7 +677,7 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
674677
}
675678

676679
// retrieve the context for the tx w/ txBytes and other memoized values.
677-
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
680+
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
678681
app.mu.Lock()
679682
defer app.mu.Unlock()
680683

@@ -684,7 +687,8 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
684687
}
685688
ctx := modeState.Context().
686689
WithTxBytes(txBytes).
687-
WithGasMeter(storetypes.NewInfiniteGasMeter())
690+
WithGasMeter(storetypes.NewInfiniteGasMeter()).
691+
WithTxIndex(txIndex)
688692
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed
689693

690694
ctx = ctx.WithIsSigverifyTx(app.sigverifyTx)
@@ -769,7 +773,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
769773
return resp, nil
770774
}
771775

772-
func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
776+
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
777+
return app.deliverTxWithMultiStore(tx, txIndex, nil, nil)
778+
}
779+
780+
func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
773781
gInfo := sdk.GasInfo{}
774782
resultStr := "successful"
775783

@@ -782,7 +790,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
782790
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
783791
}()
784792

785-
gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx, nil)
793+
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, nil, txIndex, txMultiStore, incarnationCache)
786794
if err != nil {
787795
resultStr = "failed"
788796
resp = sdkerrors.ResponseExecTxResultWithEvents(
@@ -842,12 +850,20 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
842850
// both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice
843851
// passing the decoded tx to runTX is optional, it will be decoded if the tx is nil
844852
func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
853+
return app.runTxWithMultiStore(mode, txBytes, tx, -1, nil, nil)
854+
}
855+
856+
func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, tx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
845857
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
846858
// determined by the GasMeter. We need access to the context to get the gas
847859
// meter, so we initialize upfront.
848860
var gasWanted uint64
849861

850-
ctx := app.getContextForTx(mode, txBytes)
862+
ctx := app.getContextForTx(mode, txBytes, txIndex)
863+
ctx = ctx.WithIncarnationCache(incarnationCache)
864+
if txMultiStore != nil {
865+
ctx = ctx.WithMultiStore(txMultiStore)
866+
}
851867
ms := ctx.MultiStore()
852868

853869
// only run the tx if there is block gas remaining
@@ -1040,6 +1056,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
10401056
break
10411057
}
10421058

1059+
ctx = ctx.WithMsgIndex(i)
1060+
10431061
handler := app.msgServiceRouter.Handler(msg)
10441062
if handler == nil {
10451063
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)

baseapp/genesis.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
1313
// ExecuteGenesisTx implements genesis.GenesisState from
1414
// cosmossdk.io/core/genesis to set initial state in genesis
1515
func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error {
16-
res := ba.deliverTx(tx)
16+
res := ba.deliverTx(tx, -1)
1717

1818
if res.Code != types.CodeTypeOK {
1919
return errors.New(res.Log)

baseapp/options.go

+10
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp
124124
}
125125
}
126126

127+
// SetTxExecutor sets a custom tx executor for the BaseApp (e.g for parallel execution).
128+
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
129+
return func(app *BaseApp) { app.txExecutor = executor }
130+
}
131+
127132
// DisableBlockGasMeter disables the block gas meter.
128133
func DisableBlockGasMeter() func(*BaseApp) {
129134
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
@@ -403,3 +408,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
403408
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
404409
app.grpcQueryRouter = grpcQueryRouter
405410
}
411+
412+
// SetTxExecutor sets a custom tx executor for the BaseApp (e.g for parallel execution).
413+
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
414+
app.txExecutor = executor
415+
}

baseapp/test_helpers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
7777
}
7878

7979
func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
80-
return app.getContextForTx(execModeFinalize, txBytes)
80+
return app.getContextForTx(execModeFinalize, txBytes, -1)
8181
}
8282

8383
func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
84-
return app.getContextForTx(execModeCheck, txBytes)
84+
return app.getContextForTx(execModeCheck, txBytes, -1)
8585
}

baseapp/txexecutor.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package baseapp
2+
3+
import (
4+
"context"
5+
6+
abci "github.com/cometbft/cometbft/abci/types"
7+
8+
"cosmossdk.io/store/types"
9+
10+
sdk "github.com/cosmos/cosmos-sdk/types"
11+
)
12+
13+
type TxExecutor func(
14+
ctx context.Context,
15+
block [][]byte,
16+
cms types.MultiStore,
17+
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
18+
) ([]*abci.ExecTxResult, error)

simapp/app.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ package simapp
55
import (
66
"encoding/json"
77
"fmt"
8+
"io"
9+
"maps"
10+
811
abci "github.com/cometbft/cometbft/abci/types"
912
dbm "github.com/cosmos/cosmos-db"
1013
"github.com/cosmos/gogoproto/proto"
1114
"github.com/spf13/cast"
12-
"io"
13-
"maps"
1415

1516
autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
1617
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
@@ -123,7 +124,8 @@ var (
123124
govtypes.ModuleName: {authtypes.Burner},
124125
nft.ModuleName: nil,
125126
protocolpooltypes.ModuleName: nil,
126-
protocolpooltypes.ProtocolPoolEscrowAccount: nil}
127+
protocolpooltypes.ProtocolPoolEscrowAccount: nil,
128+
}
127129
)
128130

129131
var (
@@ -466,7 +468,7 @@ func NewSimApp(
466468

467469
app.GovKeeper = *govKeeper.SetHooks(
468470
govtypes.NewMultiGovHooks(
469-
// register the governance hooks
471+
// register the governance hooks
470472
),
471473
)
472474

@@ -496,7 +498,7 @@ func NewSimApp(
496498

497499
app.EpochsKeeper.SetHooks(
498500
epochstypes.NewMultiEpochHooks(
499-
// insert epoch hooks receivers here
501+
// insert epoch hooks receivers here
500502
),
501503
)
502504

types/context.go

+55
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ type Context struct {
6464
streamingManager storetypes.StreamingManager
6565
cometInfo comet.BlockInfo
6666
headerInfo header.Info
67+
68+
// For block-stm
69+
txIndex int // the index of the current tx in the block, -1 means not in finalize block context
70+
msgIndex int // the index of the current msg in the tx, -1 means not in finalize block context
71+
txCount int // the total number of transactions in current block
72+
blockGasUsed uint64 // sum the gas used by all the transactions in the current block, only accessible by end blocker
73+
incarnationCache map[string]any // incarnationCache is shared between multiple incarnations of the same transaction, it must only cache stateless computation results that only depends on tx body and block level information that don't change during block execution, like the result of tx signature verification.
6774
}
6875

6976
// Proposed rename, not done to avoid API breakage
@@ -92,6 +99,11 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
9299
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
93100
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
94101
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
102+
func (c Context) TxIndex() int { return c.txIndex }
103+
func (c Context) MsgIndex() int { return c.msgIndex }
104+
func (c Context) TxCount() int { return c.txCount }
105+
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }
106+
func (c Context) IncarnationCache() map[string]any { return c.incarnationCache }
95107

96108
// BlockHeader returns the header by value.
97109
func (c Context) BlockHeader() cmtproto.Header {
@@ -138,6 +150,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
138150
eventManager: NewEventManager(),
139151
kvGasConfig: storetypes.KVGasConfig(),
140152
transientKVGasConfig: storetypes.TransientGasConfig(),
153+
txIndex: -1,
154+
msgIndex: -1,
141155
}
142156
}
143157

@@ -317,6 +331,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
317331
return c
318332
}
319333

334+
func (c Context) WithTxIndex(txIndex int) Context {
335+
c.txIndex = txIndex
336+
return c
337+
}
338+
339+
func (c Context) WithTxCount(txCount int) Context {
340+
c.txCount = txCount
341+
return c
342+
}
343+
344+
func (c Context) WithMsgIndex(msgIndex int) Context {
345+
c.msgIndex = msgIndex
346+
return c
347+
}
348+
349+
func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
350+
c.blockGasUsed = gasUsed
351+
return c
352+
}
353+
320354
// TODO: remove???
321355
func (c Context) IsZero() bool {
322356
return c.ms == nil
@@ -365,6 +399,27 @@ func (c Context) CacheContext() (cc Context, writeCache func()) {
365399
return cc, writeCache
366400
}
367401

402+
func (c Context) GetIncarnationCache(key string) (any, bool) {
403+
if c.incarnationCache == nil {
404+
return nil, false
405+
}
406+
val, ok := c.incarnationCache[key]
407+
return val, ok
408+
}
409+
410+
func (c Context) SetIncarnationCache(key string, value any) {
411+
if c.incarnationCache == nil {
412+
// noop if cache is not initialized
413+
return
414+
}
415+
c.incarnationCache[key] = value
416+
}
417+
418+
func (c Context) WithIncarnationCache(cache map[string]any) Context {
419+
c.incarnationCache = cache
420+
return c
421+
}
422+
368423
var (
369424
_ context.Context = Context{}
370425
_ storetypes.Context = Context{}

0 commit comments

Comments
 (0)