Skip to content

Commit

Permalink
Merge pull request #5593 from onflow/leo/ingestion-new-engine
Browse files Browse the repository at this point in the history
[Execution] New ingestion engine
  • Loading branch information
zhangchiqing authored May 1, 2024
2 parents f7d9a07 + 68fc314 commit e055e56
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 37 deletions.
31 changes: 26 additions & 5 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ type ExecutionNode struct {
followerEng *followereng.ComplianceEngine // to sync blocks from consensus nodes
computationManager *computation.Manager
collectionRequester *requester.Engine
ingestionEng *ingestion.Engine
scriptsEng *scripts.Engine
followerDistributor *pubsub.FollowerDistributor
checkAuthorizedAtBlock func(blockID flow.Identifier) (bool, error)
Expand Down Expand Up @@ -1093,14 +1092,36 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
}

fetcher := fetcher.NewCollectionFetcher(node.Logger, exeNode.collectionRequester, node.State, exeNode.exeConf.onflowOnlyLNs)

if exeNode.exeConf.enableNewIngestionEngine {
_, core, err := ingestion.NewMachine(
node.Logger,
node.ProtocolEvents,
exeNode.collectionRequester,
fetcher,
node.Storage.Headers,
node.Storage.Blocks,
node.Storage.Collections,
exeNode.executionState,
node.State,
exeNode.collector,
exeNode.computationManager,
exeNode.providerEngine,
exeNode.blockDataUploader,
exeNode.stopControl,
)

return core, err
}

var blockLoader ingestion.BlockLoader
if exeNode.exeConf.enableStorehouse {
blockLoader = loader.NewUnfinalizedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
} else {
blockLoader = loader.NewUnexecutedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
}

