Skip to content

Commit

Permalink
Merge pull request #5669 from onflow/bastian/sync-stable-cadence-12
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolent authored Apr 15, 2024
2 parents f218386 + 2ad7247 commit 2f9ffa7
Show file tree
Hide file tree
Showing 33 changed files with 1,170 additions and 1,179 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ generate-mocks: install-mock-generators
mockery --name '(Connector|PingInfoProvider)' --dir=network/p2p --case=underscore --output="./network/mocknetwork" --outpkg="mocknetwork"
CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=storage/mocks/storage.go -package=mocks github.com/onflow/flow-go/storage Blocks,Headers,Payloads,Collections,Commits,Events,ServiceEvents,TransactionResults
CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network EngineRegistry
mockery --name='.*' --dir=integration/benchmark/mocksiface --case=underscore --output="integration/benchmark/mock" --outpkg="mock"
mockery --name=ExecutionDataStore --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock"
mockery --name=Downloader --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock"
mockery --name '(ExecutionDataRequester|IndexReporter)' --dir=module/state_synchronization --case=underscore --output="./module/state_synchronization/mock" --outpkg="state_synchronization"
Expand Down
4 changes: 2 additions & 2 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,9 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.
func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) {
metadata := h.buildMetadataResponse()

blockID := convert.MessageToIdentifier(req.GetId())
resultID := convert.MessageToIdentifier(req.GetId())

result, err := h.api.GetExecutionResultByID(ctx, blockID)
result, err := h.api.GetExecutionResultByID(ctx, resultID)
if err != nil {
return nil, err
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,19 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
),
}

broadcaster := engine.NewBroadcaster()
// create BlockTracker that will track for new blocks (finalized and sealed) and
// handles block-related operations.
blockTracker, err := subscription.NewBlockTracker(
node.State,
builder.FinalizedRootBlock.Header.Height,
node.Storage.Headers,
broadcaster,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize block tracker: %w", err)
}

backendParams := backend.Params{
State: node.State,
Blocks: node.Storage.Blocks,
Expand All @@ -1678,6 +1691,14 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
BlockTracker: blockTracker,
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
}

