diff --git a/arbitrum/apibackend.go b/arbitrum/apibackend.go index d0a81521eb68..36cd25c5660b 100644 --- a/arbitrum/apibackend.go +++ b/arbitrum/apibackend.go @@ -287,7 +287,8 @@ func (a *APIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs // Filter API func (a *APIBackend) BloomStatus() (uint64, uint64) { - return params.BloomBitsBlocks, 0 // TODO: Implement second return value + sections, _, _ := a.b.bloomIndexer.Sections() + return a.b.config.BloomBitsBlocks, sections } func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { @@ -303,7 +304,9 @@ func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*t } func (a *APIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - panic("not implemented") // TODO: Implement + for i := 0; i < bloomFilterThreads; i++ { + go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, a.b.bloomRequests) + } } func (a *APIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { diff --git a/arbitrum/backend.go b/arbitrum/backend.go index 8deded4b0722..c726e33df635 100644 --- a/arbitrum/backend.go +++ b/arbitrum/backend.go @@ -2,7 +2,9 @@ package arbitrum import ( "context" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -19,6 +21,9 @@ type Backend struct { txFeed event.Feed scope event.SubscriptionScope + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + chanTxs chan *types.Transaction chanClose chan struct{} //close coroutine chanNewBlock chan struct{} //create new L2 block unless empty @@ -26,15 +31,20 @@ type Backend struct { func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publisher ArbInterface) (*Backend, error) { backend := &Backend{ - arb: publisher, - stack: stack, - config: config, - chainDb: chainDb, + arb: publisher, + stack: stack, + config: config, + chainDb: chainDb, + + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: core.NewBloomIndexer(chainDb, config.BloomBitsBlocks, config.BloomConfirms), + chanTxs: make(chan *types.Transaction, 100), - chanClose: make(chan struct{}, 1), + chanClose: make(chan struct{}), chanNewBlock: make(chan struct{}, 1), } - stack.RegisterLifecycle(backend) + + backend.bloomIndexer.Start(backend.arb.BlockChain()) createRegisterAPIBackend(backend) return backend, nil @@ -66,12 +76,14 @@ func (b *Backend) ArbInterface() ArbInterface { //TODO: this is used when registering backend as lifecycle in stack func (b *Backend) Start() error { + b.startBloomHandlers(b.config.BloomBitsBlocks) + return nil } func (b *Backend) Stop() error { - b.scope.Close() - + b.bloomIndexer.Close() + close(b.chanClose) return nil } diff --git a/arbitrum/bloom.go b/arbitrum/bloom.go new file mode 100644 index 000000000000..7946911e3aca --- /dev/null +++ b/arbitrum/bloom.go @@ -0,0 +1,46 @@ +package arbitrum + +import ( + "time" + + "github.com/ethereum/go-ethereum/eth" +) + +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + bloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + bloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + bloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + bloomRetrievalWait = time.Duration(0) +) + +// startBloomHandlers starts a batch of goroutines to accept bloom bit database +// retrievals from possibly a range of filters and serving the data to satisfy. +func (b *Backend) startBloomHandlers(sectionSize uint64) { + for i := 0; i < bloomServiceThreads; i++ { + go func() { + for { + select { + case _, more := <-b.chanClose: + if !more { + return + } + case request := <-b.bloomRequests: + task := <-request + eth.ServeBloombitRetrieval(task, b.chainDb, sectionSize) + request <- task + } + } + }() + } +} diff --git a/arbitrum/config.go b/arbitrum/config.go index 1c2ac294500d..5bd1f0a84ace 100644 --- a/arbitrum/config.go +++ b/arbitrum/config.go @@ -1,10 +1,12 @@ package arbitrum import ( + "time" + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/params" flag "github.com/spf13/pflag" ) -import "time" type Config struct { // RPCGasCap is the global gas cap for eth-call variants. @@ -16,16 +18,24 @@ type Config struct { // RPCEVMTimeout is the global timeout for eth-call. RPCEVMTimeout time.Duration `koanf:"evm-timeout"` + + // Parameters for the bloom indexer + BloomBitsBlocks uint64 `koanf:"bloom-bits-blocks"` + BloomConfirms uint64 `koanf:"bloom-confirms"` } func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".gas-cap", DefaultConfig.RPCGasCap, "cap on computation gas that can be used in eth_call/estimateGas (0=infinite)") f.Float64(prefix+".tx-fee-cap", DefaultConfig.RPCTxFeeCap, "cap on transaction fee (in ether) that can be sent via the RPC APIs (0 = no cap)") f.Duration(prefix+".evm-timeout", DefaultConfig.RPCEVMTimeout, "timeout used for eth_call (0=infinite)") + f.Uint64(prefix+".bloom-bits-blocks", DefaultConfig.BloomBitsBlocks, "number of blocks a single bloom bit section vector holds") + f.Uint64(prefix+".bloom-confirms", DefaultConfig.BloomConfirms, "number of confirmations before indexing new bloom filters") } var DefaultConfig = Config{ - RPCGasCap: ethconfig.Defaults.RPCGasCap, // 50,000,000 - RPCTxFeeCap: ethconfig.Defaults.RPCTxFeeCap, // 1 ether - RPCEVMTimeout: ethconfig.Defaults.RPCEVMTimeout, // 5 seconds + RPCGasCap: ethconfig.Defaults.RPCGasCap, // 50,000,000 + RPCTxFeeCap: ethconfig.Defaults.RPCTxFeeCap, // 1 ether + RPCEVMTimeout: ethconfig.Defaults.RPCEVMTimeout, // 5 seconds + BloomBitsBlocks: params.BloomBitsBlocks * 4, // we generally have smaller blocks + BloomConfirms: params.BloomConfirms, } diff --git a/eth/bloombits.go b/eth/bloombits.go index 0cb7050d2327..1f8ae83fc1d1 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -20,7 +20,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" ) const ( @@ -53,22 +55,26 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { case request := <-eth.bloomRequests: task := <-request - task.Bitsets = make([][]byte, len(task.Sections)) - for i, section := range task.Sections { - head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1) - if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil { - if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { - task.Bitsets[i] = blob - } else { - task.Error = err - } - } else { - task.Error = err - } - } + ServeBloombitRetrieval(task, eth.chainDb, sectionSize) request <- task } } }() } } + +func ServeBloombitRetrieval(task *bloombits.Retrieval, chainDb ethdb.Database, sectionSize uint64) { + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + head := rawdb.ReadCanonicalHash(chainDb, (section+1)*sectionSize-1) + if compVector, err := rawdb.ReadBloomBits(chainDb, task.Bit, section, head); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { + task.Bitsets[i] = blob + } else { + task.Error = err + } + } else { + task.Error = err + } + } +}