exeNode.ingestionEng, err = ingestion.New(
ingestionEng, err := ingestion.New(
exeNode.ingestionUnit,
node.Logger,
node.EngineRegistry,
Expand All @@ -1122,11 +1143,11 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(

// TODO: we should solve these mutual dependencies better
// => https://github.com/dapperlabs/flow-go/issues/4360
exeNode.collectionRequester = exeNode.collectionRequester.WithHandle(exeNode.ingestionEng.OnCollection)
exeNode.collectionRequester.WithHandle(ingestionEng.OnCollection)

node.ProtocolEvents.AddConsumer(exeNode.ingestionEng)
node.ProtocolEvents.AddConsumer(ingestionEng)

return exeNode.ingestionEng, err
return ingestionEng, err
}

// create scripts engine for handling script execution
Expand Down
8 changes: 5 additions & 3 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ type ExecutionConfig struct {
// It works around an issue where some collection nodes are not configured with enough
// this works around an issue where some collection nodes are not configured with enough
// file descriptors causing connection failures.
onflowOnlyLNs bool
enableStorehouse bool
enableChecker bool
onflowOnlyLNs bool
enableStorehouse bool
enableChecker bool
enableNewIngestionEngine bool
}

func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
Expand Down Expand Up @@ -120,6 +121,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")
flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true")
flags.BoolVar(&exeConf.enableNewIngestionEngine, "enable-new-ingestion-engine", false, "enable new ingestion engine, default is false")

}

Expand Down
3 changes: 1 addition & 2 deletions engine/common/requester/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ func New(log zerolog.Logger, metrics module.EngineMetrics, net network.EngineReg
// function. It is done in a separate call so that the requester can be injected
// into engines upon construction, and then provide a handle function to the
// requester from that engine itself.
func (e *Engine) WithHandle(handle HandleFunc) *Engine {
func (e *Engine) WithHandle(handle HandleFunc) {
e.handle = handle
return e
}

// Ready returns a ready channel that is closed once the engine has fully
Expand Down
34 changes: 23 additions & 11 deletions engine/execution/ingestion/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (e *Core) launchWorkerToExecuteBlocks(ctx irrecoverable.SignalerContext, re
}

func (e *Core) OnBlock(header *flow.Header, qc *flow.QuorumCertificate) {
e.log.Debug().
Hex("block_id", qc.BlockID[:]).Uint64("height", header.Height).
Msgf("received block")

// qc.Block is equivalent to header.ID()
err := e.throttle.OnBlock(qc.BlockID)
if err != nil {
Expand Down Expand Up @@ -234,7 +238,7 @@ func (e *Core) onProcessableBlock(blockID flow.Identifier) error {
}

if executed {
e.log.Debug().Msg("block has been executed already")
e.log.Debug().Hex("block_id", blockID[:]).Uint64("height", header.Height).Msg("block has been executed already")
return nil
}

Expand All @@ -248,7 +252,9 @@ func (e *Core) onProcessableBlock(blockID flow.Identifier) error {
return fmt.Errorf("failed to enqueue block %v: %w", blockID, err)
}

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently block is executable")
e.log.Debug().
Hex("block_id", blockID[:]).Uint64("height", header.Height).
Int("executables", len(executables)).Msgf("executeConcurrently block is executable")
e.executeConcurrently(executables)

err = e.fetch(missingColls)
Expand Down Expand Up @@ -365,12 +371,18 @@ func (e *Core) onBlockExecuted(
return fmt.Errorf("cannot persist execution state: %w", err)
}

e.log.Debug().Uint64("height", block.Block.Header.Height).Msgf("execution state saved")
blockID := block.ID()
lg := e.log.With().
Hex("block_id", blockID[:]).
Uint64("height", block.Block.Header.Height).
Logger()

lg.Debug().Msgf("execution state saved")

// must call OnBlockExecuted AFTER saving the execution result to storage
// because when enqueuing a block, we rely on execState.StateCommitmentByBlockID
// to determine whether a block has been executed or not.
executables, err := e.blockQueue.OnBlockExecuted(block.ID(), commit)
executables, err := e.blockQueue.OnBlockExecuted(blockID, commit)
if err != nil {
return fmt.Errorf("unexpected error while marking block as executed: %w", err)
}
Expand All @@ -382,9 +394,7 @@ func (e *Core) onBlockExecuted(
logs := e.eventConsumer.OnComputationResultSaved(ctx, computationResult)

receipt := computationResult.ExecutionReceipt
e.log.Info().
Hex("block_id", logging.Entity(block)).
Uint64("height", block.Block.Header.Height).
lg.Info().
Int("collections", len(block.CompleteCollections)).
Hex("parent_block", block.Block.Header.ParentID[:]).
Int("collections", len(block.Block.Payload.Guarantees)).
Expand All @@ -397,23 +407,24 @@ func (e *Core) onBlockExecuted(
Uint64("num_txs", nonSystemTransactionCount(receipt.ExecutionResult)).
Int64("timeSpentInMS", time.Since(startedAt).Milliseconds()).
Str("logs", logs). // broadcasted
Int("executables", len(executables)).
Msgf("block executed")

// we ensures that the child blocks are only executed after the execution result of
// its parent block has been successfully saved to storage.
// this ensures OnBlockExecuted would not be called with blocks in a wrong order, such as
// OnBlockExecuted(childBlock) being called before OnBlockExecuted(parentBlock).

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently: parent block is executed")
e.executeConcurrently(executables)

return nil
}

func (e *Core) onCollection(col *flow.Collection) error {
e.log.Info().
lg := e.log.With().
Hex("collection_id", logging.Entity(col)).
Msgf("handle collection")
Logger()
lg.Info().Msgf("handle collection")
// EN might request a collection from multiple collection nodes,
// therefore might receive multiple copies of the same collection.
// we only need to store it once.
Expand All @@ -433,7 +444,8 @@ func (e *Core) onCollection(col *flow.Collection) error {
return fmt.Errorf("unexpected error while adding collection to block queue")
}

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently: collection is handled, ready to execute block")
lg.Debug().
Int("executables", len(executables)).Msgf("executeConcurrently: collection is handled, ready to execute block")
e.executeConcurrently(executables)

return nil
Expand Down
5 changes: 4 additions & 1 deletion engine/execution/ingestion/loader/unfinalized_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ func NewUnfinalizedLoader(
// LoadUnexecuted loads all unfinalized and validated blocks
// any error returned are exceptions
func (e *UnfinalizedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error) {
lastExecuted := e.execState.GetHighestFinalizedExecuted()
lastExecuted, err := e.execState.GetHighestFinalizedExecuted()
if err != nil {
return nil, fmt.Errorf("could not get highest finalized executed: %w", err)
}

// get finalized height
finalized := e.state.Final()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestLoadingUnfinalizedBlocks(t *testing.T) {
require.NoError(t, ps.Finalize(blockC.ID()))

es := new(stateMock.FinalizedExecutionState)
es.On("GetHighestFinalizedExecuted").Return(genesis.Header.Height)
es.On("GetHighestFinalizedExecuted").Return(genesis.Header.Height, nil)
headers := new(storage.Headers)
headers.On("BlockIDByHeight", blockA.Header.Height).Return(blockA.Header.ID(), nil)
headers.On("BlockIDByHeight", blockB.Header.Height).Return(blockB.Header.ID(), nil)
Expand Down
166 changes: 166 additions & 0 deletions engine/execution/ingestion/machine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package ingestion

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/common/requester"
"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/ingestion/stop"
"github.com/onflow/flow-go/engine/execution/ingestion/uploader"
"github.com/onflow/flow-go/engine/execution/provider"
"github.com/onflow/flow-go/engine/execution/state"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/mempool/entity"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/storage"
)

// Machine forwards blocks and collections to the core for processing.
type Machine struct {
events.Noop // satisfy protocol events consumer interface
log zerolog.Logger
core *Core
broadcaster provider.ProviderEngine
uploader *uploader.Manager
execState state.ExecutionState
computationManager computation.ComputationManager
}

type CollectionRequester interface {
WithHandle(requester.HandleFunc)
}

func NewMachine(
logger zerolog.Logger,
protocolEvents *events.Distributor,
collectionRequester CollectionRequester,

collectionFetcher CollectionFetcher,
headers storage.Headers,
blocks storage.Blocks,
collections storage.Collections,
execState state.ExecutionState,
state protocol.State,
metrics module.ExecutionMetrics,
computationManager computation.ComputationManager,
broadcaster provider.ProviderEngine,
uploader *uploader.Manager,
stopControl *stop.StopControl,
) (*Machine, module.ReadyDoneAware, error) {

e := &Machine{
log: logger.With().Str("engine", "ingestion_machine").Logger(),
broadcaster: broadcaster,
uploader: uploader,
execState: execState,
computationManager: computationManager,
}

throttle, err := NewBlockThrottle(
logger,
state,
execState,
headers,
DefaultCatchUpThreshold,
)

if err != nil {
return nil, nil, fmt.Errorf("failed to create block throttle: %w", err)
}

core, err := NewCore(
logger,
throttle,
execState,
stopControl,
headers,
blocks,
collections,
e,
collectionFetcher,
e,
)

if err != nil {
return nil, nil, fmt.Errorf("failed to create ingestion core: %w", err)
}

e.core = core

protocolEvents.AddConsumer(e)
collectionRequester.WithHandle(func(originID flow.Identifier, entity flow.Entity) {
collection, ok := entity.(*flow.Collection)
if !ok {
e.log.Error().Msgf("invalid entity type (%T)", entity)
return
}
e.core.OnCollection(collection)
})

return e, core, nil
}

// Protocol Events implementation
func (e *Machine) BlockProcessable(b *flow.Header, qc *flow.QuorumCertificate) {
e.core.OnBlock(b, qc)
}

// EventConsumer implementation
var _ EventConsumer = (*Machine)(nil)

func (e *Machine) BeforeComputationResultSaved(
ctx context.Context,
result *execution.ComputationResult,
) {
err := e.uploader.Upload(ctx, result)
if err != nil {
e.log.Err(err).Msg("error while uploading block")
// continue processing. uploads should not block execution
}
}

func (e *Machine) OnComputationResultSaved(
ctx context.Context,
result *execution.ComputationResult,
) string {
header := result.BlockExecutionResult.ExecutableBlock.Block.Header
broadcasted, err := e.broadcaster.BroadcastExecutionReceipt(
ctx, header.Height, result.ExecutionReceipt)
if err != nil {
e.log.Err(err).Msg("critical: failed to broadcast the receipt")
}
return fmt.Sprintf("broadcasted: %v", broadcasted)
}

// BlockExecutor implementation
var _ BlockExecutor = (*Machine)(nil)

func (e *Machine) ExecuteBlock(ctx context.Context, executableBlock *entity.ExecutableBlock) (*execution.ComputationResult, error) {
parentID := executableBlock.Block.Header.ParentID
parentErID, err := e.execState.GetExecutionResultID(ctx, parentID)
if err != nil {
return nil, fmt.Errorf("failed to get parent execution result ID %v: %w", parentID, err)
}

snapshot := e.execState.NewStorageSnapshot(*executableBlock.StartState,
executableBlock.Block.Header.ParentID,
executableBlock.Block.Header.Height-1,
)

computationResult, err := e.computationManager.ComputeBlock(
ctx,
parentErID,
executableBlock,
snapshot)
if err != nil {
return nil, fmt.Errorf("failed to compute block: %w", err)
}

return computationResult, nil
}
7 changes: 4 additions & 3 deletions engine/execution/ingestion/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ func NewBlockThrottle(
}

finalized := finalizedHead.Height
// TODO: implement GetHighestFinalizedExecuted for execution state when storehouse
// is not used
executed := execState.GetHighestFinalizedExecuted()
executed, err := execState.GetHighestFinalizedExecuted()
if err != nil {
return nil, fmt.Errorf("could not get highest finalized executed: %w", err)
}

if executed > finalized {
return nil, fmt.Errorf("executed finalized %v is greater than finalized %v", executed, finalized)
Expand Down
Loading

0 comments on commit e055e56

Please sign in to comment.