Skip to content

Commit

Permalink
Merge pull request ethereum#54 from OffchainLabs/bloom
Browse files Browse the repository at this point in the history
initial: bloom filters
  • Loading branch information
PlasmaPower authored Apr 12, 2022
2 parents 256714e + 7c19a99 commit 94d9d4c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 27 deletions.
7 changes: 5 additions & 2 deletions arbitrum/apibackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ func (a *APIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs

// Filter API
func (a *APIBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, 0 // TODO: Implement second return value
sections, _, _ := a.b.bloomIndexer.Sections()
return a.b.config.BloomBitsBlocks, sections
}

func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
Expand All @@ -303,7 +304,9 @@ func (a *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*t
}

func (a *APIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
panic("not implemented") // TODO: Implement
for i := 0; i < bloomFilterThreads; i++ {
go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, a.b.bloomRequests)
}
}

func (a *APIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
Expand Down
28 changes: 20 additions & 8 deletions arbitrum/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package arbitrum

import (
"context"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand All @@ -19,22 +21,30 @@ type Backend struct {
txFeed event.Feed
scope event.SubscriptionScope

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports

chanTxs chan *types.Transaction
chanClose chan struct{} //close coroutine
chanNewBlock chan struct{} //create new L2 block unless empty
}

func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publisher ArbInterface) (*Backend, error) {
backend := &Backend{
arb: publisher,
stack: stack,
config: config,
chainDb: chainDb,
arb: publisher,
stack: stack,
config: config,
chainDb: chainDb,

bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, config.BloomBitsBlocks, config.BloomConfirms),

chanTxs: make(chan *types.Transaction, 100),
chanClose: make(chan struct{}, 1),
chanClose: make(chan struct{}),
chanNewBlock: make(chan struct{}, 1),
}
stack.RegisterLifecycle(backend)

backend.bloomIndexer.Start(backend.arb.BlockChain())

createRegisterAPIBackend(backend)
return backend, nil
Expand Down Expand Up @@ -66,12 +76,14 @@ func (b *Backend) ArbInterface() ArbInterface {

//TODO: this is used when registering backend as lifecycle in stack
func (b *Backend) Start() error {
b.startBloomHandlers(b.config.BloomBitsBlocks)

return nil
}

func (b *Backend) Stop() error {

b.scope.Close()

b.bloomIndexer.Close()
close(b.chanClose)
return nil
}
46 changes: 46 additions & 0 deletions arbitrum/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package arbitrum

import (
"time"

"github.com/ethereum/go-ethereum/eth"
)

const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
bloomServiceThreads = 16

// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
bloomFilterThreads = 3

// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
bloomRetrievalBatch = 16

// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
bloomRetrievalWait = time.Duration(0)
)

// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serving the data to satisfy.
func (b *Backend) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
go func() {
for {
select {
case _, more := <-b.chanClose:
if !more {
return
}
case request := <-b.bloomRequests:
task := <-request
eth.ServeBloombitRetrieval(task, b.chainDb, sectionSize)
request <- task
}
}
}()
}
}
18 changes: 14 additions & 4 deletions arbitrum/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package arbitrum

import (
"time"

"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/params"
flag "github.com/spf13/pflag"
)
import "time"

type Config struct {
// RPCGasCap is the global gas cap for eth-call variants.
Expand All @@ -16,16 +18,24 @@ type Config struct {

// RPCEVMTimeout is the global timeout for eth-call.
RPCEVMTimeout time.Duration `koanf:"evm-timeout"`

// Parameters for the bloom indexer
BloomBitsBlocks uint64 `koanf:"bloom-bits-blocks"`
BloomConfirms uint64 `koanf:"bloom-confirms"`
}

func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".gas-cap", DefaultConfig.RPCGasCap, "cap on computation gas that can be used in eth_call/estimateGas (0=infinite)")
f.Float64(prefix+".tx-fee-cap", DefaultConfig.RPCTxFeeCap, "cap on transaction fee (in ether) that can be sent via the RPC APIs (0 = no cap)")
f.Duration(prefix+".evm-timeout", DefaultConfig.RPCEVMTimeout, "timeout used for eth_call (0=infinite)")
f.Uint64(prefix+".bloom-bits-blocks", DefaultConfig.BloomBitsBlocks, "number of blocks a single bloom bit section vector holds")
f.Uint64(prefix+".bloom-confirms", DefaultConfig.BloomConfirms, "number of confirmations before indexing new bloom filters")
}

var DefaultConfig = Config{
RPCGasCap: ethconfig.Defaults.RPCGasCap, // 50,000,000
RPCTxFeeCap: ethconfig.Defaults.RPCTxFeeCap, // 1 ether
RPCEVMTimeout: ethconfig.Defaults.RPCEVMTimeout, // 5 seconds
RPCGasCap: ethconfig.Defaults.RPCGasCap, // 50,000,000
RPCTxFeeCap: ethconfig.Defaults.RPCTxFeeCap, // 1 ether
RPCEVMTimeout: ethconfig.Defaults.RPCEVMTimeout, // 5 seconds
BloomBitsBlocks: params.BloomBitsBlocks * 4, // we generally have smaller blocks
BloomConfirms: params.BloomConfirms,
}
32 changes: 19 additions & 13 deletions eth/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
)

const (
Expand Down Expand Up @@ -53,22 +55,26 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {

case request := <-eth.bloomRequests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1)
if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
task.Bitsets[i] = blob
} else {
task.Error = err
}
} else {
task.Error = err
}
}
ServeBloombitRetrieval(task, eth.chainDb, sectionSize)
request <- task
}
}
}()
}
}

func ServeBloombitRetrieval(task *bloombits.Retrieval, chainDb ethdb.Database, sectionSize uint64) {
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
head := rawdb.ReadCanonicalHash(chainDb, (section+1)*sectionSize-1)
if compVector, err := rawdb.ReadBloomBits(chainDb, task.Bit, section, head); err == nil {
if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
task.Bitsets[i] = blob
} else {
task.Error = err
}
} else {
task.Error = err
}
}
}

0 comments on commit 94d9d4c

Please sign in to comment.