diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go
index 2f1795cd8e0..cf083d1a451 100644
--- a/cmd/execution_builder.go
+++ b/cmd/execution_builder.go
@@ -49,6 +49,7 @@ import (
     "github.com/onflow/flow-go/engine/execution/checker"
     "github.com/onflow/flow-go/engine/execution/computation"
     "github.com/onflow/flow-go/engine/execution/computation/committer"
+    txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics"
     "github.com/onflow/flow-go/engine/execution/ingestion"
     "github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
     "github.com/onflow/flow-go/engine/execution/ingestion/loader"
@@ -127,7 +128,7 @@ type ExecutionNode struct {
 
     ingestionUnit *engine.Unit
 
-    collector module.ExecutionMetrics
+    collector *metrics.ExecutionCollector
     executionState state.ExecutionState
     followerState protocol.FollowerState
     committee hotstuff.DynamicCommittee
@@ -160,6 +161,7 @@ type ExecutionNode struct {
     executionDataTracker tracker.Storage
     blobService network.BlobService
     blobserviceDependable *module.ProxiedReadyDoneAware
+    metricsProvider txmetrics.TransactionExecutionMetricsProvider
 }
 
 func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
@@ -228,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
         Component("block data upload manager", exeNode.LoadBlockUploaderManager).
         Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
         Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader).
+        Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics).
         Component("provider engine", exeNode.LoadProviderEngine).
         Component("checker engine", exeNode.LoadCheckerEngine).
         Component("ingestion engine", exeNode.LoadIngestionEngine).
@@ -544,10 +547,23 @@ func (exeNode *ExecutionNode) LoadProviderEngine(
 
     vmCtx := fvm.NewContext(opts...)
 
+    collector := exeNode.collector.WithTransactionCallback(
+        func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) {
+            exeNode.metricsProvider.Collect(
+                info.BlockID,
+                info.BlockHeight,
+                txmetrics.TransactionExecutionMetrics{
+                    TransactionID: info.TransactionID,
+                    ExecutionTime: dur,
+                    ExecutionEffortWeights: stats.ComputationIntensities,
+                })
+        })
+
     ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer)
     manager, err := computation.New(
         node.Logger,
-        exeNode.collector,
+        // TODO: inject metrics for computation intensities
+        collector,
         node.Tracer,
         node.Me,
         node.State,
@@ -1127,6 +1143,26 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD
     return exeNode.scriptsEng, nil
 }
 
+func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics(
+    node *NodeConfig,
+) (module.ReadyDoneAware, error) {
+    latestFinalizedBlock, err := node.State.Final().Head()
+    if err != nil {
+        return nil, fmt.Errorf("could not get latest finalized block: %w", err)
+    }
+
+    metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider(
+        node.Logger,
+        exeNode.executionState,
+        node.Storage.Headers,
+        latestFinalizedBlock,
+        0,
+    )
+    node.ProtocolEvents.AddConsumer(metricsProvider)
+    exeNode.metricsProvider = metricsProvider
+    return metricsProvider, nil
+}
+
 func (exeNode *ExecutionNode) LoadConsensusCommittee(
     node *NodeConfig,
 ) (
diff --git a/engine/execution/computation/computer/computer.go b/engine/execution/computation/computer/computer.go
index 0d73645f6a7..a6bcc074014 100644
--- a/engine/execution/computation/computer/computer.go
+++ b/engine/execution/computation/computer/computer.go
@@ -255,7 +255,6 @@ func (e *blockComputer) queueTransactionRequests(
                 i == len(collection.Transactions)-1)
             txnIndex += 1
         }
-
     }
 
     systemCtx := fvm.NewContextFromParent(
diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go
new file mode 100644
index 00000000000..b6a022bde5d
--- /dev/null
+++ b/engine/execution/computation/metrics/collector.go
@@ -0,0 +1,130 @@
+package metrics
+
+import (
+    "sync"
+
+    "github.com/rs/zerolog"
+
+    "github.com/onflow/flow-go/model/flow"
+    "github.com/onflow/flow-go/module/component"
+    "github.com/onflow/flow-go/module/irrecoverable"
+)
+
+type collector struct {
+    log zerolog.Logger
+
+    collection chan metrics
+
+    mu *sync.Mutex
+
+    latestHeight uint64
+    blocksAtHeight map[uint64]map[flow.Identifier]struct{}
+    metrics map[flow.Identifier][]TransactionExecutionMetrics
+}
+
+func newCollector(
+    log zerolog.Logger,
+    latestHeight uint64,
+) *collector {
+    return &collector{
+        log: log,
+        latestHeight: latestHeight,
+
+        collection: make(chan metrics, 1000),
+        mu: &sync.Mutex{},
+        blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}),
+        metrics: make(map[flow.Identifier][]TransactionExecutionMetrics),
+    }
+}
+
+// Collect should never block, because it is called from the execution path.
+func (c *collector) Collect(
+    blockId flow.Identifier,
+    blockHeight uint64,
+    t TransactionExecutionMetrics,
+) {
+    select {
+    case c.collection <- metrics{
+        TransactionExecutionMetrics: t,
+        blockHeight: blockHeight,
+        blockId: blockId,
+    }:
+    default:
+        c.log.Warn().
+            Uint64("height", blockHeight).
+            Msg("dropping metrics because the collection channel is full")
+    }
+}
+
+func (c *collector) metricsCollectorWorker(
+    ctx irrecoverable.SignalerContext,
+    ready component.ReadyFunc,
+) {
+    ready()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case m := <-c.collection:
+            c.collect(m.blockId, m.blockHeight, m.TransactionExecutionMetrics)
+        }
+    }
+}
+
+func (c *collector) collect(
+    blockId flow.Identifier,
+    blockHeight uint64,
+    t TransactionExecutionMetrics,
+) {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    if blockHeight <= c.latestHeight {
+        c.log.Warn().
+            Uint64("height", blockHeight).
+            Uint64("latestHeight", c.latestHeight).
+            Msg("received metrics for a block that is older than or equal to the most recent block")
+        return
+    }
+
+    if _, ok := c.blocksAtHeight[blockHeight]; !ok {
+        c.blocksAtHeight[blockHeight] = make(map[flow.Identifier]struct{})
+    }
+    c.blocksAtHeight[blockHeight][blockId] = struct{}{}
+    c.metrics[blockId] = append(c.metrics[blockId], t)
+}
+
+// Pop returns the metrics for the given block at the given height
+// and clears all data up to the given height.
+func (c *collector) Pop(height uint64, block flow.Identifier) []TransactionExecutionMetrics {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    if height <= c.latestHeight && c.latestHeight != 0 {
+        c.log.Warn().
+            Uint64("height", height).
+            Stringer("block", block).
+            Msg("requested metrics for a block that is older than or equal to the most recent block")
+        return nil
+    }
+
+    metrics := c.metrics[block]
+
+    c.advanceTo(height)
+
+    return metrics
+}
+
+// advanceTo moves the latest height to the given height.
+// All data at lower heights will be deleted.
+func (c *collector) advanceTo(height uint64) {
+    for c.latestHeight < height {
+        c.latestHeight++
+        blocks := c.blocksAtHeight[c.latestHeight]
+        for block := range blocks {
+            delete(c.metrics, block)
+        }
+        delete(c.blocksAtHeight, c.latestHeight)
+    }
+}
diff --git a/engine/execution/computation/metrics/collector_test.go b/engine/execution/computation/metrics/collector_test.go
new file mode 100644
index 00000000000..3f103c9638a
--- /dev/null
+++ b/engine/execution/computation/metrics/collector_test.go
@@ -0,0 +1,82 @@
+package metrics
+
+import (
+    "context"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/rs/zerolog"
+    "github.com/stretchr/testify/require"
+
+    "github.com/onflow/flow-go/model/flow"
+    "github.com/onflow/flow-go/module/irrecoverable"
+)
+
+func Test_CollectorPopOnEmpty(t *testing.T) {
+    t.Parallel()
+
+    log := zerolog.New(zerolog.NewTestWriter(t))
+    latestHeight := uint64(100)
+
+    collector := newCollector(log, latestHeight)
+
+    data := collector.Pop(latestHeight, flow.ZeroID)
+    require.Nil(t, data)
+}
+
+func Test_CollectorCollection(t *testing.T) {
+    log := zerolog.New(zerolog.NewTestWriter(t))
+    latestHeight := uint64(100)
+
+    collector := newCollector(log, latestHeight)
+
+    ctx := context.Background()
+    go func() {
+        ictx := irrecoverable.NewMockSignalerContext(t, ctx)
+        collector.metricsCollectorWorker(ictx, func() {})
+    }()
+
+    wg := sync.WaitGroup{}
+
+    wg.Add(16 * 16 * 16)
+    for h := 0; h < 16; h++ {
+        for b := 0; b < 16; b++ {
+            for t := 0; t < 16; t++ {
+                go func(h, b, t int) {
+                    defer wg.Done()
+
+                    block := flow.Identifier{}
+                    // 16 different blocks per height
+                    block[0] = byte(h)
+                    block[1] = byte(b)
+
+                    collector.Collect(block, latestHeight+1+uint64(h), TransactionExecutionMetrics{
+                        ExecutionTime: time.Duration(b + t),
+                    })
+                }(h, b, t)
+            }
+            // wait a bit for the collector to process the data
+            <-time.After(1 * time.Millisecond)
+        }
+    }
+
+    wg.Wait()
+    // wait a bit for the collector to process the data
+    <-time.After(10 * time.Millisecond)
+
+    for h := 0; h < 16; h++ {
+        block := flow.Identifier{}
+        block[0] = byte(h)
+
+        data := collector.Pop(latestHeight+1+uint64(h), block)
+
+        require.Len(t, data, 16)
+    }
+
+    data := collector.Pop(latestHeight, flow.ZeroID)
+    require.Nil(t, data)
+
+    data = collector.Pop(latestHeight+17, flow.ZeroID)
+    require.Nil(t, data)
+}
diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go
new file mode 100644
index 00000000000..59b15090758
--- /dev/null
+++ b/engine/execution/computation/metrics/provider.go
@@ -0,0 +1,116 @@
+package metrics
+
+import (
+    "fmt"
+    "sync"
+
+    "github.com/rs/zerolog"
+)
+
+// provider is responsible for providing the metrics for the rpc endpoint.
+// It has a circular buffer of metrics for the last N finalized and executed blocks.
+type provider struct {
+    log zerolog.Logger
+
+    mu *sync.RWMutex
+
+    bufferSize uint
+    bufferIndex uint
+    blockHeightAtBufferIndex uint64
+
+    buffer [][]TransactionExecutionMetrics
+}
+
+func newProvider(
+    log zerolog.Logger,
+    bufferSize uint,
+    blockHeightAtBufferIndex uint64,
+) *provider {
+    return &provider{
+        mu: &sync.RWMutex{},
+        log: log,
+        bufferSize: bufferSize,
+        blockHeightAtBufferIndex: blockHeightAtBufferIndex,
+        bufferIndex: 0,
+        buffer: make([][]TransactionExecutionMetrics, bufferSize),
+    }
+}
+
+func (p *provider) Push(
+    height uint64,
+    data []TransactionExecutionMetrics,
+) {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    if height <= p.blockHeightAtBufferIndex {
+        p.log.Warn().
+            Uint64("height", height).
+            Uint64("latestHeight", p.blockHeightAtBufferIndex).
+            Msg("received metrics for a block that is older than or equal to the most recent block")
+        return
+    }
+    if height > p.blockHeightAtBufferIndex+1 {
+        p.log.Warn().
+            Uint64("height", height).
+            Uint64("latestHeight", p.blockHeightAtBufferIndex).
+            Msg("received metrics for a block that is not the next block")
+
+        // Fill in the gap with nil
+        for i := p.blockHeightAtBufferIndex; i < height-1; i++ {
+            p.pushData(nil)
+        }
+    }
+
+    p.pushData(data)
+}
+
+func (p *provider) pushData(data []TransactionExecutionMetrics) {
+    p.bufferIndex = (p.bufferIndex + 1) % p.bufferSize
+    p.blockHeightAtBufferIndex++
+    p.buffer[p.bufferIndex] = data
+}
+
+func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error) {
+    p.mu.RLock()
+    defer p.mu.RUnlock()
+
+    data := make(map[uint64][]TransactionExecutionMetrics)
+
+    if height+1 > p.blockHeightAtBufferIndex {
+        return data, nil
+    }
+
+    // start index is the lowest block height that is in the buffer
+    startIndex := uint64(0)
+    if p.blockHeightAtBufferIndex < uint64(p.bufferSize) {
+        startIndex = 0
+    } else {
+        startIndex = p.blockHeightAtBufferIndex - uint64(p.bufferSize)
+    }
+
+    // if the start index is lower than the requested height, we only need to return
+    // data for the blocks that are above the given height
+    if height+1 > startIndex {
+        startIndex = height + 1
+    }
+
+    for i := startIndex; i <= p.blockHeightAtBufferIndex; i++ {
+        // 0 <= diff
+        diff := uint(p.blockHeightAtBufferIndex - i)
+
+        // 0 <= diff < bufferSize
+        // we add bufferSize to avoid negative values
+        index := (p.bufferIndex + (p.bufferSize - diff)) % p.bufferSize
+        d := p.buffer[index]
+        if len(d) == 0 {
+            continue
+        }
+
+        data[i] = p.buffer[index]
+    }
+
+    return data, nil
+}
+
+var NoTransactionExecutionMetricsError = fmt.Errorf("no transaction execution metrics available")
diff --git a/engine/execution/computation/metrics/provider_test.go b/engine/execution/computation/metrics/provider_test.go
new file mode 100644
index 00000000000..a71f9adfc6d
--- /dev/null
+++ b/engine/execution/computation/metrics/provider_test.go
@@ -0,0 +1,110 @@
+package metrics
+
+import (
+    "testing"
+    "time"
+
+    "github.com/rs/zerolog"
+    "github.com/stretchr/testify/require"
+)
+
+func Test_ProviderGetOnEmpty(t *testing.T) {
+    t.Parallel()
+
+    height := uint64(100)
+    bufferSize := uint(10)
+    log := zerolog.New(zerolog.NewTestWriter(t))
+
+    provider := newProvider(log, bufferSize, height)
+
+    for i := 0; uint(i) < bufferSize; i++ {
+        data, err := provider.GetTransactionExecutionMetricsAfter(height - uint64(i))
+        require.NoError(t, err)
+        require.Len(t, data, 0)
+    }
+}
+
+func Test_ProviderGetOutOfBounds(t *testing.T) {
+    t.Parallel()
+
+    height := uint64(100)
+    bufferSize := uint(10)
+    log := zerolog.New(zerolog.NewTestWriter(t))
+
+    provider := newProvider(log, bufferSize, height)
+
+    res, err := provider.GetTransactionExecutionMetricsAfter(height + 1)
+    require.NoError(t, err)
+    require.Len(t, res, 0)
+}
+
+func Test_ProviderPushSequential(t *testing.T) {
+    t.Parallel()
+
+    height := uint64(100)
+    bufferSize := uint(10)
+    log := zerolog.New(zerolog.NewTestWriter(t))
+
+    provider := newProvider(log, bufferSize, height)
+
+    for i := 0; uint(i) < bufferSize; i++ {
+        data := []TransactionExecutionMetrics{
+            {
+                // Execution time is our label
+                ExecutionTime: time.Duration(i),
+            },
+        }
+
+        provider.Push(height+uint64(i)+1, data)
+    }
+
+    data, err := provider.GetTransactionExecutionMetricsAfter(height)
+    require.Nil(t, err)
+    for i := 0; uint(i) < bufferSize; i++ {
+        require.Equal(t, time.Duration(uint(i)), data[height+uint64(i)+1][0].ExecutionTime)
+    }
+}
+
+func Test_ProviderPushOutOfSequence(t *testing.T) {
+    t.Parallel()
+
+    height := uint64(100)
+    bufferSize := uint(10)
+    log := zerolog.New(zerolog.NewTestWriter(t))
+
+    provider := newProvider(log, bufferSize, height)
+
+    for i := 0; uint(i) < bufferSize; i++ {
+        data := []TransactionExecutionMetrics{
+            {
+                ExecutionTime: time.Duration(i),
+            },
+        }
+
+        provider.Push(height+uint64(i)+1, data)
+    }
+
+    newHeight := height + uint64(bufferSize)
+
+    // Push out of sequence
+    data := []TransactionExecutionMetrics{
+        {
+            ExecutionTime: time.Duration(newHeight + 2),
+        },
+    }
+
+    // no-op
+    provider.Push(newHeight, data)
+
+    // skip 1
+    provider.Push(newHeight+2, data)
+
+    res, err := provider.GetTransactionExecutionMetricsAfter(height)
+    require.NoError(t, err)
+
+    require.Len(t, res, int(bufferSize))
+
+    for i := 0; i < int(bufferSize); i++ {
+        require.Equal(t, time.Duration(i), res[height+uint64(i)+1][0].ExecutionTime)
+    }
+}
diff --git a/engine/execution/computation/metrics/transaction_execution_metrics.go b/engine/execution/computation/metrics/transaction_execution_metrics.go
new file mode 100644
index 00000000000..7e5fe207bad
--- /dev/null
+++ b/engine/execution/computation/metrics/transaction_execution_metrics.go
@@ -0,0 +1,170 @@
+package metrics
+
+import (
+    "time"
+
+    cadenceCommon "github.com/onflow/cadence/runtime/common"
+    "github.com/rs/zerolog"
+
+    "github.com/onflow/flow-go/engine/execution/state"
+    "github.com/onflow/flow-go/model/flow"
+    "github.com/onflow/flow-go/module/component"
+    "github.com/onflow/flow-go/module/irrecoverable"
+    "github.com/onflow/flow-go/state/protocol"
+    psEvents "github.com/onflow/flow-go/state/protocol/events"
+    "github.com/onflow/flow-go/storage"
+)
+
+type TransactionExecutionMetricsProvider interface {
+    component.Component
+    protocol.Consumer
+
+    // GetTransactionExecutionMetricsAfter returns the transaction metrics for all blocks higher than the given height.
+    // It returns a map of block height to a list of transaction execution metrics.
+    // Blocks that are out of scope (only a limited number of blocks are kept in memory) are not returned.
+    GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error)
+
+    // Collect the transaction metrics for the given block.
+    // Collect does not block; it returns immediately.
+    Collect(
+        blockId flow.Identifier,
+        blockHeight uint64,
+        t TransactionExecutionMetrics,
+    )
+}
+
+// GetTransactionExecutionMetricsAfterResponse is the response type for GetTransactionExecutionMetricsAfter.
+// It is a map of block height to a list of transaction execution metrics.
+type GetTransactionExecutionMetricsAfterResponse = map[uint64][]TransactionExecutionMetrics
+
+type TransactionExecutionMetrics struct {
+    TransactionID flow.Identifier
+    ExecutionTime time.Duration
+    ExecutionEffortWeights map[cadenceCommon.ComputationKind]uint
+}
+
+type metrics struct {
+    TransactionExecutionMetrics
+    blockHeight uint64
+    blockId flow.Identifier
+}
+
+// transactionExecutionMetricsProvider is responsible for providing the metrics for the rpc endpoint.
+// It has a circular buffer of metrics for the last N finalized and executed blocks.
+// The metrics are not guaranteed to be available for all blocks. If the node is just starting up or catching up
+// to the latest finalized block, some blocks may not have metrics available.
+// The metrics are intended to be used for monitoring and analytics purposes.
+type transactionExecutionMetricsProvider struct {
+    // collector is responsible for collecting the metrics.
+    // The collector gathers the metrics from the execution path during block execution.
+    // Once a block is finalized and executed, its metrics are moved to the provider,
+    // and all non-finalized metrics for that height are discarded.
+    *collector
+
+    // provider is responsible for providing the metrics for the rpc endpoint.
+    // It has a circular buffer of metrics for the last N finalized and executed blocks.
+    *provider
+
+    component.Component
+    // transactionExecutionMetricsProvider needs to consume BlockFinalized events.
+    psEvents.Noop
+
+    log zerolog.Logger
+
+    executionState state.FinalizedExecutionState
+    headers storage.Headers
+    blockFinalized chan struct{}
+
+    latestFinalizedAndExecuted *flow.Header
+}
+
+var _ TransactionExecutionMetricsProvider = (*transactionExecutionMetricsProvider)(nil)
+
+func NewTransactionExecutionMetricsProvider(
+    log zerolog.Logger,
+    executionState state.FinalizedExecutionState,
+    headers storage.Headers,
+    latestFinalizedBlock *flow.Header,
+    bufferSize uint,
+) TransactionExecutionMetricsProvider {
+    log = log.With().Str("component", "transaction_execution_metrics_provider").Logger()
+
+    collector := newCollector(log, latestFinalizedBlock.Height)
+    provider := newProvider(log, bufferSize, latestFinalizedBlock.Height)
+
+    p := &transactionExecutionMetricsProvider{
+        collector: collector,
+        provider: provider,
+        log: log,
+        executionState: executionState,
+        headers: headers,
+        blockFinalized: make(chan struct{}),
+        latestFinalizedAndExecuted: latestFinalizedBlock,
+    }
+
+    cm := component.NewComponentManagerBuilder()
+    cm.AddWorker(collector.metricsCollectorWorker)
+    cm.AddWorker(p.blockFinalizedWorker)
+
+    p.Component = cm.Build()
+
+    return p
+}
+
+func (p *transactionExecutionMetricsProvider) BlockFinalized(*flow.Header) {
+    // only handle a single finalized event at a time
+    select {
+    case p.blockFinalized <- struct{}{}:
+    default:
+    }
+}
+
+// onBlockExecutedAndFinalized moves data from the collector to the provider.
+func (p *transactionExecutionMetricsProvider) onBlockExecutedAndFinalized(block flow.Identifier, height uint64) {
+    data := p.collector.Pop(height, block)
+    p.provider.Push(height, data)
+}
+
+func (p *transactionExecutionMetricsProvider) blockFinalizedWorker(
+    ctx irrecoverable.SignalerContext,
+    ready component.ReadyFunc,
+) {
+    ready()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-p.blockFinalized:
+            p.onExecutedAndFinalized()
+        }
+    }
+}
+
+func (p *transactionExecutionMetricsProvider) onExecutedAndFinalized() {
+    latestFinalizedAndExecutedHeight, err := p.executionState.GetHighestFinalizedExecuted()
+
+    if err != nil {
+        p.log.Warn().Err(err).Msg("could not get highest finalized executed")
+        return
+    }
+
+    // the latest finalized and executed block could be more than one block ahead of the last one handled;
+    // step through all blocks between the last one handled and the latest finalized and executed
+    for height := p.latestFinalizedAndExecuted.Height + 1; height <= latestFinalizedAndExecutedHeight; height++ {
+        header, err := p.headers.ByHeight(height)
+        if err != nil {
+            p.log.Warn().
+                Err(err).
+                Uint64("height", height).
+                Msg("could not get header by height")
+            return
+        }
+
+        p.onBlockExecutedAndFinalized(header.ID(), height)
+
+        if header.Height == latestFinalizedAndExecutedHeight {
+            p.latestFinalizedAndExecuted = header
+        }
+    }
+}
diff --git a/module/metrics/execution.go b/module/metrics/execution.go
index 67a5aec84f3..add4615e4d3 100644
--- a/module/metrics/execution.go
+++ b/module/metrics/execution.go
@@ -773,6 +773,7 @@ func (ec *ExecutionCollector) ExecutionTransactionExecuted(
     if stats.Failed {
         ec.totalFailedTransactionsCounter.Inc()
     }
+
 }
 
 // ExecutionChunkDataPackGenerated reports stats on chunk data pack generation
@@ -973,3 +974,44 @@ func (ec *ExecutionCollector) ExecutionComputationResultUploaded() {
 func (ec *ExecutionCollector) ExecutionComputationResultUploadRetried() {
     ec.computationResultUploadRetriedCount.Inc()
 }
+
+type ExecutionCollectorWithTransactionCallback struct {
+    *ExecutionCollector
+    TransactionCallback func(
+        dur time.Duration,
+        stats module.TransactionExecutionResultStats,
+        info module.TransactionExecutionResultInfo,
+    )
+}
+
+func (ec *ExecutionCollector) WithTransactionCallback(
+    callback func(
+        time.Duration,
+        module.TransactionExecutionResultStats,
+        module.TransactionExecutionResultInfo,
+    ),
+) *ExecutionCollectorWithTransactionCallback {
+    // if callback is nil, use a no-op callback
+    if callback == nil {
+        callback = func(
+            time.Duration,
+            module.TransactionExecutionResultStats,
+            module.TransactionExecutionResultInfo,
+        ) {
+        }
+    }
+
+    return &ExecutionCollectorWithTransactionCallback{
+        ExecutionCollector: ec,
+        TransactionCallback: callback,
+    }
+}
+
+func (ec *ExecutionCollectorWithTransactionCallback) ExecutionTransactionExecuted(
+    dur time.Duration,
+    stats module.TransactionExecutionResultStats,
+    info module.TransactionExecutionResultInfo,
+) {
+    ec.ExecutionCollector.ExecutionTransactionExecuted(dur, stats, info)
+    ec.TransactionCallback(dur, stats, info)
+}
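
Reviewer note: the sketch below is not part of the diff. It only illustrates the intended call sites of the API introduced above: the provider is built once at startup (see LoadTransactionExecutionMetrics), execution reports per-transaction measurements through Collect via the collector callback installed in LoadProviderEngine, finalization events arrive through BlockFinalized, and an RPC handler reads results back with GetTransactionExecutionMetricsAfter. The function name, the example bufferSize of 1000 and the placeholder arguments are illustrative; in the real node the component is also started by the node's component manager, which is omitted here.

package example

import (
    "fmt"
    "time"

    "github.com/rs/zerolog"

    txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics"
    "github.com/onflow/flow-go/engine/execution/state"
    "github.com/onflow/flow-go/model/flow"
    "github.com/onflow/flow-go/storage"
)

// wireTransactionMetrics shows how the new provider is expected to be driven.
// All arguments are assumed to be supplied by the node; nothing here is new API.
func wireTransactionMetrics(
    log zerolog.Logger,
    executionState state.FinalizedExecutionState,
    headers storage.Headers,
    latestFinalized *flow.Header,
    executedBlockID flow.Identifier,
    txID flow.Identifier,
) error {
    provider := txmetrics.NewTransactionExecutionMetricsProvider(
        log,
        executionState,
        headers,
        latestFinalized,
        1000, // bufferSize: how many finalized and executed blocks to keep (illustrative value)
    )

    // Reported from the execution path via the collector callback installed in
    // LoadProviderEngine; Collect never blocks.
    provider.Collect(executedBlockID, latestFinalized.Height+1, txmetrics.TransactionExecutionMetrics{
        TransactionID: txID,
        ExecutionTime: 5 * time.Millisecond,
    })

    // Delivered by the protocol events distributor; once a block is both finalized
    // and executed, its metrics move from the collector into the ring buffer.
    provider.BlockFinalized(latestFinalized)

    // Called from an RPC handler to read everything above a client-supplied height.
    res, err := provider.GetTransactionExecutionMetricsAfter(latestFinalized.Height)
    if err != nil {
        return err
    }
    for height, txs := range res {
        fmt.Printf("height %d: %d transactions\n", height, len(txs))
    }
    return nil
}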