Skip to content

Commit

Permalink
addres review comments for throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Apr 12, 2024
1 parent 3baf7c0 commit 267c6c3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
30 changes: 19 additions & 11 deletions engine/execution/ingestion/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"github.com/onflow/flow-go/storage"
)

// CatchUpThreshold is the number of blocks that if the execution is far behind
// DefaultCatchUpThreshold is the number of blocks that if the execution is far behind
// the finalization then we will only lazy load the next unexecuted finalized
// blocks until the execution has caught up
const CatchUpThreshold = 500
const DefaultCatchUpThreshold = 500

// BlockThrottle is a helper struct that helps throttle the unexecuted blocks to be sent
// to the block queue for execution.
Expand All @@ -31,7 +31,6 @@ type BlockThrottle struct {
mu sync.Mutex
executed uint64
finalized uint64
inited bool

// notifier
processables chan<- flow.Identifier
Expand Down Expand Up @@ -74,15 +73,18 @@ func NewBlockThrottle(
}, nil
}

// inited returns true if the throttle has been inited
func (c *BlockThrottle) inited() bool {
return c.processables != nil
}

func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Info().Msgf("initializing block throttle")
if c.inited {
if c.inited() {
return fmt.Errorf("throttle already inited")
}

c.inited = true
c.processables = processables

var unexecuted []flow.Identifier
Expand All @@ -94,13 +96,18 @@ func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error {
}
c.log.Info().Msgf("loaded %d unexecuted blocks", len(unexecuted))
} else {
unexecuted, err = findFinalized(c.state, c.headers, c.executed, c.executed+500)
unexecuted, err = findFinalized(c.state, c.headers, c.executed, c.executed+uint64(c.threshold))
if err != nil {
return err
}
c.log.Info().Msgf("loaded %d unexecuted finalized blocks", len(unexecuted))
}

c.log.Info().Msgf("throttle initializing with %d unexecuted blocks", len(unexecuted))

// the ingestion core engine must have initialized the 'processables' with 10000 (default) buffer size,
// and the 'unexecuted' will only contain up to DefaultCatchUpThreshold (500) blocks,
// so pushing all the unexecuted to processables won't be blocked.
for _, id := range unexecuted {
c.processables <- id
}
Expand All @@ -114,7 +121,7 @@ func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) erro
c.mu.Lock()
defer c.mu.Unlock()

if !c.inited {
if !c.inited() {
return fmt.Errorf("throttle not inited")
}

Expand Down Expand Up @@ -155,7 +162,7 @@ func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error {
defer c.mu.Unlock()
c.log.Debug().Msgf("recieved block (%v)", blockID)

if !c.inited {
if !c.inited() {
return fmt.Errorf("throttle not inited")
}

Expand All @@ -174,7 +181,7 @@ func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error {
func (c *BlockThrottle) OnBlockFinalized(lastFinalized *flow.Header) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.inited {
if !c.inited() {
return
}

Expand Down Expand Up @@ -216,11 +223,12 @@ func findFinalized(state protocol.State, headers storage.Headers, lastExecuted,
for height := lastExecuted + 1; height <= final.Height; height++ {
finalizedID, err := headers.BlockIDByHeight(height)
if err != nil {
return nil, fmt.Errorf("could not get header at height: %v, %w", height, err)
return nil, fmt.Errorf("could not get block ID by height %v: %w", height, err)
}

unexecutedFinalized = append(unexecutedFinalized, finalizedID)
}

return unexecutedFinalized, nil
}

Expand Down
16 changes: 16 additions & 0 deletions engine/execution/ingestion/throttle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ingestion

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCaughtUp(t *testing.T) {
require.True(t, caughtUp(100, 200, 500))
require.True(t, caughtUp(100, 100, 500))
require.True(t, caughtUp(100, 600, 500))

require.False(t, caughtUp(100, 601, 500))
require.False(t, caughtUp(100, 602, 500))
}

0 comments on commit 267c6c3

Please sign in to comment.