Skip to content

Commit

Permalink
Implement transaction coordinator
Browse files Browse the repository at this point in the history
This refactors block database and result collector into transaction coordinator.
This also adds synchronization functionality to be used for parallel execution.
  • Loading branch information
pattyshack committed Jun 6, 2023
1 parent ff3519c commit 12f898b
Show file tree
Hide file tree
Showing 6 changed files with 673 additions and 50 deletions.
71 changes: 26 additions & 45 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package computer
import (
"context"
"fmt"
"time"

"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/onflow/flow-go/engine/execution/utils"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/fvm/blueprints"
"github.com/onflow/flow-go/fvm/storage"
"github.com/onflow/flow-go/fvm/storage/derived"
"github.com/onflow/flow-go/fvm/storage/logical"
"github.com/onflow/flow-go/fvm/storage/snapshot"
Expand All @@ -41,7 +39,7 @@ type collectionInfo struct {
isSystemTransaction bool
}

type transactionRequest struct {
type TransactionRequest struct {
collectionInfo

txnId flow.Identifier
Expand All @@ -62,11 +60,11 @@ func newTransactionRequest(
txnIndex uint32,
txnBody *flow.TransactionBody,
lastTransactionInCollection bool,
) transactionRequest {
) TransactionRequest {
txnId := txnBody.ID()
txnIdStr := txnId.String()

return transactionRequest{
return TransactionRequest{
collectionInfo: collection,
txnId: txnId,
txnIdStr: txnIdStr,
Expand Down Expand Up @@ -192,7 +190,7 @@ func (e *blockComputer) queueTransactionRequests(
blockHeader *flow.Header,
rawCollections []*entity.CompleteCollection,
systemTxnBody *flow.TransactionBody,
requestQueue chan transactionRequest,
requestQueue chan TransactionRequest,
) {
txnIndex := uint32(0)

Expand Down Expand Up @@ -318,7 +316,14 @@ func (e *blockComputer) executeBlock(
e.colResCons)
defer collector.Stop()

requestQueue := make(chan transactionRequest, numTxns)
requestQueue := make(chan TransactionRequest, numTxns)

database := newTransactionCoordinator(
e.vm,
baseSnapshot,
derivedBlockData,
collector)

e.queueTransactionRequests(
blockId,
blockIdStr,
Expand All @@ -328,14 +333,11 @@ func (e *blockComputer) executeBlock(
requestQueue)
close(requestQueue)

database := storage.NewBlockDatabase(baseSnapshot, 0, derivedBlockData)

for request := range requestQueue {
request.ctx.Logger.Info().Msg("executing transaction")
err := e.executeTransaction(
blockSpan,
database,
collector,
request)
if err != nil {
return nil, err
Expand All @@ -357,15 +359,13 @@ func (e *blockComputer) executeBlock(
}

func (e *blockComputer) executeTransaction(
parentSpan otelTrace.Span,
database *storage.BlockDatabase,
collector *resultCollector,
request transactionRequest,
blockSpan otelTrace.Span,
database *transactionCoordinator,
request TransactionRequest,
) error {
txn, err := e.executeTransactionInternal(
parentSpan,
blockSpan,
database,
collector,
request)
if err != nil {
prefix := ""
Expand Down Expand Up @@ -394,18 +394,15 @@ func (e *blockComputer) executeTransaction(
}

func (e *blockComputer) executeTransactionInternal(
parentSpan otelTrace.Span,
database *storage.BlockDatabase,
collector *resultCollector,
request transactionRequest,
blockSpan otelTrace.Span,
database *transactionCoordinator,
request TransactionRequest,
) (
storage.Transaction,
*transaction,
error,
) {
startedAt := time.Now()

txSpan := e.tracer.StartSampledSpanFromParent(
parentSpan,
blockSpan,
request.txnId,
trace.EXEComputeTransaction)
txSpan.SetAttributes(
Expand All @@ -417,22 +414,18 @@ func (e *blockComputer) executeTransactionInternal(

request.ctx = fvm.NewContextFromParent(request.ctx, fvm.WithSpan(txSpan))

txn, err := database.NewTransaction(
request.ExecutionTime(),
fvm.ProcedureStateParameters(request.ctx, request))
txn, err := database.NewTransaction(request)
if err != nil {
return nil, err
}
defer txn.Cleanup()

executor := e.vm.NewExecutor(request.ctx, request.TransactionProcedure, txn)
defer executor.Cleanup()

err = executor.Preprocess()
err = txn.Preprocess()
if err != nil {
return txn, err
}

err = executor.Execute()
err = txn.Execute()
if err != nil {
return txn, err
}
Expand All @@ -442,17 +435,5 @@ func (e *blockComputer) executeTransactionInternal(
return txn, err
}

executionSnapshot, err := txn.Commit()
if err != nil {
return txn, err
}

output := executor.Output()
collector.AddTransactionResult(
request,
executionSnapshot,
output,
time.Since(startedAt))

return txn, nil
return txn, txn.Commit()
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions engine/execution/computation/computer/result_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ViewCommitter interface {
}

type transactionResult struct {
transactionRequest
TransactionRequest
*snapshot.ExecutionSnapshot
fvm.ProcedureOutput
timeSpent time.Duration
Expand Down Expand Up @@ -216,7 +216,7 @@ func (collector *resultCollector) commitCollection(
}

func (collector *resultCollector) processTransactionResult(
txn transactionRequest,
txn TransactionRequest,
txnExecutionSnapshot *snapshot.ExecutionSnapshot,
output fvm.ProcedureOutput,
timeSpent time.Duration,
Expand Down Expand Up @@ -296,13 +296,13 @@ func (collector *resultCollector) processTransactionResult(
}

func (collector *resultCollector) AddTransactionResult(
request transactionRequest,
request TransactionRequest,
snapshot *snapshot.ExecutionSnapshot,
output fvm.ProcedureOutput,
timeSpent time.Duration,
) {
result := transactionResult{
transactionRequest: request,
TransactionRequest: request,
ExecutionSnapshot: snapshot,
ProcedureOutput: output,
timeSpent: timeSpent,
Expand All @@ -321,7 +321,7 @@ func (collector *resultCollector) runResultProcessor() {

for result := range collector.processorInputChan {
err := collector.processTransactionResult(
result.transactionRequest,
result.TransactionRequest,
result.ExecutionSnapshot,
result.ProcedureOutput,
result.timeSpent)
Expand Down
Loading

0 comments on commit 12f898b

Please sign in to comment.