Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Aug 12, 2024
1 parent 2c90ca9 commit e598efd
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 68 deletions.
14 changes: 3 additions & 11 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,22 +1150,14 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD
func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics(
node *NodeConfig,
) (module.ReadyDoneAware, error) {
latestFinalizedBlock, err := node.State.Final().Head()
if err != nil {
return nil, fmt.Errorf("could not get latest finalized block: %w", err)
}

// buffer size is the number of blocks that are kept in memory by the metrics provider
// If the size is to small the clients might not have the opportunity to get the metrics for all blocks
// If the size is too large the memory usage will increase
bufferSize := uint(200)
lastFinalizedHeader := node.LastFinalizedHeader

metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider(
node.Logger,
exeNode.executionState,
node.Storage.Headers,
latestFinalizedBlock,
bufferSize,
lastFinalizedHeader,
exeNode.exeConf.transactionExecutionMetricsBufferSize,
)

node.ProtocolEvents.AddConsumer(metricsProvider)
Expand Down
62 changes: 32 additions & 30 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,37 @@ import (

// ExecutionConfig contains the configs for starting up execution nodes
type ExecutionConfig struct {
rpcConf rpc.Config
triedir string
executionDataDir string
registerDir string
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
chunkDataPackDir string
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
requestInterval time.Duration
extensiveLog bool
pauseExecution bool
chunkDataPackQueryTimeout time.Duration
chunkDataPackDeliveryTimeout time.Duration
enableBlockDataUpload bool
gcpBucketName string
s3BucketName string
apiRatelimits map[string]int
apiBurstlimits map[string]int
executionDataAllowedPeers string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration
importCheckpointWorkerCount int
transactionExecutionMetricsEnabled bool
rpcConf rpc.Config
triedir string
executionDataDir string
registerDir string
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
chunkDataPackDir string
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
requestInterval time.Duration
extensiveLog bool
pauseExecution bool
chunkDataPackQueryTimeout time.Duration
chunkDataPackDeliveryTimeout time.Duration
enableBlockDataUpload bool
gcpBucketName string
s3BucketName string
apiRatelimits map[string]int
apiBurstlimits map[string]int
executionDataAllowedPeers string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration
importCheckpointWorkerCount int
transactionExecutionMetricsEnabled bool
transactionExecutionMetricsBufferSize uint

// evm tracing configuration
evmTracingEnabled bool
Expand Down Expand Up @@ -124,6 +125,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing")
flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap")
flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics")
flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine")
flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution")
flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled.")

Expand Down
29 changes: 14 additions & 15 deletions engine/execution/computation/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@ type collector struct {

collection chan metrics

mu *sync.Mutex
mu sync.Mutex

latestHeight uint64
blocksAtHeight map[uint64]map[flow.Identifier]struct{}
metrics map[flow.Identifier][]TransactionExecutionMetrics
lowestAvailableHeight uint64
blocksAtHeight map[uint64]map[flow.Identifier]struct{}
metrics map[flow.Identifier][]TransactionExecutionMetrics
}

func newCollector(
log zerolog.Logger,
latestHeight uint64,
lowestAvailableHeight uint64,
) *collector {
return &collector{
log: log,
latestHeight: latestHeight,
log: log,
lowestAvailableHeight: lowestAvailableHeight,

collection: make(chan metrics, 1000),
mu: &sync.Mutex{},
blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}),
metrics: make(map[flow.Identifier][]TransactionExecutionMetrics),
}
Expand Down Expand Up @@ -80,10 +79,10 @@ func (c *collector) collect(
c.mu.Lock()
defer c.mu.Unlock()

if blockHeight <= c.latestHeight {
if blockHeight <= c.lowestAvailableHeight {
c.log.Warn().
Uint64("height", blockHeight).
Uint64("latestHeight", c.latestHeight).
Uint64("lowestAvailableHeight", c.lowestAvailableHeight).
Msg("received metrics for a block that is older or equal than the most recent block")
return
}
Expand All @@ -101,7 +100,7 @@ func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExe
c.mu.Lock()
defer c.mu.Unlock()

if height <= c.latestHeight && c.latestHeight != 0 {
if height <= c.lowestAvailableHeight && c.lowestAvailableHeight != 0 {
c.log.Warn().
Uint64("height", height).
Stringer("blockID", blockID).
Expand All @@ -119,12 +118,12 @@ func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExe
// advanceTo moves the latest height to the given height
// all data at lower heights will be deleted
func (c *collector) advanceTo(height uint64) {
for c.latestHeight < height {
c.latestHeight++
blocks := c.blocksAtHeight[c.latestHeight]
for c.lowestAvailableHeight < height {
c.lowestAvailableHeight++
blocks := c.blocksAtHeight[c.lowestAvailableHeight]
for block := range blocks {
delete(c.metrics, block)
}
delete(c.blocksAtHeight, c.latestHeight)
delete(c.blocksAtHeight, c.lowestAvailableHeight)
}
}
4 changes: 2 additions & 2 deletions engine/execution/computation/metrics/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (p *provider) Push(
if height <= p.blockHeightAtBufferIndex {
p.log.Warn().
Uint64("height", height).
Uint64("latestHeight", p.blockHeightAtBufferIndex).
Uint64("lowestAvailableHeight", p.blockHeightAtBufferIndex).
Msg("received metrics for a block that is older or equal than the most recent block")
return
}
if height > p.blockHeightAtBufferIndex+1 {
p.log.Warn().
Uint64("height", height).
Uint64("latestHeight", p.blockHeightAtBufferIndex).
Uint64("lowestAvailableHeight", p.blockHeightAtBufferIndex).
Msg("received metrics for a block that is not the next block")

// Fill in the gap with nil
Expand Down
1 change: 1 addition & 0 deletions engine/execution/computation/metrics/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ func Test_ProviderPushOutOfSequence(t *testing.T) {

require.Len(t, res, int(bufferSize))

require.Nil(t, res[newHeight+1])
require.Equal(t, time.Duration(newHeight+2), res[newHeight+2][0].ExecutionTime)
}
10 changes: 0 additions & 10 deletions module/metrics/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,16 +1099,6 @@ func (ec *ExecutionCollector) WithTransactionCallback(
module.TransactionExecutionResultInfo,
),
) *ExecutionCollectorWithTransactionCallback {
// if callback is nil, use a no-op callback
if callback == nil {
callback = func(
time.Duration,
module.TransactionExecutionResultStats,
module.TransactionExecutionResultInfo,
) {
}
}

return &ExecutionCollectorWithTransactionCallback{
ExecutionCollector: ec,
TransactionCallback: callback,
Expand Down

0 comments on commit e598efd

Please sign in to comment.