Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-538] Apply dYdX patches to Cosmos SDK 0.50.1 fork #30

Merged
merged 10 commits into from
Dec 6, 2023
Prev Previous commit
Next Next commit
[STAB-4] Use unsynchronized local client
This change pushes the mutex that was in the local client to the top level of the ABCI methods and uses the unsynchronized local client. A future change is intended to reduce the critical sections of the various ABCI methods.

We also replace cometbft usage with the dYdX fork.
  • Loading branch information
lcwik committed Dec 4, 2023
commit f233d7fd52d4a99d510faa920530c08e899f9bf4
50 changes: 50 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
)

func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitChain, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if req.ChainId != app.chainID {
return nil, fmt.Errorf("invalid chain-id on InitChain; expected: %s, got: %s", app.chainID, req.ChainId)
}
Expand Down Expand Up @@ -148,6 +151,9 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
}

func (app *BaseApp) Info(req *abci.RequestInfo) (*abci.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

lastCommitID := app.cms.LastCommitID()

return &abci.ResponseInfo{
Expand All @@ -162,6 +168,9 @@ func (app *BaseApp) Info(req *abci.RequestInfo) (*abci.ResponseInfo, error) {
// Query implements the ABCI interface. It delegates to CommitMultiStore if it
// implements Queryable.
func (app *BaseApp) Query(_ context.Context, req *abci.RequestQuery) (resp *abci.ResponseQuery, err error) {
app.mtx.Lock()
defer app.mtx.Unlock()

// add panic recovery for all queries
//
// Ref: https://github.com/cosmos/cosmos-sdk/pull/8039
Expand Down Expand Up @@ -215,6 +224,9 @@ func (app *BaseApp) Query(_ context.Context, req *abci.RequestQuery) (resp *abci

// ListSnapshots implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *BaseApp) ListSnapshots(req *abci.RequestListSnapshots) (*abci.ResponseListSnapshots, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

resp := &abci.ResponseListSnapshots{Snapshots: []*abci.Snapshot{}}
if app.snapshotManager == nil {
return resp, nil
Expand All @@ -241,6 +253,9 @@ func (app *BaseApp) ListSnapshots(req *abci.RequestListSnapshots) (*abci.Respons

// LoadSnapshotChunk implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *BaseApp) LoadSnapshotChunk(req *abci.RequestLoadSnapshotChunk) (*abci.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.snapshotManager == nil {
return &abci.ResponseLoadSnapshotChunk{}, nil
}
Expand All @@ -262,6 +277,9 @@ func (app *BaseApp) LoadSnapshotChunk(req *abci.RequestLoadSnapshotChunk) (*abci

// OfferSnapshot implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *BaseApp) OfferSnapshot(req *abci.RequestOfferSnapshot) (*abci.ResponseOfferSnapshot, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.snapshotManager == nil {
app.logger.Error("snapshot manager not configured")
return &abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil
Expand Down Expand Up @@ -311,6 +329,9 @@ func (app *BaseApp) OfferSnapshot(req *abci.RequestOfferSnapshot) (*abci.Respons

// ApplySnapshotChunk implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*abci.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.snapshotManager == nil {
app.logger.Error("snapshot manager not configured")
return &abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT}, nil
Expand Down Expand Up @@ -347,6 +368,9 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
// will contain relevant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

var mode execMode

switch {
Expand Down Expand Up @@ -388,6 +412,9 @@ func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, er
// Ref: https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-060-abci-1.0.md
// Ref: https://github.com/cometbft/cometbft/blob/main/spec/abci/abci%2B%2B_basic_concepts.md
func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abci.ResponsePrepareProposal, err error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.prepareProposal == nil {
return nil, errors.New("PrepareProposal handler not set")
}
Expand Down Expand Up @@ -466,6 +493,9 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
// Ref: https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-060-abci-1.0.md
// Ref: https://github.com/cometbft/cometbft/blob/main/spec/abci/abci%2B%2B_basic_concepts.md
func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.processProposal == nil {
return nil, errors.New("ProcessProposal handler not set")
}
Expand Down Expand Up @@ -562,6 +592,9 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
// height and are committed in the subsequent height, i.e. H+2. An error is
// returned if vote extensions are not enabled or if extendVote fails or panics.
func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) (resp *abci.ResponseExtendVote, err error) {
app.mtx.Lock()
defer app.mtx.Unlock()

// Always reset state given that ExtendVote and VerifyVoteExtension can timeout
// and be called again in a subsequent round.
var ctx sdk.Context
Expand Down Expand Up @@ -635,6 +668,9 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) (
// phase. The response MUST be deterministic. An error is returned if vote
// extensions are not enabled or if verifyVoteExt fails or panics.
func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (resp *abci.ResponseVerifyVoteExtension, err error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.verifyVoteExt == nil {
return nil, errors.New("application VerifyVoteExtension handler not set")
}
Expand Down Expand Up @@ -848,6 +884,9 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
// extensions into the proposal, which should not themselves be executed in cases
// where they adhere to the sdk.Tx interface.
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

if app.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := app.optimisticExec.AbortIfNeeded(req.Hash)
Expand Down Expand Up @@ -901,6 +940,9 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

header := app.finalizeBlockState.ctx.BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

Copy link

@teddyding teddyding Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: do we still need the committer logic from here? Or is that replaced by the prepareCheckStater below?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is replaced by prepareCheckStater which @prettymuchbryce pushed upstream as the replacement for the committer logic.

Expand Down Expand Up @@ -974,7 +1016,11 @@ func handleQueryApp(app *BaseApp, path []string, req *abci.RequestQuery) *abci.R
case "simulate":
txBytes := req.Data

// Simulate enters runTx which requires us to give up the lock now.
app.mtx.Unlock()
gInfo, res, err := app.Simulate(txBytes)
app.mtx.Lock()

if err != nil {
return sdkerrors.QueryResult(errorsmod.Wrap(err, "failed to simulate tx"), app.trace)
}
Expand Down Expand Up @@ -1122,7 +1168,11 @@ func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req *abci.RequestQ
return sdkerrors.QueryResult(err, app.trace)
}

// handler can recursively re-enter Query, which requires us to give up the lock now.
app.mtx.Unlock()
resp, err := handler(ctx, req)
app.mtx.Lock()

if err != nil {
resp = sdkerrors.QueryResult(gRPCErrorToSDKError(err), app.trace)
resp.Height = req.Height
Expand Down
4 changes: 4 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"sort"
"strconv"
"sync"

"github.com/cockroachdb/errors"
abci "github.com/cometbft/cometbft/abci/types"
Expand Down Expand Up @@ -184,6 +185,9 @@ type BaseApp struct {
// including the goroutine handling. This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution

// Used to synchronize the application when using an unsynchronized ABCI++ client.
mtx sync.Mutex
Copy link

@teddyding teddyding Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question on DeliverTx:

I see that on v0.47 fork we take the app.mtx lock

cosmos-sdk/baseapp/abci.go

Lines 418 to 429 in bdf96fd

func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
return app.DeliverTxShouldLock(req, true)
}
// DeliverTxShouldLock enables control of whether the lock should be acquired. The golang mutex
// is not reentrant so we enable conditional locking to prevent deadlock since cosmos-sdk/x/genutil.InitGenesis
// invokes DeliverTx from the ABCI++ InitGenesis method which already holds the lock.
func (app *BaseApp) DeliverTxShouldLock(req abci.RequestDeliverTx, needsLock bool) (res abci.ResponseDeliverTx) {
if needsLock {
app.mtx.Lock()
defer app.mtx.Unlock()
}
in DeliverTx. How has this changed in v0.50? This is the corresponding deliverTx function - is it intended to not take the lock?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cosmos was re-entrant when delivering txs during block commit, which is why we needed the ability to selectively take the lock. CometBFT pushed the block commit logic of individually calling DeliverTx down into FinalizeBlock, which means that we could move the lock acquisition up to FinalizeBlock and remove it from deliverTx. As a result, we no longer care whether Cosmos is re-entrant for this method.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. I see from here that BeginBlock, DeliverTx, and EndBlock are coalesced into FinalizeBlock.

}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)

// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)

if res.Code != types.CodeTypeOK {
Expand Down
3 changes: 2 additions & 1 deletion server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,13 @@ func startCmtNode(
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(cmtApp),
proxy.NewUnsynchronizedLocalClientCreator(cmtApp),
getGenDocProvider(cfg),
cmtcfg.DefaultDBProvider,
node.DefaultMetricsProvider(cfg.Instrumentation),
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
)

if err != nil {
return tmNode, cleanupFn, err
}
Expand Down
2 changes: 1 addition & 1 deletion testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func startInProcess(cfg Config, val *Validator) error {
cmtCfg,
pvm.LoadOrGenFilePV(cmtCfg.PrivValidatorKeyFile(), cmtCfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(cmtApp),
proxy.NewUnsynchronizedLocalClientCreator(cmtApp),
appGenesisProvider,
cmtcfg.DefaultDBProvider,
node.DefaultMetricsProvider(cmtCfg.Instrumentation),
Expand Down