if builder.localServiceAPIEnabled {
Expand Down
16 changes: 12 additions & 4 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ var (
flagDiffMigration bool
flagLogVerboseDiff bool
flagVerboseErrorOutput bool
flagCheckStorageHealthBeforeMigration bool
flagStagedContractsFile string
flagContinueMigrationOnValidationError bool
flagCheckStorageHealthBeforeMigration bool
flagCheckStorageHealthAfterMigration bool
flagInputPayloadFileName string
flagOutputPayloadFileName string
flagOutputPayloadByAddresses string
Expand Down Expand Up @@ -97,15 +98,18 @@ func init() {
Cmd.Flags().BoolVar(&flagVerboseErrorOutput, "verbose-error-output", true,
"log verbose output on migration errors")

Cmd.Flags().BoolVar(&flagCheckStorageHealthBeforeMigration, "check-storage-health-before", false,
"check (atree) storage health before migration")

Cmd.Flags().StringVar(&flagStagedContractsFile, "staged-contracts", "",
"Staged contracts CSV file")

Cmd.Flags().BoolVar(&flagAllowPartialStateFromPayloads, "allow-partial-state-from-payload-file", false,
"allow input payload file containing partial state (e.g. not all accounts)")

Cmd.Flags().BoolVar(&flagCheckStorageHealthBeforeMigration, "check-storage-health-before", false,
"check (atree) storage health before migration")

Cmd.Flags().BoolVar(&flagCheckStorageHealthAfterMigration, "check-storage-health-after", false,
"check (atree) storage health after migration")

Cmd.Flags().BoolVar(&flagContinueMigrationOnValidationError, "continue-migration-on-validation-errors", false,
"continue migration even if validation fails")

Expand Down Expand Up @@ -299,6 +303,10 @@ func run(*cobra.Command, []string) {
log.Warn().Msgf("--check-storage-health-before flag is enabled and will increase duration of migration")
}

if flagCheckStorageHealthAfterMigration {
log.Warn().Msgf("--check-storage-health-after flag is enabled and will increase duration of migration")
}

var inputMsg string
if len(flagInputPayloadFileName) > 0 {
// Input is payloads
Expand Down
66 changes: 38 additions & 28 deletions cmd/util/ledger/migrations/atree_register_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"
"github.com/onflow/cadence/runtime/interpreter"
"github.com/onflow/cadence/runtime/stdlib"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/fvm/environment"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
Expand All @@ -32,13 +30,14 @@ type AtreeRegisterMigrator struct {

sampler zerolog.Sampler
rw reporters.ReportWriter
rwf reporters.ReportWriterFactory

nWorkers int

validateMigratedValues bool
logVerboseValidationError bool
continueMigrationOnValidationError bool
checkStorageHealthBeforeMigration bool
checkStorageHealthAfterMigration bool
}

var _ AccountBasedMigration = (*AtreeRegisterMigrator)(nil)
Expand All @@ -50,17 +49,20 @@ func NewAtreeRegisterMigrator(
validateMigratedValues bool,
logVerboseValidationError bool,
continueMigrationOnValidationError bool,
checkStorageHealthBeforeMigration bool,
checkStorageHealthAfterMigration bool,
) *AtreeRegisterMigrator {

sampler := util2.NewTimedSampler(30 * time.Second)

migrator := &AtreeRegisterMigrator{
sampler: sampler,
rwf: rwf,
rw: rwf.ReportWriter("atree-register-migrator"),
validateMigratedValues: validateMigratedValues,
logVerboseValidationError: logVerboseValidationError,
continueMigrationOnValidationError: continueMigrationOnValidationError,
checkStorageHealthBeforeMigration: checkStorageHealthBeforeMigration,
checkStorageHealthAfterMigration: checkStorageHealthAfterMigration,
}

return migrator
Expand Down Expand Up @@ -90,11 +92,22 @@ func (m *AtreeRegisterMigrator) MigrateAccount(
oldPayloads []*ledger.Payload,
) ([]*ledger.Payload, error) {
// create all the runtime components we need for the migration
mr, err := NewMigratorRuntime(address, oldPayloads, util.RuntimeInterfaceConfig{})
mr, err := NewAtreeRegisterMigratorRuntime(address, oldPayloads)
if err != nil {
return nil, fmt.Errorf("failed to create migrator runtime: %w", err)
}

// Check storage health before migration, if enabled.
if m.checkStorageHealthBeforeMigration {
err = checkStorageHealth(address, mr.Storage, oldPayloads)
if err != nil {
m.log.Warn().
Err(err).
Str("account", address.Hex()).
Msg("storage health check before migration failed")
}
}

// keep track of all storage maps that were accessed
// if they are empty they won't be changed, but we still need to copy them over
storageMapIds := make(map[string]struct{})
Expand Down Expand Up @@ -145,11 +158,27 @@ func (m *AtreeRegisterMigrator) MigrateAccount(
})
}

// Check storage health after migration, if enabled.
if m.checkStorageHealthAfterMigration {
mr, err := NewAtreeRegisterMigratorRuntime(address, newPayloads)
if err != nil {
return nil, fmt.Errorf("failed to create migrator runtime: %w", err)
}

err = checkStorageHealth(address, mr.Storage, newPayloads)
if err != nil {
m.log.Warn().
Err(err).
Str("account", address.Hex()).
Msg("storage health check after migration failed")
}
}

return newPayloads, nil
}

func (m *AtreeRegisterMigrator) migrateAccountStorage(
mr *migratorRuntime,
mr *AtreeRegisterMigratorRuntime,
storageMapIds map[string]struct{},
) (map[flow.RegisterID]flow.RegisterValue, error) {

Expand All @@ -176,7 +205,7 @@ func (m *AtreeRegisterMigrator) migrateAccountStorage(
}

func (m *AtreeRegisterMigrator) convertStorageDomain(
mr *migratorRuntime,
mr *AtreeRegisterMigratorRuntime,
storageMapIds map[string]struct{},
domain string,
) error {
Expand Down Expand Up @@ -254,7 +283,7 @@ func (m *AtreeRegisterMigrator) convertStorageDomain(
}

func (m *AtreeRegisterMigrator) validateChangesAndCreateNewRegisters(
mr *migratorRuntime,
mr *AtreeRegisterMigratorRuntime,
changes map[flow.RegisterID]flow.RegisterValue,
storageMapIds map[string]struct{},
) ([]*ledger.Payload, error) {
Expand Down Expand Up @@ -389,7 +418,7 @@ func (m *AtreeRegisterMigrator) validateChangesAndCreateNewRegisters(
}

func (m *AtreeRegisterMigrator) cloneValue(
mr *migratorRuntime,
mr *AtreeRegisterMigratorRuntime,
value interpreter.Value,
) (interpreter.Value, error) {

Expand Down Expand Up @@ -427,25 +456,6 @@ func capturePanic(f func()) (err error) {
return
}

// convert all domains
var domains = []string{
common.PathDomainStorage.Identifier(),
common.PathDomainPrivate.Identifier(),
common.PathDomainPublic.Identifier(),
runtime.StorageDomainContract,
stdlib.InboxStorageDomain,
stdlib.CapabilityControllerStorageDomain,
}

var domainsLookupMap = map[string]struct{}{
common.PathDomainStorage.Identifier(): {},
common.PathDomainPrivate.Identifier(): {},
common.PathDomainPublic.Identifier(): {},
runtime.StorageDomainContract: {},
stdlib.InboxStorageDomain: {},
stdlib.CapabilityControllerStorageDomain: {},
}

// migrationProblem is a struct for reporting errors
type migrationProblem struct {
Address string
Expand Down
2 changes: 2 additions & 0 deletions cmd/util/ledger/migrations/atree_register_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func TestAtreeRegisterMigration(t *testing.T) {
true,
false,
false,
false,
false,
),
},
),
Expand Down
64 changes: 64 additions & 0 deletions cmd/util/ledger/migrations/atree_register_migrator_runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package migrations

import (
"fmt"

"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"
"github.com/onflow/cadence/runtime/interpreter"

"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/fvm/environment"
"github.com/onflow/flow-go/fvm/storage/state"
"github.com/onflow/flow-go/ledger"
)

// NewAtreeRegisterMigratorRuntime returns a new runtime to be used with the AtreeRegisterMigrator.
func NewAtreeRegisterMigratorRuntime(
address common.Address,
payloads []*ledger.Payload,
) (
*AtreeRegisterMigratorRuntime,
error,
) {
snapshot, err := util.NewPayloadSnapshot(payloads)
if err != nil {
return nil, fmt.Errorf("failed to create payload snapshot: %w", err)
}
transactionState := state.NewTransactionState(snapshot, state.DefaultParameters())
accounts := environment.NewAccounts(transactionState)

accountsAtreeLedger := util.NewAccountsAtreeLedger(accounts)
storage := runtime.NewStorage(accountsAtreeLedger, nil)

inter, err := interpreter.NewInterpreter(
nil,
nil,
&interpreter.Config{
Storage: storage,
},
)
if err != nil {
return nil, err
}

return &AtreeRegisterMigratorRuntime{
Address: address,
Payloads: payloads,
Snapshot: snapshot,
TransactionState: transactionState,
Interpreter: inter,
Storage: storage,
AccountsAtreeLedger: accountsAtreeLedger,
}, nil
}

type AtreeRegisterMigratorRuntime struct {
Snapshot *util.PayloadSnapshot
TransactionState state.NestedTransactionPreparer
Interpreter *interpreter.Interpreter
Storage *runtime.Storage
Payloads []*ledger.Payload
Address common.Address
AccountsAtreeLedger *util.AccountsAtreeLedger
}
Loading

0 comments on commit 2f9ffa7

Please sign in to comment.