Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interop: Ingress Filtering for Interop Enabled Mempool #422

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ var (
utils.RollupHistoricalRPCFlag,
utils.RollupHistoricalRPCTimeoutFlag,
utils.RollupInteropRPCFlag,
utils.RollupInteropMempoolFilteringFlag,
utils.RollupDisableTxPoolGossipFlag,
utils.RollupComputePendingBlock,
utils.RollupHaltOnIncompatibleProtocolVersionFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,12 @@ var (
Category: flags.RollupCategory,
}

RollupInteropMempoolFilteringFlag = &cli.BoolFlag{
Name: "rollup.interopmempoolfiltering",
Usage: "If using interop, transactions are checked for interop validity before being added to the mempool (experimental).",
Category: flags.RollupCategory,
}

RollupDisableTxPoolGossipFlag = &cli.BoolFlag{
Name: "rollup.disabletxpoolgossip",
Usage: "Disable transaction pool gossip.",
Expand Down Expand Up @@ -1950,6 +1956,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(RollupInteropRPCFlag.Name) {
cfg.InteropMessageRPC = ctx.String(RollupInteropRPCFlag.Name)
}
if ctx.IsSet(RollupInteropMempoolFilteringFlag.Name) {
cfg.InteropMempoolFiltering = ctx.Bool(RollupInteropMempoolFilteringFlag.Name)
}
cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name)
cfg.RollupDisableTxPoolAdmission = cfg.RollupSequencerHTTP != "" && !ctx.Bool(RollupEnableTxPoolAdmissionFlag.Name)
cfg.RollupHaltOnIncompatibleProtocolVersion = ctx.String(RollupHaltOnIncompatibleProtocolVersionFlag.Name)
Expand Down
4 changes: 4 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ var (
// current network configuration.
ErrTxTypeNotSupported = types.ErrTxTypeNotSupported

// ErrTxFilteredOut indicates an ingress filter has rejected the transaction from
// being included in the pool.
ErrTxFilteredOut = errors.New("transaction filtered out")

// ErrTipAboveFeeCap is a sanity error to ensure no one is able to specify a
// transaction with a tip higher than the total fee cap.
ErrTipAboveFeeCap = errors.New("max priority fee per gas higher than max fee per gas")
Expand Down
57 changes: 57 additions & 0 deletions core/txpool/ingress_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package txpool

import (
"context"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/interoptypes"
"github.com/ethereum/go-ethereum/log"
)

// IngressFilter is an interface that allows filtering of transactions before they are added to the transaction pool.
// Implementations of this interface can be used to filter transactions based on various criteria.
// FilterTx will return true if the transaction should be allowed, and false if it should be rejected.
type IngressFilter interface {
FilterTx(ctx context.Context, tx *types.Transaction) bool
}

type interopFilter struct {
logsFn func(tx *types.Transaction) ([]*types.Log, error)
checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error
}

func NewInteropFilter(
logsFn func(tx *types.Transaction) ([]*types.Log, error),
checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error) IngressFilter {
return &interopFilter{
logsFn: logsFn,
checkFn: checkFn,
}
}

// FilterTx implements IngressFilter.FilterTx
// it gets logs checks for message safety based on the function provided
func (f *interopFilter) FilterTx(ctx context.Context, tx *types.Transaction) bool {
logs, err := f.logsFn(tx)
if err != nil {
log.Debug("Failed to retrieve logs of tx", "txHash", tx.Hash(), "err", err)
return false // default to deny if logs cannot be retrieved
}
if len(logs) == 0 {
return true // default to allow if there are no logs
}
ems, err := interoptypes.ExecutingMessagesFromLogs(logs)
if err != nil {
log.Debug("Failed to parse executing messages of tx", "txHash", tx.Hash(), "err", err)
return false // default to deny if logs cannot be parsed
}
if len(ems) == 0 {
return true // default to allow if there are no executing messages
}

ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
// check with the supervisor if the transaction should be allowed given the executing messages
return f.checkFn(ctx, ems, interoptypes.Unsafe) == nil
}
188 changes: 188 additions & 0 deletions core/txpool/ingress_filters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package txpool

import (
"context"
"errors"
"math/big"
"net"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/interoptypes"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)

func TestInteropFilter(t *testing.T) {
// some placeholder transaction to test with
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: big.NewInt(1),
Nonce: 1,
To: &common.Address{},
Value: big.NewInt(1),
Data: []byte{},
})
t.Run("Tx has no logs", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{}, nil
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs are empty
return errors.New("error")
}
// when there are no logs to process, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(context.Background(), tx))
})
t.Run("Tx errored when getting logs", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{}, errors.New("error")
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval errored
return errors.New("error")
}
// when log retrieval errors, the transaction should be denied
filter := NewInteropFilter(logFn, checkFn)
require.False(t, filter.FilterTx(context.Background(), tx))
})
t.Run("Tx has no executing messages", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
l1 := &types.Log{
Topics: []common.Hash{common.BytesToHash([]byte("topic1"))},
}
return []*types.Log{l1}, nil
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval doesn't have executing messages
return errors.New("error")
}
// when no executing messages are included, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(context.Background(), tx))
})
t.Run("Tx has valid executing message", func(t *testing.T) {
// build a basic executing message
// the executing message must pass basic decode validation,
// but the validity check is done by the checkFn
l1 := &types.Log{
Address: params.InteropCrossL2InboxAddress,
Topics: []common.Hash{
common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]),
common.BytesToHash([]byte("payloadHash")),
},
Data: []byte{},
}
// using all 0s for data allows all takeZeros to pass
for i := 0; i < 32*5; i++ {
l1.Data = append(l1.Data, 0)
}
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{l1}, nil
}
var spyEMs []interoptypes.Message
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
spyEMs = ems
return nil
}
// when there is one executing message, the transaction should be allowed
// if the checkFn returns nil
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(context.Background(), tx))
// confirm that one executing message was passed to the checkFn
require.Equal(t, 1, len(spyEMs))
})
t.Run("Tx has invalid executing message", func(t *testing.T) {
// build a basic executing message
// the executing message must pass basic decode validation,
// but the validity check is done by the checkFn
l1 := &types.Log{
Address: params.InteropCrossL2InboxAddress,
Topics: []common.Hash{
common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]),
common.BytesToHash([]byte("payloadHash")),
},
Data: []byte{},
}
// using all 0s for data allows all takeZeros to pass
for i := 0; i < 32*5; i++ {
l1.Data = append(l1.Data, 0)
}
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{l1}, nil
}
var spyEMs []interoptypes.Message
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
spyEMs = ems
return errors.New("error")
}
// when there is one executing message, and the checkFn returns an error,
// (ie the supervisor rejects the transaction) the transaction should be denied
filter := NewInteropFilter(logFn, checkFn)
require.False(t, filter.FilterTx(context.Background(), tx))
// confirm that one executing message was passed to the checkFn
require.Equal(t, 1, len(spyEMs))
})
}

