From aff95b4b9d16544774338a08433b8bf6b2c2920e Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Mon, 14 Feb 2022 15:10:49 +0200 Subject: [PATCH 1/3] initial: bloom filters --- arbitrum/apibackend.go | 7 ++++-- arbitrum/backend.go | 27 +++++++++++++++++------ arbitrum/bloom.go | 46 +++++++++++++++++++++++++++++++++++++++ eth/bloombits.go | 32 ++++++++++++++++----------- params/config_arbitrum.go | 5 +++++ 5 files changed, 95 insertions(+), 22 deletions(-) create mode 100644 arbitrum/bloom.go diff --git a/arbitrum/apibackend.go b/arbitrum/apibackend.go index ee3fda639a50..75e86d7e744e 100644 --- a/arbitrum/apibackend.go +++ b/arbitrum/apibackend.go @@ -257,7 +257,8 @@ func (a *APIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs // Filter API func (a *APIBackend) BloomStatus() (uint64, uint64) { - return params.BloomBitsBlocks, 0 // TODO: Implement second return value + sections, _, _ := a.b.bloomIndexer.Sections() + return params.ArbBloomBitsBlocks, sections } func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { @@ -273,7 +274,9 @@ func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*t } func (a *APIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - panic("not implemented") // TODO: Implement + for i := 0; i < bloomFilterThreads; i++ { + go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, a.b.bloomRequests) + } } func (a *APIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { diff --git a/arbitrum/backend.go b/arbitrum/backend.go index b5eaba4b1b1e..ba1d771396e1 100644 --- a/arbitrum/backend.go +++ b/arbitrum/backend.go @@ -6,10 +6,12 @@ import ( "time" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" ) type Backend struct { @@ -22,6 +24,9 @@ type Backend struct { txFeed event.Feed scope event.SubscriptionScope + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + chanTxs chan *types.Transaction chanClose chan struct{} //close coroutine chanNewBlock chan struct{} //create new L2 block unless empty @@ -46,16 +51,22 @@ var DefaultConfig = Config{ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, blockChain *core.BlockChain, publisher ArbInterface) (*Backend, error) { backend := &Backend{ - arb: publisher, - stack: stack, - config: config, - chainDb: chainDb, + arb: publisher, + stack: stack, + config: config, + chainDb: chainDb, + + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + chanTxs: make(chan *types.Transaction, 100), - chanClose: make(chan struct{}, 1), + chanClose: make(chan struct{}), chanNewBlock: make(chan struct{}, 1), } stack.RegisterLifecycle(backend) + backend.bloomIndexer.Start(backend.arb.BlockChain()) + createRegisterAPIBackend(backend) return backend, nil } @@ -86,12 +97,14 @@ func (b *Backend) ArbInterface() ArbInterface { //TODO: this is used when registering backend as lifecycle in stack func (b *Backend) Start() error { + b.startBloomHandlers(params.ArbBloomBitsBlocks) + return nil } func (b *Backend) Stop() error { - b.scope.Close() - + b.bloomIndexer.Close() + close(b.chanClose) return nil } diff --git a/arbitrum/bloom.go b/arbitrum/bloom.go new file mode 100644 index 000000000000..7946911e3aca --- /dev/null +++ b/arbitrum/bloom.go @@ -0,0 +1,46 @@ +package arbitrum + +import ( + "time" + + "github.com/ethereum/go-ethereum/eth" +) + +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + bloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + bloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + bloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + bloomRetrievalWait = time.Duration(0) +) + +// startBloomHandlers starts a batch of goroutines to accept bloom bit database +// retrievals from possibly a range of filters and serving the data to satisfy. +func (b *Backend) startBloomHandlers(sectionSize uint64) { + for i := 0; i < bloomServiceThreads; i++ { + go func() { + for { + select { + case _, more := <-b.chanClose: + if !more { + return + } + case request := <-b.bloomRequests: + task := <-request + eth.ServeBloombitRetrieval(task, b.chainDb, sectionSize) + request <- task + } + } + }() + } +} diff --git a/eth/bloombits.go b/eth/bloombits.go index 0cb7050d2327..1f8ae83fc1d1 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -20,7 +20,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" ) const ( @@ -53,22 +55,26 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { case request := <-eth.bloomRequests: task := <-request - task.Bitsets = make([][]byte, len(task.Sections)) - for i, section := range task.Sections { - head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1) - if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil { - if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { - task.Bitsets[i] = blob - } else { - task.Error = err - } - } else { - task.Error = err - } - } + ServeBloombitRetrieval(task, eth.chainDb, sectionSize) request <- task } } }() } } + +func ServeBloombitRetrieval(task *bloombits.Retrieval, chainDb ethdb.Database, sectionSize uint64) { + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + head := rawdb.ReadCanonicalHash(chainDb, (section+1)*sectionSize-1) + if compVector, err := rawdb.ReadBloomBits(chainDb, task.Bit, section, head); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { + task.Bitsets[i] = blob + } else { + task.Error = err + } + } else { + task.Error = err + } + } +} diff --git a/params/config_arbitrum.go b/params/config_arbitrum.go index 1e37c528907b..9132ab29ec97 100644 --- a/params/config_arbitrum.go +++ b/params/config_arbitrum.go @@ -109,3 +109,8 @@ func ArbitrumTestChainConfig() *ChainConfig { }, } } + +const ( + // Arbitrum blocks are usually smaller, so use more blocks per bloom section + ArbBloomBitsBlocks uint64 = BloomBitsBlocks * 16 +) From d28eb5e62f1e030ac702998a3a11ab86f6d05080 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 12 Apr 2022 13:38:07 +0300 Subject: [PATCH 2/3] arb-backend doesn't register to stack start/stop handled by node --- arbitrum/backend.go | 1 - 1 file changed, 1 deletion(-) diff --git a/arbitrum/backend.go b/arbitrum/backend.go index 986851844fa7..bbe0a553c3d8 100644 --- a/arbitrum/backend.go +++ b/arbitrum/backend.go @@ -43,7 +43,6 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis chanClose: make(chan struct{}), chanNewBlock: make(chan struct{}, 1), } - stack.RegisterLifecycle(backend) backend.bloomIndexer.Start(backend.arb.BlockChain()) From a37abe1be5bdfadf8560c3c35502f5b242e5ffa8 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 12 Apr 2022 13:38:55 +0300 Subject: [PATCH 3/3] use proper config for arbitrum bloombits --- arbitrum/apibackend.go | 2 +- arbitrum/backend.go | 6 +++--- arbitrum/config.go | 18 ++++++++++++++---- params/config_arbitrum.go | 5 ----- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/arbitrum/apibackend.go b/arbitrum/apibackend.go index 3aba1968ace5..36cd25c5660b 100644 --- a/arbitrum/apibackend.go +++ b/arbitrum/apibackend.go @@ -288,7 +288,7 @@ func (a *APIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs // Filter API func (a *APIBackend) BloomStatus() (uint64, uint64) { sections, _, _ := a.b.bloomIndexer.Sections() - return params.ArbBloomBitsBlocks, sections + return a.b.config.BloomBitsBlocks, sections } func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { diff --git a/arbitrum/backend.go b/arbitrum/backend.go index bbe0a553c3d8..c726e33df635 100644 --- a/arbitrum/backend.go +++ b/arbitrum/backend.go @@ -2,13 +2,13 @@ package arbitrum import ( "context" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/params" ) type Backend struct { @@ -37,7 +37,7 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis chainDb: chainDb, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + bloomIndexer: core.NewBloomIndexer(chainDb, config.BloomBitsBlocks, config.BloomConfirms), chanTxs: make(chan *types.Transaction, 100), chanClose: make(chan struct{}), @@ -76,7 +76,7 @@ func (b *Backend) ArbInterface() ArbInterface { //TODO: this is used when registering backend as lifecycle in stack func (b *Backend) Start() error { - b.startBloomHandlers(params.ArbBloomBitsBlocks) + b.startBloomHandlers(b.config.BloomBitsBlocks) return nil } diff --git a/arbitrum/config.go b/arbitrum/config.go index 1c2ac294500d..5bd1f0a84ace 100644 --- a/arbitrum/config.go +++ b/arbitrum/config.go @@ -1,10 +1,12 @@ package arbitrum import ( + "time" + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/params" flag "github.com/spf13/pflag" ) -import "time" type Config struct { // RPCGasCap is the global gas cap for eth-call variants. @@ -16,16 +18,24 @@ type Config struct { // RPCEVMTimeout is the global timeout for eth-call. RPCEVMTimeout time.Duration `koanf:"evm-timeout"` + + // Parameters for the bloom indexer + BloomBitsBlocks uint64 `koanf:"bloom-bits-blocks"` + BloomConfirms uint64 `koanf:"bloom-confirms"` } func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".gas-cap", DefaultConfig.RPCGasCap, "cap on computation gas that can be used in eth_call/estimateGas (0=infinite)") f.Float64(prefix+".tx-fee-cap", DefaultConfig.RPCTxFeeCap, "cap on transaction fee (in ether) that can be sent via the RPC APIs (0 = no cap)") f.Duration(prefix+".evm-timeout", DefaultConfig.RPCEVMTimeout, "timeout used for eth_call (0=infinite)") + f.Uint64(prefix+".bloom-bits-blocks", DefaultConfig.BloomBitsBlocks, "number of blocks a single bloom bit section vector holds") + f.Uint64(prefix+".bloom-confirms", DefaultConfig.BloomConfirms, "number of confirmations before indexing new bloom filters") } var DefaultConfig = Config{ - RPCGasCap: ethconfig.Defaults.RPCGasCap, // 50,000,000 - RPCTxFeeCap: ethconfig.Defaults.RPCTxFeeCap, // 1 ether - RPCEVMTimeout: ethconfig.Defaults.RPCEVMTimeout, // 5 seconds + RPCGasCap: ethconfig.Defaults.RPCGasCap, // 50,000,000 + RPCTxFeeCap: ethconfig.Defaults.RPCTxFeeCap, // 1 ether + RPCEVMTimeout: ethconfig.Defaults.RPCEVMTimeout, // 5 seconds + BloomBitsBlocks: params.BloomBitsBlocks * 4, // we generally have smaller blocks + BloomConfirms: params.BloomConfirms, } diff --git a/params/config_arbitrum.go b/params/config_arbitrum.go index d538e9339723..93245586a033 100644 --- a/params/config_arbitrum.go +++ b/params/config_arbitrum.go @@ -189,11 +189,6 @@ func ArbitrumDevTestDASChainConfig() *ChainConfig { } } -const ( - // Arbitrum blocks are usually smaller, so use more blocks per bloom section - ArbBloomBitsBlocks uint64 = BloomBitsBlocks * 16 -) - var ArbitrumSupportedChainConfigs = []*ChainConfig{ ArbitrumOneChainConfig(), ArbitrumTestnetChainConfig(),