diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 5c94c9fce..adab0edf8 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -27,7 +27,6 @@ func (emptyMempool) Update( _ *types.Block, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/mempool/cache_test.go b/mempool/cache_test.go index e36f36740..e560e8163 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -69,7 +69,7 @@ func TestCacheAfterUpdate(t *testing.T) { updateTxs = append(updateTxs, tx) } err := mempool.Update(newTestBlock(int64(tcIndex), updateTxs), - abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) + abciResponses(len(updateTxs), abci.CodeTypeOK), nil) require.NoError(t, err) for _, v := range tc.reAddIndices { diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 0a8c6b438..808a6b0d0 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -40,6 +40,10 @@ type CListMempool struct { height int64 // the last block Update()'d to txsBytes int64 // total size of mempool, in bytes + reserved int // the number of checking tx and it should be considered when checking mempool full + reservedBytes int64 // size of checking tx and it should be considered when checking mempool full + reservedMtx sync.Mutex + // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool txsAvailable chan struct{} // fires once for each height, when the mempool is not empty @@ -50,7 +54,6 @@ type CListMempool struct { // CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods. updateMtx tmsync.RWMutex preCheck PreCheckFunc - postCheck PostCheckFunc wal *auto.AutoFile // a log of mempool txs txs *clist.CList // concurrent linked-list of good txs @@ -126,13 +129,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption { return func(mem *CListMempool) { mem.preCheck = f } } -// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran after CheckTx. Only applies to the first created block. -// After that, Update overwrites the existing value. -func WithPostCheck(f PostCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.postCheck = f } -} - // WithMetrics sets the metrics. func WithMetrics(metrics *Metrics) CListMempoolOption { return func(mem *CListMempool) { mem.metrics = metrics } @@ -282,6 +278,14 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return ErrTxInCache } + // reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()` + if err := mem.reserve(int64(txSize)); err != nil { + // remove from cache + mem.cache.Remove(tx) + return err + } + + // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) @@ -391,6 +395,35 @@ func (mem *CListMempool) isFull(txSize int) error { return nil } +func (mem *CListMempool) reserve(txSize int64) error { + mem.reservedMtx.Lock() + defer mem.reservedMtx.Unlock() + + var ( + memSize = mem.Size() + txsBytes = mem.TxsBytes() + ) + + if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes { + return ErrMempoolIsFull{ + memSize + mem.reserved, mem.config.Size, + txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes, + } + } + + mem.reserved++ + mem.reservedBytes += txSize + return nil +} + +func (mem *CListMempool) releaseReserve(txSize int64) { + mem.reservedMtx.Lock() + defer mem.reservedMtx.Unlock() + + mem.reserved-- + mem.reservedBytes -= txSize +} + // callback, which is called after the app checked the tx for the first time. // // The case where the app checks the tx for the second and subsequent times is @@ -403,20 +436,7 @@ func (mem *CListMempool) resCbFirstTime( ) { switch r := res.Value.(type) { case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Check mempool isn't full again to reduce the chance of exceeding the - // limits. - if err := mem.isFull(len(tx)); err != nil { - // remove from cache (mempool might have a space later) - mem.cache.Remove(tx) - mem.logger.Error(err.Error()) - return - } - + if r.CheckTx.Code == abci.CodeTypeOK { memTx := &mempoolTx{ height: mem.height, gasWanted: r.CheckTx.GasWanted, @@ -434,13 +454,16 @@ func (mem *CListMempool) resCbFirstTime( } else { // ignore bad transaction mem.logger.Debug("rejected bad transaction", - "tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr) + "tx", txID(tx), "peerID", peerP2PID, "res", r) mem.metrics.FailedTxs.Add(1) if !mem.config.KeepInvalidTxsInCache { // remove from cache (it might be good later) mem.cache.Remove(tx) } } + + // release `reserve` regardless it's OK or not (it might be good later) + mem.releaseReserve(int64(len(tx))) default: // ignore other messages } @@ -461,15 +484,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { memTx.tx, tx)) } - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { + if r.CheckTx.Code == abci.CodeTypeOK { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr) + mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r) // NOTE: we remove tx from the cache because it might be good later mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) } @@ -562,12 +581,11 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { return txs } -// Lock() must be help by the caller during execution. +// Lock() must be held by the caller during execution. func (mem *CListMempool) Update( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, - postCheck PostCheckFunc, ) error { // Set height mem.height = block.Height @@ -576,9 +594,6 @@ func (mem *CListMempool) Update( if preCheck != nil { mem.preCheck = preCheck } - if postCheck != nil { - mem.postCheck = postCheck - } for i, tx := range block.Txs { if deliverTxResponses[i].Code == abci.CodeTypeOK { diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 5b6f63638..eebbb0e33 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -147,31 +147,23 @@ func TestMempoolFilters(t *testing.T) { emptyTxArr := []types.Tx{[]byte{}} nopPreFilter := func(tx types.Tx) error { return nil } - nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. // each tx has 20 bytes tests := []struct { numTxsToCreate int preFilter PreCheckFunc - postFilter PostCheckFunc expectedNumTxs int }{ - {10, nopPreFilter, nopPostFilter, 10}, - {10, PreCheckMaxBytes(10), nopPostFilter, 0}, - {10, PreCheckMaxBytes(22), nopPostFilter, 10}, - {10, nopPreFilter, PostCheckMaxGas(-1), 10}, - {10, nopPreFilter, PostCheckMaxGas(0), 0}, - {10, nopPreFilter, PostCheckMaxGas(1), 10}, - {10, nopPreFilter, PostCheckMaxGas(3000), 10}, - {10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0}, - {10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10}, - {10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10}, - {10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0}, + {10, nopPreFilter, 10}, + {10, PreCheckMaxBytes(10), 0}, + {10, PreCheckMaxBytes(20), 0}, + {10, PreCheckMaxBytes(22), 10}, + {10, PreCheckMaxBytes(30), 10}, } for tcIndex, tt := range tests { err := mempool.Update(newTestBlock(1, emptyTxArr), - abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) + abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter) require.NoError(t, err) checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) @@ -188,7 +180,7 @@ func TestMempoolUpdate(t *testing.T) { // 1. Adds valid txs to the cache { err := mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}), - abciResponses(1, abci.CodeTypeOK), nil, nil) + abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) if assert.Error(t, err) { @@ -200,7 +192,7 @@ func TestMempoolUpdate(t *testing.T) { { err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) require.NoError(t, err) - err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil, nil) + err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) assert.Zero(t, mempool.Size()) } @@ -209,7 +201,7 @@ func TestMempoolUpdate(t *testing.T) { { err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) require.NoError(t, err) - err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil, nil) + err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil) require.NoError(t, err) assert.Zero(t, mempool.Size()) @@ -241,7 +233,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { _ = app.DeliverTx(abci.RequestDeliverTx{Tx: a}) _ = app.DeliverTx(abci.RequestDeliverTx{Tx: b}) err = mempool.Update(newTestBlock(1, []types.Tx{a, b}), - []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil) + []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil) require.NoError(t, err) // a must be added to the cache @@ -297,7 +289,7 @@ func TestTxsAvailable(t *testing.T) { // since there are still txs left committedTxs, txs := txs[:50], txs[50:] if err := mempool.Update(newTestBlock(1, committedTxs), - abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { + abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } ensureFire(t, mempool.TxsAvailable(), timeoutMS) @@ -310,7 +302,7 @@ func TestTxsAvailable(t *testing.T) { // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) //nolint: gocritic if err := mempool.Update(newTestBlock(2, committedTxs), - abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { + abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -370,7 +362,7 @@ func TestSerialReap(t *testing.T) { txs = append(txs, txBytes) } if err := mempool.Update(newTestBlock(0, txs), - abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { + abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } } @@ -542,7 +534,7 @@ func TestMempoolTxsBytes(t *testing.T) { // 3. zero again after tx is removed by Update err = mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}), - abciResponses(1, abci.CodeTypeOK), nil, nil) + abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) assert.EqualValues(t, 0, mempool.TxsBytes()) @@ -592,7 +584,7 @@ func TestMempoolTxsBytes(t *testing.T) { require.NotEmpty(t, res2.Data) // Pretend like we committed nothing so txBytes gets rechecked and removed. - err = mempool.Update(newTestBlock(1, []types.Tx{}), abciResponses(0, abci.CodeTypeOK), nil, nil) + err = mempool.Update(newTestBlock(1, []types.Tx{}), abciResponses(0, abci.CodeTypeOK), nil) require.NoError(t, err) assert.EqualValues(t, 0, mempool.TxsBytes()) diff --git a/mempool/mempool.go b/mempool/mempool.go index 799f88b4a..336a4eb98 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -42,7 +42,6 @@ type Mempool interface { block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, - newPostFn PostCheckFunc, ) error // Flush removes all transactions from the mempool and cache @@ -79,11 +78,6 @@ type Mempool interface { // transaction doesn't exceeded the block size. type PreCheckFunc func(types.Tx) error -// PostCheckFunc is an optional filter executed after CheckTx and rejects -// transaction if false is returned. An example would be to ensure a -// transaction doesn't require more gas than available for the block. -type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error - // TxInfo are parameters that get passed when attempting to add a tx to the // mempool. type TxInfo struct { @@ -108,22 +102,3 @@ func PreCheckMaxBytes(maxBytes int64) PreCheckFunc { return nil } } - -// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed -// maxGas. Returns nil if maxGas is -1. -func PostCheckMaxGas(maxGas int64) PostCheckFunc { - return func(tx types.Tx, res *abci.ResponseCheckTx) error { - if maxGas == -1 { - return nil - } - if res.GasWanted < 0 { - return fmt.Errorf("gas wanted %d is negative", - res.GasWanted) - } - if res.GasWanted > maxGas { - return fmt.Errorf("gas wanted %d is greater than max gas %d", - res.GasWanted, maxGas) - } - return nil - } -} diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go index de0b65d4c..9b9f9efe3 100644 --- a/mempool/mock/mempool.go +++ b/mempool/mock/mempool.go @@ -24,7 +24,6 @@ func (Mempool) Update( _ *types.Block, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index d4cc3d03a..6725c9988 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -102,7 +102,7 @@ func TestReactorConcurrency(t *testing.T) { for i := range txs { deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} } - err := reactors[0].mempool.Update(newTestBlock(1, txs), deliverTxResponses, nil, nil) + err := reactors[0].mempool.Update(newTestBlock(1, txs), deliverTxResponses, nil) assert.NoError(t, err) }() @@ -115,7 +115,7 @@ func TestReactorConcurrency(t *testing.T) { reactors[1].mempool.Lock() defer reactors[1].mempool.Unlock() err := reactors[1].mempool.Update(newTestBlock(1, []types.Tx{}), - make([]*abci.ResponseDeliverTx, 0), nil, nil) + make([]*abci.ResponseDeliverTx, 0), nil) assert.NoError(t, err) }() diff --git a/node/node.go b/node/node.go index a742eb2f4..a4c49493b 100644 --- a/node/node.go +++ b/node/node.go @@ -323,7 +323,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempoolLogger := logger.With("module", "mempool") mempoolReactor := mempl.NewReactor(config.Mempool, mempool) diff --git a/node/node_test.go b/node/node_test.go index f94c7edfe..2c56948be 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -249,7 +249,6 @@ func TestCreateProposalBlock(t *testing.T) { state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempool.SetLogger(logger) @@ -341,7 +340,6 @@ func TestMaxProposalBlockSize(t *testing.T) { state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempool.SetLogger(logger) diff --git a/state/execution.go b/state/execution.go index 463904097..d92f13944 100644 --- a/state/execution.go +++ b/state/execution.go @@ -253,7 +253,6 @@ func (blockExec *BlockExecutor) Commit( block, deliverTxResponses, TxPreCheck(state), - TxPostCheck(state), ) updateMempoolEndTime := time.Now().UnixNano() diff --git a/state/tx_filter.go b/state/tx_filter.go index 66bb14943..aa940badd 100644 --- a/state/tx_filter.go +++ b/state/tx_filter.go @@ -14,9 +14,3 @@ func TxPreCheck(state State) mempl.PreCheckFunc { ) return mempl.PreCheckMaxBytes(maxDataBytes) } - -// TxPostCheck returns a function to filter transactions after processing. -// The function limits the gas wanted by a transaction to the block's maximum total gas. -func TxPostCheck(state State) mempl.PostCheckFunc { - return mempl.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas) -} diff --git a/test/maverick/consensus/replay_stubs.go b/test/maverick/consensus/replay_stubs.go index 5c94c9fce..adab0edf8 100644 --- a/test/maverick/consensus/replay_stubs.go +++ b/test/maverick/consensus/replay_stubs.go @@ -27,7 +27,6 @@ func (emptyMempool) Update( _ *types.Block, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 73df87d5a..a6f73ed65 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -370,7 +370,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempoolLogger := logger.With("module", "mempool") mempoolReactor := mempl.NewReactor(config.Mempool, mempool)