Skip to content

Commit

Permalink
Merge #4054
Browse files Browse the repository at this point in the history
4054: Replace View with SnapshotTree as storage representation r=pattyshack a=pattyshack

This simplify parallel execution (the tree is immutable and the list of write sets can be used for OCC validation), but at the expense of less efficient value lookups (the cost is amortized by compaction).

Co-authored-by: Patrick Lee <patrick.lee@dapperlabs.com>
  • Loading branch information
bors[bot] and pattyshack authored Mar 22, 2023
2 parents 1f96328 + 33eea11 commit d352e86
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 99 deletions.
68 changes: 34 additions & 34 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/onflow/flow-go/crypto/hash"
"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/engine/execution/computation/result"
"github.com/onflow/flow-go/engine/execution/state/delta"
"github.com/onflow/flow-go/engine/execution/utils"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/fvm/blueprints"
"github.com/onflow/flow-go/fvm/derived"
"github.com/onflow/flow-go/fvm/state"
"github.com/onflow/flow-go/fvm/storage"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/executiondatasync/provider"
Expand Down Expand Up @@ -272,7 +272,7 @@ func (e *blockComputer) executeBlock(
ctx context.Context,
parentBlockExecutionResultID flow.Identifier,
block *entity.ExecutableBlock,
snapshot state.StorageSnapshot,
baseSnapshot state.StorageSnapshot,
derivedBlockData *derived.DerivedBlockData,
) (
*execution.ComputationResult,
Expand Down Expand Up @@ -311,9 +311,13 @@ func (e *blockComputer) executeBlock(
e.colResCons)
defer collector.Stop()

stateView := delta.NewDeltaView(snapshot)
snapshotTree := storage.NewSnapshotTree(baseSnapshot)
for _, txn := range transactions {
err := e.executeTransaction(blockSpan, txn, stateView, collector)
txnExecutionSnapshot, output, err := e.executeTransaction(
blockSpan,
txn,
snapshotTree,
collector)
if err != nil {
prefix := ""
if txn.isSystemTransaction {
Expand All @@ -326,6 +330,9 @@ func (e *blockComputer) executeBlock(
txn.txnIndex,
err)
}

collector.AddTransactionResult(txn, txnExecutionSnapshot, output)
snapshotTree = snapshotTree.Append(txnExecutionSnapshot)
}

res, err := collector.Finalize(ctx)
Expand All @@ -345,9 +352,13 @@ func (e *blockComputer) executeBlock(
func (e *blockComputer) executeTransaction(
parentSpan otelTrace.Span,
txn transaction,
stateView state.View,
storageSnapshot state.StorageSnapshot,
collector *resultCollector,
) error {
) (
*state.ExecutionSnapshot,
fvm.ProcedureOutput,
error,
) {
startedAt := time.Now()
memAllocBefore := debug.GetHeapAllocsBytes()

Expand All @@ -374,10 +385,13 @@ func (e *blockComputer) executeTransaction(

txn.ctx = fvm.NewContextFromParent(txn.ctx, fvm.WithSpan(txSpan))

txView := stateView.NewChild()
err := e.vm.Run(txn.ctx, txn.TransactionProcedure, txView)
executionSnapshot, output, err := e.vm.RunV2(
txn.ctx,
txn.TransactionProcedure,
storageSnapshot)
if err != nil {
return fmt.Errorf("failed to execute transaction %v for block %s at height %v: %w",
return nil, fvm.ProcedureOutput{}, fmt.Errorf(
"failed to execute transaction %v for block %s at height %v: %w",
txn.txnIdStr,
txn.blockIdStr,
txn.ctx.BlockHeader.Height,
Expand All @@ -387,33 +401,19 @@ func (e *blockComputer) executeTransaction(
postProcessSpan := e.tracer.StartSpanFromParent(txSpan, trace.EXEPostProcessTransaction)
defer postProcessSpan.End()

// always merge the view, fvm take cares of reverting changes
// of failed transaction invocation

txnSnapshot := txView.Finalize()
collector.AddTransactionResult(txn, txnSnapshot)

err = stateView.Merge(txnSnapshot)
if err != nil {
return fmt.Errorf(
"merging tx view to collection view failed for tx %v: %w",
txn.txnIdStr,
err)
}

memAllocAfter := debug.GetHeapAllocsBytes()

logger = logger.With().
Uint64("computation_used", txn.ComputationUsed).
Uint64("memory_used", txn.MemoryEstimate).
Uint64("computation_used", output.ComputationUsed).
Uint64("memory_used", output.MemoryEstimate).
Uint64("mem_alloc", memAllocAfter-memAllocBefore).
Int64("time_spent_in_ms", time.Since(startedAt).Milliseconds()).
Logger()

if txn.Err != nil {
if output.Err != nil {
logger = logger.With().
Str("error_message", txn.Err.Error()).
Uint16("error_code", uint16(txn.Err.Code())).
Str("error_message", output.Err.Error()).
Uint16("error_code", uint16(output.Err.Code())).
Logger()
logger.Info().Msg("transaction execution failed")

Expand All @@ -434,12 +434,12 @@ func (e *blockComputer) executeTransaction(

e.metrics.ExecutionTransactionExecuted(
time.Since(startedAt),
txn.ComputationUsed,
txn.MemoryEstimate,
output.ComputationUsed,
output.MemoryEstimate,
memAllocAfter-memAllocBefore,
len(txn.Events),
flow.EventsList(txn.Events).ByteSize(),
txn.Err != nil,
len(output.Events),
flow.EventsList(output.Events).ByteSize(),
output.Err != nil,
)
return nil
return executionSnapshot, output, nil
}
121 changes: 81 additions & 40 deletions engine/execution/computation/computer/computer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,10 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
fvm.WithDerivedBlockData(derived.NewEmptyDerivedBlockData()),
)

vm := new(fvmmock.VM)
vm.On("Run", mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
ctx := args[0].(fvm.Context)
tx := args[1].(*fvm.TransactionProcedure)
view := args[2].(state.View)

tx.Events = generateEvents(1, tx.TxIndex)

derivedTxnData, err := ctx.DerivedBlockData.NewDerivedTransactionData(
tx.ExecutionTime(),
tx.ExecutionTime())
require.NoError(t, err)

getSetAProgram(t, view, derivedTxnData)
}).
Times(2 + 1) // 2 txs in collection + system chunk
vm := &testVM{
t: t,
eventsPerTransaction: 1,
}

committer := &fakeCommitter{
callCount: 0,
Expand Down Expand Up @@ -283,7 +269,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
assert.NotNil(t, chunkExecutionData2.TrieUpdate)
assert.Equal(t, byte(2), chunkExecutionData2.TrieUpdate.RootHash[0])

vm.AssertExpectations(t)
assert.Equal(t, 3, vm.callCount)
})

t.Run("empty block still computes system chunk", func(t *testing.T) {
Expand Down Expand Up @@ -320,8 +306,11 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
block := generateBlock(0, 0, rag)
derivedBlockData := derived.NewEmptyDerivedBlockData()

vm.On("Run", mock.Anything, mock.Anything, mock.Anything).
Return(nil).
vm.On("RunV2", mock.Anything, mock.Anything, mock.Anything).
Return(
&state.ExecutionSnapshot{},
fvm.ProcedureOutput{},
nil).
Once() // just system chunk

committer.On("CommitView", mock.Anything, mock.Anything).
Expand Down Expand Up @@ -431,7 +420,6 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
t.Run("multiple collections", func(t *testing.T) {
execCtx := fvm.NewContext()

vm := new(fvmmock.VM)
committer := new(computermock.ViewCommitter)

bservice := requesterunit.MockBlobService(blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore())))
Expand All @@ -445,6 +433,15 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
trackerStorage,
)

eventsPerTransaction := 2
vm := &testVM{
t: t,
eventsPerTransaction: eventsPerTransaction,
err: fvmErrors.NewInvalidAddressErrorf(
flow.EmptyAddress,
"no payer address provided"),
}

exe, err := computer.NewBlockComputer(
vm,
execCtx,
Expand All @@ -459,7 +456,6 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

collectionCount := 2
transactionsPerCollection := 2
eventsPerTransaction := 2
eventsPerCollection := eventsPerTransaction * transactionsPerCollection
totalTransactionCount := (collectionCount * transactionsPerCollection) + 1 // +1 for system chunk
// totalEventCount := eventsPerTransaction * totalTransactionCount
Expand All @@ -468,19 +464,6 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
block := generateBlock(collectionCount, transactionsPerCollection, rag)
derivedBlockData := derived.NewEmptyDerivedBlockData()

vm.On("Run", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
tx := args[1].(*fvm.TransactionProcedure)

tx.Err = fvmErrors.NewInvalidAddressErrorf(
flow.EmptyAddress,
"no payer address provided")
// create dummy events
tx.Events = generateEvents(eventsPerTransaction, tx.TxIndex)
}).
Return(nil).
Times(totalTransactionCount)

committer.On("CommitView", mock.Anything, mock.Anything).
Return(nil, nil, nil, nil).
Times(collectionCount + 1)
Expand Down Expand Up @@ -536,7 +519,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {

assertEventHashesMatch(t, collectionCount+1, result)

vm.AssertExpectations(t)
assert.Equal(t, totalTransactionCount, vm.callCount)
})

t.Run("service events are emitted", func(t *testing.T) {
Expand Down Expand Up @@ -1250,6 +1233,58 @@ func generateCollection(transactionCount int, addressGenerator flow.AddressGener
}
}

type testVM struct {
t *testing.T
eventsPerTransaction int

callCount int
err fvmErrors.CodedError
}

func (vm *testVM) RunV2(
ctx fvm.Context,
proc fvm.Procedure,
storageSnapshot state.StorageSnapshot,
) (
*state.ExecutionSnapshot,
fvm.ProcedureOutput,
error,
) {
vm.callCount += 1

txn := proc.(*fvm.TransactionProcedure)

derivedTxnData, err := ctx.DerivedBlockData.NewDerivedTransactionData(
txn.ExecutionTime(),
txn.ExecutionTime())
require.NoError(vm.t, err)

getSetAProgram(vm.t, storageSnapshot, derivedTxnData)

snapshot := &state.ExecutionSnapshot{}
output := fvm.ProcedureOutput{
Events: generateEvents(vm.eventsPerTransaction, txn.TxIndex),
Err: vm.err,
}

return snapshot, output, nil
}

func (testVM) Run(_ fvm.Context, _ fvm.Procedure, _ state.View) error {
panic("not implemented")
}

func (testVM) GetAccount(
_ fvm.Context,
_ flow.Address,
_ state.StorageSnapshot,
) (
*flow.Account,
error,
) {
panic("not implemented")
}

func generateEvents(eventCount int, txIndex uint32) []flow.Event {
events := make([]flow.Event, eventCount)
for i := 0; i < eventCount; i++ {
Expand All @@ -1260,16 +1295,22 @@ func generateEvents(eventCount int, txIndex uint32) []flow.Event {
return events
}

func getSetAProgram(t *testing.T, view state.View, derivedTxnData derived.DerivedTransactionCommitter) {
func getSetAProgram(
t *testing.T,
storageSnapshot state.StorageSnapshot,
derivedTxnData derived.DerivedTransactionCommitter,
) {

txState := state.NewTransactionState(view, state.DefaultParameters())
txnState := state.NewTransactionState(
delta.NewDeltaView(storageSnapshot),
state.DefaultParameters())

loc := common.AddressLocation{
Name: "SomeContract",
Address: common.MustBytesToAddress([]byte{0x1}),
}
_, err := derivedTxnData.GetOrComputeProgram(
txState,
txnState,
loc,
&programLoader{
load: func() (*derived.Program, error) {
Expand Down
Loading

0 comments on commit d352e86

Please sign in to comment.