Skip to content

Commit 9d2e307

Browse files
committed
add basic support in sdk for block-stm
1 parent e7bcfc2 commit 9d2e307

File tree

8 files changed

+132
-37
lines changed

8 files changed

+132
-37
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
6060
* x/distribution can now utilize an externally managed community pool. NOTE: this will make the message handlers for FundCommunityPool and CommunityPoolSpend error, as well as the query handler for CommunityPool.
6161
* (client) [#18101](https://github.com/cosmos/cosmos-sdk/pull/18101) Add a `keyring-default-keyname` in `client.toml` for specifying a default key name, and skip the need to use the `--from` flag when signing transactions.
6262
* (x/gov) [#24355](https://github.com/cosmos/cosmos-sdk/pull/24355) Allow users to set a custom CalculateVoteResultsAndVotingPower function to be used in govkeeper.Tally.
63+
* (baseapp) [#24458](https://github.com/cosmos/cosmos-sdk/pull/24458) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support parallel execution.
6364

6465
### Improvements
6566

baseapp/abci.go

+48-29
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
758758

759759
// GasMeter must be set after we get a context with updated consensus params.
760760
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
761-
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
761+
app.finalizeBlockState.SetContext(
762+
app.finalizeBlockState.Context().
763+
WithBlockGasMeter(gasMeter).
764+
WithTxCount(len(req.Txs)),
765+
)
762766

763767
if app.checkState != nil {
764768
app.checkState.SetContext(app.checkState.Context().
@@ -798,34 +802,10 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
798802
//
799803
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
800804
// vote extensions, so skip those.
801-
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
802-
for _, rawTx := range req.Txs {
803-
var response *abci.ExecTxResult
804-
805-
if _, err := app.txDecoder(rawTx); err == nil {
806-
response = app.deliverTx(rawTx)
807-
} else {
808-
// In the case where a transaction included in a block proposal is malformed,
809-
// we still want to return a default response to comet. This is because comet
810-
// expects a response for each transaction included in a block proposal.
811-
response = sdkerrors.ResponseExecTxResultWithEvents(
812-
sdkerrors.ErrTxDecode,
813-
0,
814-
0,
815-
nil,
816-
false,
817-
)
818-
}
819-
820-
// check after every tx if we should abort
821-
select {
822-
case <-ctx.Done():
823-
return nil, ctx.Err()
824-
default:
825-
// continue
826-
}
827-
828-
txResults = append(txResults, response)
805+
txResults, err := app.executeTxs(ctx, req.Txs)
806+
if err != nil {
807+
// usually due to canceled
808+
return nil, err
829809
}
830810

831811
if app.finalizeBlockState.ms.TracingEnabled() {
@@ -856,6 +836,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
856836
}, nil
857837
}
858838

839+
func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
840+
if app.txExecutor != nil {
841+
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
842+
return app.deliverTxWithMultiStore(txs[i], i, ms)
843+
})
844+
}
845+
846+
txResults := make([]*abci.ExecTxResult, 0, len(txs))
847+
for i, rawTx := range txs {
848+
var response *abci.ExecTxResult
849+
850+
if _, err := app.txDecoder(rawTx); err == nil {
851+
response = app.deliverTx(rawTx, i)
852+
} else {
853+
// In the case where a transaction included in a block proposal is malformed,
854+
// we still want to return a default response to comet. This is because comet
855+
// expects a response for each transaction included in a block proposal.
856+
response = sdkerrors.ResponseExecTxResultWithEvents(
857+
sdkerrors.ErrTxDecode,
858+
0,
859+
0,
860+
nil,
861+
false,
862+
)
863+
}
864+
865+
// check after every tx if we should abort
866+
select {
867+
case <-ctx.Done():
868+
return nil, ctx.Err()
869+
default:
870+
// continue
871+
}
872+
873+
txResults = append(txResults, response)
874+
}
875+
return txResults, nil
876+
}
877+
859878
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
860879
// Specifically, it will execute an application's BeginBlock (if defined), followed
861880
// by the transactions in the proposal, finally followed by the application's

baseapp/baseapp.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ type BaseApp struct {
198198
//
199199
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
200200
disableBlockGasMeter bool
201+
202+
// Optional alternative tx executor, used for block-stm parallel transaction execution.
203+
txExecutor TxExecutor
201204
}
202205

203206
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
@@ -674,7 +677,7 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
674677
}
675678

676679
// retrieve the context for the tx w/ txBytes and other memoized values.
677-
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
680+
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
678681
app.mu.Lock()
679682
defer app.mu.Unlock()
680683

@@ -684,7 +687,8 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
684687
}
685688
ctx := modeState.Context().
686689
WithTxBytes(txBytes).
687-
WithGasMeter(storetypes.NewInfiniteGasMeter())
690+
WithGasMeter(storetypes.NewInfiniteGasMeter()).
691+
WithMsgIndex(txIndex)
688692
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed
689693

690694
ctx = ctx.WithIsSigverifyTx(app.sigverifyTx)
@@ -769,7 +773,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
769773
return resp, nil
770774
}
771775

772-
func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
776+
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
777+
return app.deliverTxWithMultiStore(tx, txIndex, nil)
778+
}
779+
780+
func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
773781
gInfo := sdk.GasInfo{}
774782
resultStr := "successful"
775783