func TestInteropFilterRPCFailures(t *testing.T) {
tests := []struct {
name string
networkErr bool
timeout bool
invalidResp bool
}{
{
name: "Network Error",
networkErr: true,
},
{
name: "Timeout",
timeout: true,
},
{
name: "Invalid Response",
invalidResp: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create mock log function that always returns our test log
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
log := &types.Log{
Address: params.InteropCrossL2InboxAddress,
Topics: []common.Hash{
common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]),
common.BytesToHash([]byte("payloadHash")),
},
Data: make([]byte, 32*5),
}
return []*types.Log{log}, nil
}

// Create mock check function that simulates RPC failures
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
if tt.networkErr {
return &net.OpError{Op: "dial", Err: errors.New("connection refused")}
}

if tt.timeout {
return context.DeadlineExceeded
}

if tt.invalidResp {
return errors.New("invalid response format")
}

return nil
}

// Create and test filter
filter := NewInteropFilter(logFn, checkFn)
result := filter.FilterTx(context.Background(), &types.Transaction{})
require.Equal(t, false, result, "FilterTx result mismatch")
})
}
}
42 changes: 36 additions & 6 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package txpool

import (
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -77,22 +78,32 @@ type TxPool struct {
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done

ingressFilters []IngressFilter // List of filters to apply to incoming transactions

filterCtx context.Context // Filters may use external resources
filterCancel context.CancelFunc // Filter calls are cancelled on shutdown
}

// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []IngressFilter) (*TxPool, error) {
// Retrieve the current head so that all subpools and this main coordinator
// pool will have the same starting state, even if the chain moves forward
// during initialization.
head := chain.CurrentBlock()

filterCtx, filterCancel := context.WithCancel(context.Background())

pool := &TxPool{
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
ingressFilters: poolFilters,
filterCtx: filterCtx,
filterCancel: filterCancel,
}
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
Expand Down Expand Up @@ -156,6 +167,8 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
func (p *TxPool) Close() error {
var errs []error

p.filterCancel() // Cancel filter work, these in-flight txs will be not be allowed through before shutdown

// Terminate the reset loop and wait for it to finish
errc := make(chan error)
p.quit <- errc
Expand Down Expand Up @@ -319,11 +332,23 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// so we can piece back the returned errors into the original order.
txsets := make([][]*types.Transaction, len(p.subpools))
splits := make([]int, len(txs))
filtered_out := make([]bool, len(txs))

for i, tx := range txs {
// Mark this transaction belonging to no-subpool
splits[i] = -1

// Filter the transaction through the ingress filters
for _, f := range p.ingressFilters {
if !f.FilterTx(p.filterCtx, tx) {
filtered_out[i] = true
}
}
// if the transaction is filtered out, don't add it to any subpool
if filtered_out[i] {
continue
}

// Try to find a subpool that accepts the transaction
for j, subpool := range p.subpools {
if subpool.Filter(tx) {
Expand All @@ -341,6 +366,11 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
}
errs := make([]error, len(txs))
for i, split := range splits {
// If the transaction was filtered out, mark it as such
if filtered_out[i] {
errs[i] = core.ErrTxFilteredOut
continue
}
// If the transaction was rejected by all subpools, mark it unsupported
if split == -1 {
errs[i] = core.ErrTxTypeNotSupported
Expand Down
Loading