From d0e70db6e1631c78ac01830ac2a8e47857aece9a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 1 Feb 2024 08:39:11 -0800 Subject: [PATCH 1/8] add ingestion throttle --- engine/execution/ingestion/throttle.go | 301 +++++++++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 engine/execution/ingestion/throttle.go diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go new file mode 100644 index 00000000000..5e6612e8c9b --- /dev/null +++ b/engine/execution/ingestion/throttle.go @@ -0,0 +1,301 @@ +package ingestion + +import ( + "context" + "fmt" + "sync" + + "github.com/onflow/flow-go/engine/execution/state" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/storage" + "github.com/rs/zerolog" +) + +// CatchUpThreshold is the number of blocks that if the execution is far behind +// the finalization then we will only lazy load the next unexecuted finalized +// blocks until the execution has caught up +const CatchUpThreshold = 500 + +func NewThrottleEngine( + blocks storage.Blocks, + handler BlockHandler, + log zerolog.Logger, + state protocol.State, + execState state.ExecutionState, + headers storage.Headers, + catchupThreshold int, +) (*component.ComponentManager, error) { + throttle, err := NewThrottle(log, state, execState, headers, catchupThreshold) + if err != nil { + return nil, fmt.Errorf("could not create throttle: %w", err) + } + + e := component.NewComponentManagerBuilder(). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + processables := make(chan flow.Identifier, 1) + + go func() { + err := forwardProcessableToHandler(ctx, blocks, handler, processables) + if err != nil { + ctx.Throw(err) + } + }() + + log.Info().Msg("initializing throttle engine") + + err = throttle.Init(processables) + if err != nil { + ctx.Throw(err) + } + + log.Info().Msgf("throttle engine initialized") + + ready() + }). + Build() + return e, nil +} + +func forwardProcessableToHandler( + ctx context.Context, + blocks storage.Blocks, + handler BlockHandler, + processables <-chan flow.Identifier, +) error { + for { + select { + case <-ctx.Done(): + return nil + case blockID := <-processables: + block, err := blocks.ByID(blockID) + if err != nil { + return fmt.Errorf("could not get block: %w", err) + } + + err = handler.OnBlock(block) + if err != nil { + return fmt.Errorf("could not process block: %w", err) + } + } + } +} + +// Throttle is a helper struct that helps throttle the unexecuted blocks to be sent +// to the block queue for execution. +// It is useful for case when execution is falling far behind the finalization, in which case +// we want to throttle the blocks to be sent to the block queue for fetching data to execute +// them. Without throttle, the block queue will be flooded with blocks, and the network +// will be flooded with requests fetching collections, and the EN might quickly run out of memory. 
+type Throttle struct { + // config + threshold int // catch up threshold + + // state + mu sync.Mutex + executed uint64 + finalized uint64 + inited bool + + // notifier + processables chan<- flow.Identifier + + // dependencies + log zerolog.Logger + state protocol.State + headers storage.Headers +} + +type BlockHandler interface { + OnBlock(block *flow.Block) error +} + +func NewThrottle( + log zerolog.Logger, + state protocol.State, + execState state.ExecutionState, + headers storage.Headers, + catchupThreshold int, +) (*Throttle, error) { + finalizedHead, err := state.Final().Head() + if err != nil { + return nil, fmt.Errorf("could not get finalized head: %w", err) + } + + finalized := finalizedHead.Height + // TODO: implement GetHighestFinalizedExecuted for execution state when storehouse + // is not used + executed := execState.GetHighestFinalizedExecuted() + + if executed > finalized { + return nil, fmt.Errorf("executed finalized %v is greater than finalized %v", executed, finalized) + } + + return &Throttle{ + threshold: catchupThreshold, + executed: executed, + finalized: finalized, + + log: log.With().Str("component", "throttle").Logger(), + state: state, + headers: headers, + }, nil +} + +func (c *Throttle) Init(processables chan<- flow.Identifier) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.inited { + return fmt.Errorf("throttle already inited") + } + + c.inited = true + + var unexecuted []flow.Identifier + var err error + if caughtUp(c.executed, c.finalized, c.threshold) { + unexecuted, err = findAllUnexecutedBlocks(c.state, c.headers, c.executed, c.finalized) + if err != nil { + return err + } + } else { + unexecuted, err = findFinalized(c.state, c.headers, c.executed, c.executed+500) + if err != nil { + return err + } + } + + for _, id := range unexecuted { + c.processables <- id + } + + return nil +} + +func (c *Throttle) OnBlockExecuted(executed uint64, _ flow.Identifier) error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.inited { + return fmt.Errorf("throttle not inited") + } + + // we have already caught up, ignore + if c.caughtUp() { + return nil + } + + // the execution is still far behind from finalization + c.executed = executed + if !c.caughtUp() { + return nil + } + + c.log.Info().Uint64("executed", executed).Uint64("finalized", c.finalized). + Msgf("execution has caught up, processing remaining unexecuted blocks") + + // if the execution have just caught up close enough to the latest finalized blocks, + // then process all unexecuted blocks, including finalized unexecuted and pending unexecuted + unexecuted, err := findAllUnexecutedBlocks(c.state, c.headers, c.executed, c.finalized) + if err != nil { + return fmt.Errorf("could not find unexecuted blocks for processing: %w", err) + } + + c.log.Info().Int("unexecuted", len(unexecuted)).Msgf("forwarding unexecuted blocks") + + for _, id := range unexecuted { + c.processables <- id + } + + c.log.Info().Msgf("all unexecuted blocks have been processed") + + return nil +} + +func (c *Throttle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertificate) { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.inited { + return + } + + // ignore the block if has not caught up. 
+ if !c.caughtUp() { + return + } + + // if has caught up, then process the block + c.processables <- qc.BlockID +} + +func (c *Throttle) OnBlockFinalized(lastFinalized *flow.Header) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.inited { + return + } + + if c.caughtUp() { + return + } + + if lastFinalized.Height <= c.finalized { + return + } + + c.finalized = lastFinalized.Height +} + +func (c *Throttle) caughtUp() bool { + return caughtUp(c.executed, c.finalized, c.threshold) +} + +func caughtUp(executed, finalized uint64, threshold int) bool { + return finalized <= executed+uint64(threshold) +} + +func findFinalized(state protocol.State, headers storage.Headers, lastExecuted, finalizedHeight uint64) ([]flow.Identifier, error) { + // get finalized height + finalized := state.AtHeight(finalizedHeight) + final, err := finalized.Head() + if err != nil { + return nil, fmt.Errorf("could not get finalized block: %w", err) + } + + // dynamically bootstrapped execution node will have highest finalized executed as sealed root, + // which is lower than finalized root. so we will reload blocks from + // [sealedRoot.Height + 1, finalizedRoot.Height] and execute them on startup. + unexecutedFinalized := make([]flow.Identifier, 0) + + // starting from the first unexecuted block, go through each unexecuted and finalized block + // reload its block to execution queues + // loading finalized blocks + for height := lastExecuted + 1; height <= final.Height; height++ { + finalizedID, err := headers.BlockIDByHeight(height) + if err != nil { + return nil, fmt.Errorf("could not get header at height: %v, %w", height, err) + } + + unexecutedFinalized = append(unexecutedFinalized, finalizedID) + } + return unexecutedFinalized, nil +} + +func findAllUnexecutedBlocks(state protocol.State, headers storage.Headers, lastExecuted, finalizedHeight uint64) ([]flow.Identifier, error) { + unexecutedFinalized, err := findFinalized(state, headers, lastExecuted, finalizedHeight) + if err != nil { + return nil, fmt.Errorf("could not find finalized unexecuted blocks: %w", err) + } + + // loaded all pending blocks + pendings, err := state.AtHeight(finalizedHeight).Descendants() + if err != nil { + return nil, fmt.Errorf("could not get descendants of finalized block: %w", err) + } + + unexecuted := append(unexecutedFinalized, pendings...) + return unexecuted, nil +} From d5aec198b4c59774d09796059d16c573bc099a61 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 1 Feb 2024 08:55:55 -0800 Subject: [PATCH 2/8] increase buffer size --- engine/execution/ingestion/throttle.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go index 5e6612e8c9b..05ee256ba73 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -20,9 +20,8 @@ import ( const CatchUpThreshold = 500 func NewThrottleEngine( - blocks storage.Blocks, - handler BlockHandler, log zerolog.Logger, + handler BlockHandler, state protocol.State, execState state.ExecutionState, headers storage.Headers, @@ -35,10 +34,19 @@ func NewThrottleEngine( e := component.NewComponentManagerBuilder(). AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - processables := make(chan flow.Identifier, 1) + // TODO: config the buffer size + // since the handler.OnBlock method could be blocking, we need to make sure + // the channel has enough buffer space to hold the unprocessed blocks. 
+ // if the channel is full, then it will block the follower engine from + // delivering new blocks until the channel is not full, which could be + // useful because we probably don't want to process too many blocks if + // the execution is not fast enough or even stopped. + // TODO: wrap the channel so that we can report acurate metrics about the + // buffer size + processables := make(chan flow.Identifier, 10000) go func() { - err := forwardProcessableToHandler(ctx, blocks, handler, processables) + err := forwardProcessableToHandler(ctx, headers, handler, processables) if err != nil { ctx.Throw(err) } @@ -61,7 +69,7 @@ func NewThrottleEngine( func forwardProcessableToHandler( ctx context.Context, - blocks storage.Blocks, + headers storage.Headers, handler BlockHandler, processables <-chan flow.Identifier, ) error { @@ -70,7 +78,7 @@ func forwardProcessableToHandler( case <-ctx.Done(): return nil case blockID := <-processables: - block, err := blocks.ByID(blockID) + block, err := headers.ByBlockID(blockID) if err != nil { return fmt.Errorf("could not get block: %w", err) } @@ -109,7 +117,7 @@ type Throttle struct { } type BlockHandler interface { - OnBlock(block *flow.Block) error + OnBlock(block *flow.Header) error } func NewThrottle( From 0604cc9cd1d515295f04382fd13f5e23209fdc7c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 1 Feb 2024 17:08:51 -0800 Subject: [PATCH 3/8] remove throttle engine --- engine/execution/ingestion/throttle.go | 75 -------------------------- 1 file changed, 75 deletions(-) diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go index 05ee256ba73..fab5d98f8f0 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -1,14 +1,11 @@ package ingestion import ( - "context" "fmt" "sync" "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/component" - "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/rs/zerolog" @@ -19,78 +16,6 @@ import ( // blocks until the execution has caught up const CatchUpThreshold = 500 -func NewThrottleEngine( - log zerolog.Logger, - handler BlockHandler, - state protocol.State, - execState state.ExecutionState, - headers storage.Headers, - catchupThreshold int, -) (*component.ComponentManager, error) { - throttle, err := NewThrottle(log, state, execState, headers, catchupThreshold) - if err != nil { - return nil, fmt.Errorf("could not create throttle: %w", err) - } - - e := component.NewComponentManagerBuilder(). - AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - // TODO: config the buffer size - // since the handler.OnBlock method could be blocking, we need to make sure - // the channel has enough buffer space to hold the unprocessed blocks. - // if the channel is full, then it will block the follower engine from - // delivering new blocks until the channel is not full, which could be - // useful because we probably don't want to process too many blocks if - // the execution is not fast enough or even stopped. 
- // TODO: wrap the channel so that we can report acurate metrics about the - // buffer size - processables := make(chan flow.Identifier, 10000) - - go func() { - err := forwardProcessableToHandler(ctx, headers, handler, processables) - if err != nil { - ctx.Throw(err) - } - }() - - log.Info().Msg("initializing throttle engine") - - err = throttle.Init(processables) - if err != nil { - ctx.Throw(err) - } - - log.Info().Msgf("throttle engine initialized") - - ready() - }). - Build() - return e, nil -} - -func forwardProcessableToHandler( - ctx context.Context, - headers storage.Headers, - handler BlockHandler, - processables <-chan flow.Identifier, -) error { - for { - select { - case <-ctx.Done(): - return nil - case blockID := <-processables: - block, err := headers.ByBlockID(blockID) - if err != nil { - return fmt.Errorf("could not get block: %w", err) - } - - err = handler.OnBlock(block) - if err != nil { - return fmt.Errorf("could not process block: %w", err) - } - } - } -} - // Throttle is a helper struct that helps throttle the unexecuted blocks to be sent // to the block queue for execution. // It is useful for case when execution is falling far behind the finalization, in which case From 04694252669dc3b38e2de593ed26fcc66d0d3f4b Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 1 Feb 2024 17:09:14 -0800 Subject: [PATCH 4/8] lint --- engine/execution/ingestion/throttle.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go index fab5d98f8f0..38464a5e6fd 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -4,11 +4,12 @@ import ( "fmt" "sync" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" - "github.com/rs/zerolog" ) // CatchUpThreshold is the number of blocks that if the execution is far behind From 465a3d025d0324f12929d182880cd6562e32976b Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 2 Feb 2024 10:47:38 -0800 Subject: [PATCH 5/8] rename throttle --- engine/execution/ingestion/throttle.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go index 38464a5e6fd..b98157dbda2 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -17,13 +17,13 @@ import ( // blocks until the execution has caught up const CatchUpThreshold = 500 -// Throttle is a helper struct that helps throttle the unexecuted blocks to be sent +// BlockThrottle is a helper struct that throttles the unexecuted blocks to be sent // to the block queue for execution. // It is useful for case when execution is falling far behind the finalization, in which case // we want to throttle the blocks to be sent to the block queue for fetching data to execute // them. Without throttle, the block queue will be flooded with blocks, and the network // will be flooded with requests fetching collections, and the EN might quickly run out of memory. 
-type Throttle struct { +type BlockThrottle struct { // config threshold int // catch up threshold @@ -46,13 +46,13 @@ type BlockHandler interface { OnBlock(block *flow.Header) error } -func NewThrottle( +func NewBlockThrottle( log zerolog.Logger, state protocol.State, execState state.ExecutionState, headers storage.Headers, catchupThreshold int, -) (*Throttle, error) { +) (*BlockThrottle, error) { finalizedHead, err := state.Final().Head() if err != nil { return nil, fmt.Errorf("could not get finalized head: %w", err) @@ -67,7 +67,7 @@ func NewThrottle( return nil, fmt.Errorf("executed finalized %v is greater than finalized %v", executed, finalized) } - return &Throttle{ + return &BlockThrottle{ threshold: catchupThreshold, executed: executed, finalized: finalized, @@ -78,7 +78,7 @@ func NewThrottle( }, nil } -func (c *Throttle) Init(processables chan<- flow.Identifier) error { +func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error { c.mu.Lock() defer c.mu.Unlock() if c.inited { @@ -108,7 +108,7 @@ func (c *Throttle) Init(processables chan<- flow.Identifier) error { return nil } -func (c *Throttle) OnBlockExecuted(executed uint64, _ flow.Identifier) error { +func (c *BlockThrottle) OnBlockExecuted(executed uint64, _ flow.Identifier) error { c.mu.Lock() defer c.mu.Unlock() @@ -148,7 +148,7 @@ func (c *Throttle) OnBlockExecuted(executed uint64, _ flow.Identifier) error { return nil } -func (c *Throttle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertificate) { +func (c *BlockThrottle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertificate) { c.mu.Lock() defer c.mu.Unlock() @@ -165,7 +165,7 @@ func (c *Throttle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertifica c.processables <- qc.BlockID } -func (c *Throttle) OnBlockFinalized(lastFinalized *flow.Header) { +func (c *BlockThrottle) OnBlockFinalized(lastFinalized *flow.Header) { c.mu.Lock() defer c.mu.Unlock() if !c.inited { @@ -183,7 +183,7 @@ func (c *Throttle) OnBlockFinalized(lastFinalized *flow.Header) { c.finalized = lastFinalized.Height } -func (c *Throttle) caughtUp() bool { +func (c *BlockThrottle) caughtUp() bool { return caughtUp(c.executed, c.finalized, c.threshold) } From 669e087d1b259861d691ddeedbff7ae88076caa4 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 2 Feb 2024 10:58:47 -0800 Subject: [PATCH 6/8] update throttle --- engine/execution/ingestion/throttle.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go index b98157dbda2..16ca08797e9 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -42,10 +42,6 @@ type BlockThrottle struct { headers storage.Headers } -type BlockHandler interface { - OnBlock(block *flow.Header) error -} - func NewBlockThrottle( log zerolog.Logger, state protocol.State, @@ -108,7 +104,7 @@ func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error { return nil } -func (c *BlockThrottle) OnBlockExecuted(executed uint64, _ flow.Identifier) error { +func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) error { c.mu.Lock() defer c.mu.Unlock() @@ -148,21 +144,22 @@ func (c *BlockThrottle) OnBlockExecuted(executed uint64, _ flow.Identifier) erro return nil } -func (c *BlockThrottle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertificate) { +func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error { c.mu.Lock() defer c.mu.Unlock() if 
!c.inited { - return + return fmt.Errorf("throttle not inited") } // ignore the block if has not caught up. if !c.caughtUp() { - return + return nil } // if has caught up, then process the block - c.processables <- qc.BlockID + c.processables <- blockID + return nil } func (c *BlockThrottle) OnBlockFinalized(lastFinalized *flow.Header) { From 3baf7c07cb98b325b1cc21e30251ea8e433aaeba Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 8 Apr 2024 15:19:25 -0700 Subject: [PATCH 7/8] update comments --- engine/execution/ingestion/throttle.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/engine/execution/ingestion/throttle.go b/engine/execution/ingestion/throttle.go index 16ca08797e9..480388acb88 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -17,7 +17,7 @@ import ( // blocks until the execution has caught up const CatchUpThreshold = 500 -// BlockThrottle is a helper struct that throttles the unexecuted blocks to be sent +// BlockThrottle is a helper struct that helps throttle the unexecuted blocks to be sent // to the block queue for execution. // It is useful for case when execution is falling far behind the finalization, in which case // we want to throttle the blocks to be sent to the block queue for fetching data to execute @@ -68,7 +68,7 @@ func NewBlockThrottle( executed: executed, finalized: finalized, - log: log.With().Str("component", "throttle").Logger(), + log: log.With().Str("component", "block_throttle").Logger(), state: state, headers: headers, }, nil @@ -77,11 +77,13 @@ func NewBlockThrottle( func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error { c.mu.Lock() defer c.mu.Unlock() + c.log.Info().Msgf("initializing block throttle") if c.inited { return fmt.Errorf("throttle already inited") } c.inited = true + c.processables = processables var unexecuted []flow.Identifier var err error @@ -90,17 +92,21 @@ func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error { if err != nil { return err } + c.log.Info().Msgf("loaded %d unexecuted blocks", len(unexecuted)) } else { unexecuted, err = findFinalized(c.state, c.headers, c.executed, c.executed+500) if err != nil { return err } + c.log.Info().Msgf("loaded %d unexecuted finalized blocks", len(unexecuted)) } for _, id := range unexecuted { c.processables <- id } + c.log.Info().Msgf("throttle initialized with %d unexecuted blocks", len(unexecuted)) + return nil } @@ -147,6 +153,7 @@ func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) erro func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error { c.mu.Lock() defer c.mu.Unlock() + c.log.Debug().Msgf("recieved block (%v)", blockID) if !c.inited { return fmt.Errorf("throttle not inited") @@ -159,6 +166,8 @@ func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error { // if has caught up, then process the block c.processables <- blockID + c.log.Debug().Msgf("processed block (%v)", blockID) + return nil } From 267c6c3952ad4b7e9bb495e00eaafd6b5ae51b8a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 12 Apr 2024 10:17:37 -0700 Subject: [PATCH 8/8] addres review comments for throttle --- engine/execution/ingestion/throttle.go | 30 +++++++++++++-------- engine/execution/ingestion/throttle_test.go | 16 +++++++++++ 2 files changed, 35 insertions(+), 11 deletions(-) create mode 100644 engine/execution/ingestion/throttle_test.go diff --git a/engine/execution/ingestion/throttle.go 
b/engine/execution/ingestion/throttle.go index 480388acb88..a92566b6660 100644 --- a/engine/execution/ingestion/throttle.go +++ b/engine/execution/ingestion/throttle.go @@ -12,10 +12,10 @@ import ( "github.com/onflow/flow-go/storage" ) -// CatchUpThreshold is the number of blocks that if the execution is far behind +// DefaultCatchUpThreshold is the number of blocks that if the execution is far behind // the finalization then we will only lazy load the next unexecuted finalized // blocks until the execution has caught up -const CatchUpThreshold = 500 +const DefaultCatchUpThreshold = 500 // BlockThrottle is a helper struct that helps throttle the unexecuted blocks to be sent // to the block queue for execution. @@ -31,7 +31,6 @@ type BlockThrottle struct { mu sync.Mutex executed uint64 finalized uint64 - inited bool // notifier processables chan<- flow.Identifier @@ -74,15 +73,18 @@ func NewBlockThrottle( }, nil } +// inited returns true if the throttle has been inited +func (c *BlockThrottle) inited() bool { + return c.processables != nil +} + func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error { c.mu.Lock() defer c.mu.Unlock() - c.log.Info().Msgf("initializing block throttle") - if c.inited { + if c.inited() { return fmt.Errorf("throttle already inited") } - c.inited = true c.processables = processables var unexecuted []flow.Identifier @@ -94,13 +96,18 @@ func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error { } c.log.Info().Msgf("loaded %d unexecuted blocks", len(unexecuted)) } else { - unexecuted, err = findFinalized(c.state, c.headers, c.executed, c.executed+500) + unexecuted, err = findFinalized(c.state, c.headers, c.executed, c.executed+uint64(c.threshold)) if err != nil { return err } c.log.Info().Msgf("loaded %d unexecuted finalized blocks", len(unexecuted)) } + c.log.Info().Msgf("throttle initializing with %d unexecuted blocks", len(unexecuted)) + + // the ingestion core engine must have initialized the 'processables' with 10000 (default) buffer size, + // and the 'unexecuted' will only contain up to DefaultCatchUpThreshold (500) blocks, + // so pushing all the unexecuted to processables won't be blocked. 
for _, id := range unexecuted { c.processables <- id } @@ -114,7 +121,7 @@ func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) erro c.mu.Lock() defer c.mu.Unlock() - if !c.inited { + if !c.inited() { return fmt.Errorf("throttle not inited") } @@ -155,7 +162,7 @@ func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error { defer c.mu.Unlock() c.log.Debug().Msgf("recieved block (%v)", blockID) - if !c.inited { + if !c.inited() { return fmt.Errorf("throttle not inited") } @@ -174,7 +181,7 @@ func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error { func (c *BlockThrottle) OnBlockFinalized(lastFinalized *flow.Header) { c.mu.Lock() defer c.mu.Unlock() - if !c.inited { + if !c.inited() { return } @@ -216,11 +223,12 @@ func findFinalized(state protocol.State, headers storage.Headers, lastExecuted, for height := lastExecuted + 1; height <= final.Height; height++ { finalizedID, err := headers.BlockIDByHeight(height) if err != nil { - return nil, fmt.Errorf("could not get header at height: %v, %w", height, err) + return nil, fmt.Errorf("could not get block ID by height %v: %w", height, err) } unexecutedFinalized = append(unexecutedFinalized, finalizedID) } + return unexecutedFinalized, nil } diff --git a/engine/execution/ingestion/throttle_test.go b/engine/execution/ingestion/throttle_test.go new file mode 100644 index 00000000000..a2d8911b109 --- /dev/null +++ b/engine/execution/ingestion/throttle_test.go @@ -0,0 +1,16 @@ +package ingestion + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCaughtUp(t *testing.T) { + require.True(t, caughtUp(100, 200, 500)) + require.True(t, caughtUp(100, 100, 500)) + require.True(t, caughtUp(100, 600, 500)) + + require.False(t, caughtUp(100, 601, 500)) + require.False(t, caughtUp(100, 602, 500)) +}
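
For illustration, here is a minimal sketch (not part of the patches above) of how a caller such as the ingestion core might wire up the BlockThrottle that this series introduces. It mirrors the NewThrottleEngine wiring that patch 1 added and patch 3 removed: create a buffered channel, hand it to Init, and drain it into a block handler. The blockHandler interface, the runThrottledIngestion helper, and the 10000 buffer size are assumptions made for this example, not code from the patches; only the BlockThrottle methods and the flow-go packages it imports come from the series itself.

// Illustrative sketch only: wiring a BlockThrottle into a consumer.
// blockHandler and runThrottledIngestion are hypothetical names chosen
// for this example; they do not exist in the patches above.
package ingestion

import (
	"context"
	"fmt"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/engine/execution/state"
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/state/protocol"
	"github.com/onflow/flow-go/storage"
)

// blockHandler is a hypothetical consumer of throttled blocks, e.g. the
// ingestion core's block queue.
type blockHandler interface {
	OnBlock(header *flow.Header) error
}

// runThrottledIngestion creates the throttle, initializes it with a buffered
// channel, and drains that channel into the handler until ctx is cancelled.
// The returned throttle is what follower/execution events should be fed into.
func runThrottledIngestion(
	ctx context.Context,
	log zerolog.Logger,
	protoState protocol.State,
	execState state.ExecutionState,
	headers storage.Headers,
	handler blockHandler,
) (*BlockThrottle, error) {
	throttle, err := NewBlockThrottle(log, protoState, execState, headers, DefaultCatchUpThreshold)
	if err != nil {
		return nil, fmt.Errorf("could not create block throttle: %w", err)
	}

	// The buffer must comfortably exceed DefaultCatchUpThreshold so that Init
	// can push its initial batch of unexecuted blocks without blocking
	// (assumed size, matching the 10000 used in patch 2).
	processables := make(chan flow.Identifier, 10000)
	if err := throttle.Init(processables); err != nil {
		return nil, fmt.Errorf("could not init block throttle: %w", err)
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case blockID := <-processables:
				// Load the header for each throttled block and pass it on.
				header, err := headers.ByBlockID(blockID)
				if err != nil {
					log.Err(err).Msg("could not load header for throttled block")
					return
				}
				if err := handler.OnBlock(header); err != nil {
					log.Err(err).Msg("could not process throttled block")
					return
				}
			}
		}
	}()

	// The caller is expected to forward events into the throttle, e.g.:
	//   throttle.OnBlock(blockID)                  // newly certified block from the follower
	//   throttle.OnBlockExecuted(blockID, height)  // finalized block executed at height
	//   throttle.OnBlockFinalized(finalizedHeader) // new finalized header
	return throttle, nil
}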