From 0bbcfdd6ea677d3c7952d88ef25d72ae656ac44c Mon Sep 17 00:00:00 2001 From: axelKingsley Date: Tue, 29 Oct 2024 15:53:47 -0500 Subject: [PATCH] Ingress Filtering for Interop Enabled Mempool --- core/error.go | 4 ++ core/txpool/ingress_filters.go | 52 +++++++++++++++++++++ core/txpool/ingress_filters_test.go | 70 +++++++++++++++++++++++++++++ core/txpool/txpool.go | 32 ++++++++++--- eth/backend.go | 55 ++++++++++++++++++++++- eth/protocols/eth/handler_test.go | 2 +- miner/miner_test.go | 2 +- miner/payload_building_test.go | 2 +- 8 files changed, 209 insertions(+), 10 deletions(-) create mode 100644 core/txpool/ingress_filters.go create mode 100644 core/txpool/ingress_filters_test.go diff --git a/core/error.go b/core/error.go index e6ad999bdd..b8b00121eb 100644 --- a/core/error.go +++ b/core/error.go @@ -84,6 +84,10 @@ var ( // current network configuration. ErrTxTypeNotSupported = types.ErrTxTypeNotSupported + // ErrTxFilteredOut indicates an ingress filter has rejected the transaction from + // being included in the pool. + ErrTxFilteredOut = errors.New("transaction filtered out") + // ErrTipAboveFeeCap is a sanity error to ensure no one is able to specify a // transaction with a tip higher than the total fee cap. ErrTipAboveFeeCap = errors.New("max priority fee per gas higher than max fee per gas") diff --git a/core/txpool/ingress_filters.go b/core/txpool/ingress_filters.go new file mode 100644 index 0000000000..b25a6f87ae --- /dev/null +++ b/core/txpool/ingress_filters.go @@ -0,0 +1,52 @@ +package txpool + +import ( + "context" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" +) + +// IngressFilter is an interface that allows filtering of transactions before they are added to the transaction pool. +// Implementations of this interface can be used to filter transactions based on various criteria. +// FilterTx will return true if the transaction should be allowed, and false if it should be rejected. +type IngressFilter interface { + FilterTx(tx *types.Transaction) bool +} + +type interopFilter struct { + logsFn func(tx *types.Transaction) ([]*types.Log, error) + checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error +} + +func NewInteropFilter( + logsFn func(tx *types.Transaction) ([]*types.Log, error), + checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error) IngressFilter { + return &interopFilter{ + logsFn: logsFn, + checkFn: checkFn, + } +} + +// FilterTx implements IngressFilter.FilterTx +// it gets logs checks for message safety based on the function provided +func (f *interopFilter) FilterTx(tx *types.Transaction) bool { + logs, err := f.logsFn(tx) + if err != nil { + return true // default to allow if logs cannot be retrieved + } + if len(logs) == 0 { + return true // default to allow if there are no logs + } + ems, err := interoptypes.ExecutingMessagesFromLogs(logs) + if err != nil { + return true // default to allow if logs cannot be parsed + } + if len(ems) == 0 { + return true // default to allow if there are no executing messages + } + + // check with the supervisor if the transaction should be allowed given the executing messages + ctx := context.Background() + return f.checkFn(ctx, ems, interoptypes.Unsafe) == nil +} diff --git a/core/txpool/ingress_filters_test.go b/core/txpool/ingress_filters_test.go new file mode 100644 index 0000000000..271bd0f71e --- /dev/null +++ b/core/txpool/ingress_filters_test.go @@ -0,0 +1,70 @@ +package txpool + +import ( + "context" + "errors" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" + "github.com/stretchr/testify/require" +) + +func TestInteropFilter(t *testing.T) { + t.Run("Tx has no logs", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + return []*types.Log{}, nil + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs are empty + return errors.New("error") + } + // when there are no logs to process, the transaction should be allowed + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(&types.Transaction{})) + }) + t.Run("Tx errored when getting logs", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + return []*types.Log{}, errors.New("error") + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs retrieval errored + return errors.New("error") + } + // when log retrieval errors, the transaction should be allowed + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(&types.Transaction{})) + }) + t.Run("Tx has no executing messages", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + l1 := &types.Log{ + Topics: []common.Hash{common.BytesToHash([]byte("topic1"))}, + } + return []*types.Log{l1}, errors.New("error") + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs retrieval doesn't have executing messages + return errors.New("error") + } + // when no executing messages are included, the transaction should be allowed + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(&types.Transaction{})) + }) + t.Run("Tx has valid executing message", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + // TODO: make executing messages here + l1 := &types.Log{ + Topics: []common.Hash{common.BytesToHash([]byte("topic1"))}, + } + return []*types.Log{l1}, errors.New("error") + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs retrieval doesn't have executing messages + return nil + } + // when no executing messages are included, the transaction should be allowed + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(&types.Transaction{})) + }) +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index ff31ffc200..0f8d14d227 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -77,22 +77,25 @@ type TxPool struct { term chan struct{} // Termination channel to detect a closed pool sync chan chan error // Testing / simulator channel to block until internal reset is done + + ingressFilters []IngressFilter // List of filters to apply to incoming transactions } // New creates a new transaction pool to gather, sort and filter inbound // transactions from the network. -func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { +func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []IngressFilter) (*TxPool, error) { // Retrieve the current head so that all subpools and this main coordinator // pool will have the same starting state, even if the chain moves forward // during initialization. head := chain.CurrentBlock() pool := &TxPool{ - subpools: subpools, - reservations: make(map[common.Address]SubPool), - quit: make(chan chan error), - term: make(chan struct{}), - sync: make(chan chan error), + subpools: subpools, + reservations: make(map[common.Address]SubPool), + quit: make(chan chan error), + term: make(chan struct{}), + sync: make(chan chan error), + ingressFilters: poolFilters, } for i, subpool := range subpools { if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil { @@ -319,11 +322,23 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // so we can piece back the returned errors into the original order. txsets := make([][]*types.Transaction, len(p.subpools)) splits := make([]int, len(txs)) + filtered_out := make([]bool, len(txs)) for i, tx := range txs { // Mark this transaction belonging to no-subpool splits[i] = -1 + // Filter the transaction through the ingress filters + for _, f := range p.ingressFilters { + if !f.FilterTx(tx) { + filtered_out[i] = true + } + } + // if the transaction is filtered out, don't add it to any subpool + if filtered_out[i] { + continue + } + // Try to find a subpool that accepts the transaction for j, subpool := range p.subpools { if subpool.Filter(tx) { @@ -341,6 +356,11 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { } errs := make([]error, len(txs)) for i, split := range splits { + // If the transaction was filtered out, mark it as such + if filtered_out[i] { + errs[i] = core.ErrTxFilteredOut + continue + } // If the transaction was rejected by all subpools, mark it unsupported if split == -1 { errs[i] = core.ErrTxTypeNotSupported diff --git a/eth/backend.go b/eth/backend.go index 9c83e64952..ada43eaead 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -275,7 +275,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { blobPool := blobpool.New(config.BlobPool, eth.blockchain) txPools = append(txPools, blobPool) } - eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, txPools) + // if interop is enabled, establish an Interop Filter connected to this Ethereum instance's + // simulated logs and message safety check functions + poolFilters := []txpool.IngressFilter{} + if config.InteropMessageRPC != "" { + poolFilters = append(poolFilters, txpool.NewInteropFilter(eth.SimLogs, eth.CheckMessages)) + } + eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, txPools, poolFilters) if err != nil { return nil, err } @@ -567,3 +573,50 @@ func (s *Ethereum) CheckMessages(ctx context.Context, messages []interoptypes.Me } return s.interopRPC.CheckMessages(ctx, messages, minSafety) } + +// simLogs simulates the logs that would be generated by a transaction if it were executed on the current state. +// This is used by the interop filter to determine if a transaction should be allowed. +// if errors are encountered, no logs are returned. +func (s *Ethereum) SimLogs(tx *types.Transaction) ([]*types.Log, error) { + // prepare all necessary resources for the transaction simulation + state, err := s.BlockChain().State() + header := s.BlockChain().CurrentBlock() + if err != nil { + return nil, err + } + var vmConf vm.Config + chainConfig := s.APIBackend.ChainConfig() + signer := types.MakeSigner(chainConfig, header.Number, header.Time) + message, err := core.TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + return nil, err + } + ctx := context.Background() + chainCtx := ethapi.NewChainContext(ctx, s.APIBackend) + blockCtx := core.NewEVMBlockContext( + header, + chainCtx, + nil, + chainConfig, + state) + txCtx := core.NewEVMTxContext(message) + vmenv := vm.NewEVM( + blockCtx, + txCtx, + state, + chainConfig, + vmConf, + ) + + // prepare the state and apply the transaction to it + state.SetTxContext(tx.Hash(), 0) + _, err = core.ApplyMessage( + vmenv, + message, + new(core.GasPool).AddGas(message.GasLimit)) + if err != nil { + return nil, err + } + + return state.GetLogs(tx.Hash(), header.Number.Uint64(), header.Hash()), nil +} diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index fc82b42947..0acd50ed5b 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -117,7 +117,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, txconfig.Journal = "" // Don't litter the disk with test journals pool := legacypool.New(txconfig, chain) - txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool}) + txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool}, nil) return &testBackend{ db: db, diff --git a/miner/miner_test.go b/miner/miner_test.go index 46a14e97f3..d365ee18ac 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -164,7 +164,7 @@ func createMiner(t *testing.T) *Miner { blockchain := &testBlockChain{bc.Genesis().Root(), chainConfig, statedb, 10000000, new(event.Feed)} pool := legacypool.New(testTxPoolConfig, blockchain) - txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}) + txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}, nil) // Create Miner backend := NewMockBackend(bc, txpool) diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index bf76a2040e..af7e2a37fc 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -135,7 +135,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine t.Fatalf("core.NewBlockChain failed: %v", err) } pool := legacypool.New(testTxPoolConfig, chain) - txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}) + txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}, nil) return &testWorkerBackend{ db: db,