@@ -782,7 +790,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
782790
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
783791
}()
784792

785-
gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx, nil)
793+
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, nil, txIndex, txMultiStore)
786794
if err != nil {
787795
resultStr = "failed"
788796
resp = sdkerrors.ResponseExecTxResultWithEvents(
@@ -842,12 +850,19 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
842850
// both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice
843851
// passing the decoded tx to runTX is optional, it will be decoded if the tx is nil
844852
func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
853+
return app.runTxWithMultiStore(mode, txBytes, tx, -1, nil)
854+
}
855+
856+
func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, tx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
845857
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
846858
// determined by the GasMeter. We need access to the context to get the gas
847859
// meter, so we initialize upfront.
848860
var gasWanted uint64
849861

850-
ctx := app.getContextForTx(mode, txBytes)
862+
ctx := app.getContextForTx(mode, txBytes, txIndex)
863+
if txMultiStore != nil {
864+
ctx = ctx.WithMultiStore(txMultiStore)
865+
}
851866
ms := ctx.MultiStore()
852867

853868
// only run the tx if there is block gas remaining
@@ -1040,6 +1055,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
10401055
break
10411056
}
10421057

1058+
ctx = ctx.WithMsgIndex(i)
1059+
10431060
handler := app.msgServiceRouter.Handler(msg)
10441061
if handler == nil {
10451062
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)

baseapp/genesis.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
1313
// ExecuteGenesisTx implements genesis.GenesisState from
1414
// cosmossdk.io/core/genesis to set initial state in genesis
1515
func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error {
16-
res := ba.deliverTx(tx)
16+
res := ba.deliverTx(tx, -1)
1717

1818
if res.Code != types.CodeTypeOK {
1919
return errors.New(res.Log)

baseapp/options.go

+10
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp
124124
}
125125
}
126126

127+
// SetTxExecutor sets a custom tx executor for the BaseApp (e.g for parallel execution).
128+
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
129+
return func(app *BaseApp) { app.txExecutor = executor }
130+
}
131+
127132
// DisableBlockGasMeter disables the block gas meter.
128133
func DisableBlockGasMeter() func(*BaseApp) {
129134
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
@@ -403,3 +408,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
403408
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
404409
app.grpcQueryRouter = grpcQueryRouter
405410
}
411+
412+
// SetTxExecutor sets a custom tx executor for the BaseApp (e.g for parallel execution).
413+
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
414+
app.txExecutor = executor
415+
}

baseapp/test_helpers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
7777
}
7878

7979
func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
80-
return app.getContextForTx(execModeFinalize, txBytes)
80+
return app.getContextForTx(execModeFinalize, txBytes, -1)
8181
}
8282

8383
func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
84-
return app.getContextForTx(execModeCheck, txBytes)
84+
return app.getContextForTx(execModeCheck, txBytes, -1)
8585
}

baseapp/txexecutor.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package baseapp
2+
3+
import (
4+
"context"
5+
6+
abci "github.com/cometbft/cometbft/abci/types"
7+
8+
"cosmossdk.io/store/types"
9+
)
10+
11+
type TxExecutor func(
12+
ctx context.Context,
13+
blockSize int,
14+
cms types.MultiStore,
15+
deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult,
16+
) ([]*abci.ExecTxResult, error)

types/context.go

+32
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ type Context struct {
6464
streamingManager storetypes.StreamingManager
6565
cometInfo comet.BlockInfo
6666
headerInfo header.Info
67+
68+
// For block-stm
69+
txIndex int // the index of the current tx in the block, -1 means not in finalize block context
70+
msgIndex int // the index of the current msg in the tx, -1 means not in finalize block context
71+
txCount int // the total number of transactions in current block
72+
blockGasUsed uint64 // sum the gas used by all the transactions in the current block, only accessible by end blocker
6773
}
6874

6975
// Proposed rename, not done to avoid API breakage
@@ -92,6 +98,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
9298
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
9399
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
94100
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
101+
func (c Context) TxIndex() int { return c.txIndex }
102+
func (c Context) MsgIndex() int { return c.msgIndex }
103+
func (c Context) TxCount() int { return c.txCount }
104+
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }
95105

96106
// BlockHeader returns the header by value.
97107
func (c Context) BlockHeader() cmtproto.Header {
@@ -138,6 +148,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
138148
eventManager: NewEventManager(),
139149
kvGasConfig: storetypes.KVGasConfig(),
140150
transientKVGasConfig: storetypes.TransientGasConfig(),
151+
txIndex: -1,
152+
msgIndex: -1,
141153
}
142154
}
143155

@@ -317,6 +329,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
317329
return c
318330
}
319331

332+
func (c Context) WithTxIndex(txIndex int) Context {
333+
c.txIndex = txIndex
334+
return c
335+
}
336+
337+
func (c Context) WithTxCount(txCount int) Context {
338+
c.txCount = txCount
339+
return c
340+
}
341+
342+
func (c Context) WithMsgIndex(msgIndex int) Context {
343+
c.msgIndex = msgIndex
344+
return c
345+
}
346+
347+
func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
348+
c.blockGasUsed = gasUsed
349+
return c
350+
}
351+
320352
// TODO: remove???
321353
func (c Context) IsZero() bool {
322354
return c.ms == nil

0 commit comments

Comments
 (0)