diff --git a/Makefile b/Makefile index 724f84d5ed7..7ba432f6bb6 100644 --- a/Makefile +++ b/Makefile @@ -157,7 +157,6 @@ generate-mocks: install-mock-generators mockery --name '(Connector|PingInfoProvider)' --dir=network/p2p --case=underscore --output="./network/mocknetwork" --outpkg="mocknetwork" CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=storage/mocks/storage.go -package=mocks github.com/onflow/flow-go/storage Blocks,Headers,Payloads,Collections,Commits,Events,ServiceEvents,TransactionResults CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network EngineRegistry - mockery --name='.*' --dir=integration/benchmark/mocksiface --case=underscore --output="integration/benchmark/mock" --outpkg="mock" mockery --name=ExecutionDataStore --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock" mockery --name=Downloader --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock" mockery --name '(ExecutionDataRequester|IndexReporter)' --dir=module/state_synchronization --case=underscore --output="./module/state_synchronization/mock" --outpkg="state_synchronization" diff --git a/access/handler.go b/access/handler.go index 71e48511aca..a191f333662 100644 --- a/access/handler.go +++ b/access/handler.go @@ -708,9 +708,9 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access. func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) { metadata := h.buildMetadataResponse() - blockID := convert.MessageToIdentifier(req.GetId()) + resultID := convert.MessageToIdentifier(req.GetId()) - result, err := h.api.GetExecutionResultByID(ctx, blockID) + result, err := h.api.GetExecutionResultByID(ctx, resultID) if err != nil { return nil, err } diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 8aee600f60e..b50aab6144d 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -1660,6 +1660,19 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { ), } + broadcaster := engine.NewBroadcaster() + // create BlockTracker that will track for new blocks (finalized and sealed) and + // handles block-related operations. + blockTracker, err := subscription.NewBlockTracker( + node.State, + builder.FinalizedRootBlock.Header.Height, + node.Storage.Headers, + broadcaster, + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize block tracker: %w", err) + } + backendParams := backend.Params{ State: node.State, Blocks: node.Storage.Blocks, @@ -1678,6 +1691,14 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { Log: node.Logger, SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), + BlockTracker: blockTracker, + SubscriptionHandler: subscription.NewSubscriptionHandler( + builder.Logger, + broadcaster, + builder.stateStreamConf.ClientSendTimeout, + builder.stateStreamConf.ResponseLimit, + builder.stateStreamConf.ClientSendBufferSize, + ), } if builder.localServiceAPIEnabled { diff --git a/cmd/util/cmd/execution-state-extract/cmd.go b/cmd/util/cmd/execution-state-extract/cmd.go index ae77a3b8522..6db992b6337 100644 --- a/cmd/util/cmd/execution-state-extract/cmd.go +++ b/cmd/util/cmd/execution-state-extract/cmd.go @@ -38,9 +38,10 @@ var ( flagDiffMigration bool flagLogVerboseDiff bool flagVerboseErrorOutput bool - flagCheckStorageHealthBeforeMigration bool flagStagedContractsFile string flagContinueMigrationOnValidationError bool + flagCheckStorageHealthBeforeMigration bool + flagCheckStorageHealthAfterMigration bool flagInputPayloadFileName string flagOutputPayloadFileName string flagOutputPayloadByAddresses string @@ -97,15 +98,18 @@ func init() { Cmd.Flags().BoolVar(&flagVerboseErrorOutput, "verbose-error-output", true, "log verbose output on migration errors") - Cmd.Flags().BoolVar(&flagCheckStorageHealthBeforeMigration, "check-storage-health-before", false, - "check (atree) storage health before migration") - Cmd.Flags().StringVar(&flagStagedContractsFile, "staged-contracts", "", "Staged contracts CSV file") Cmd.Flags().BoolVar(&flagAllowPartialStateFromPayloads, "allow-partial-state-from-payload-file", false, "allow input payload file containing partial state (e.g. not all accounts)") + Cmd.Flags().BoolVar(&flagCheckStorageHealthBeforeMigration, "check-storage-health-before", false, + "check (atree) storage health before migration") + + Cmd.Flags().BoolVar(&flagCheckStorageHealthAfterMigration, "check-storage-health-after", false, + "check (atree) storage health after migration") + Cmd.Flags().BoolVar(&flagContinueMigrationOnValidationError, "continue-migration-on-validation-errors", false, "continue migration even if validation fails") @@ -299,6 +303,10 @@ func run(*cobra.Command, []string) { log.Warn().Msgf("--check-storage-health-before flag is enabled and will increase duration of migration") } + if flagCheckStorageHealthAfterMigration { + log.Warn().Msgf("--check-storage-health-after flag is enabled and will increase duration of migration") + } + var inputMsg string if len(flagInputPayloadFileName) > 0 { // Input is payloads diff --git a/cmd/util/ledger/migrations/atree_register_migration.go b/cmd/util/ledger/migrations/atree_register_migration.go index 8bedcc26bbc..d9f6be61c59 100644 --- a/cmd/util/ledger/migrations/atree_register_migration.go +++ b/cmd/util/ledger/migrations/atree_register_migration.go @@ -12,11 +12,9 @@ import ( "github.com/onflow/cadence/runtime" "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" - "github.com/onflow/cadence/runtime/stdlib" "github.com/rs/zerolog" "github.com/onflow/flow-go/cmd/util/ledger/reporters" - "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" @@ -32,13 +30,14 @@ type AtreeRegisterMigrator struct { sampler zerolog.Sampler rw reporters.ReportWriter - rwf reporters.ReportWriterFactory nWorkers int validateMigratedValues bool logVerboseValidationError bool continueMigrationOnValidationError bool + checkStorageHealthBeforeMigration bool + checkStorageHealthAfterMigration bool } var _ AccountBasedMigration = (*AtreeRegisterMigrator)(nil) @@ -50,17 +49,20 @@ func NewAtreeRegisterMigrator( validateMigratedValues bool, logVerboseValidationError bool, continueMigrationOnValidationError bool, + checkStorageHealthBeforeMigration bool, + checkStorageHealthAfterMigration bool, ) *AtreeRegisterMigrator { sampler := util2.NewTimedSampler(30 * time.Second) migrator := &AtreeRegisterMigrator{ sampler: sampler, - rwf: rwf, rw: rwf.ReportWriter("atree-register-migrator"), validateMigratedValues: validateMigratedValues, logVerboseValidationError: logVerboseValidationError, continueMigrationOnValidationError: continueMigrationOnValidationError, + checkStorageHealthBeforeMigration: checkStorageHealthBeforeMigration, + checkStorageHealthAfterMigration: checkStorageHealthAfterMigration, } return migrator @@ -90,11 +92,22 @@ func (m *AtreeRegisterMigrator) MigrateAccount( oldPayloads []*ledger.Payload, ) ([]*ledger.Payload, error) { // create all the runtime components we need for the migration - mr, err := NewMigratorRuntime(address, oldPayloads, util.RuntimeInterfaceConfig{}) + mr, err := NewAtreeRegisterMigratorRuntime(address, oldPayloads) if err != nil { return nil, fmt.Errorf("failed to create migrator runtime: %w", err) } + // Check storage health before migration, if enabled. + if m.checkStorageHealthBeforeMigration { + err = checkStorageHealth(address, mr.Storage, oldPayloads) + if err != nil { + m.log.Warn(). + Err(err). + Str("account", address.Hex()). + Msg("storage health check before migration failed") + } + } + // keep track of all storage maps that were accessed // if they are empty they won't be changed, but we still need to copy them over storageMapIds := make(map[string]struct{}) @@ -145,11 +158,27 @@ func (m *AtreeRegisterMigrator) MigrateAccount( }) } + // Check storage health after migration, if enabled. + if m.checkStorageHealthAfterMigration { + mr, err := NewAtreeRegisterMigratorRuntime(address, newPayloads) + if err != nil { + return nil, fmt.Errorf("failed to create migrator runtime: %w", err) + } + + err = checkStorageHealth(address, mr.Storage, newPayloads) + if err != nil { + m.log.Warn(). + Err(err). + Str("account", address.Hex()). + Msg("storage health check after migration failed") + } + } + return newPayloads, nil } func (m *AtreeRegisterMigrator) migrateAccountStorage( - mr *migratorRuntime, + mr *AtreeRegisterMigratorRuntime, storageMapIds map[string]struct{}, ) (map[flow.RegisterID]flow.RegisterValue, error) { @@ -176,7 +205,7 @@ func (m *AtreeRegisterMigrator) migrateAccountStorage( } func (m *AtreeRegisterMigrator) convertStorageDomain( - mr *migratorRuntime, + mr *AtreeRegisterMigratorRuntime, storageMapIds map[string]struct{}, domain string, ) error { @@ -254,7 +283,7 @@ func (m *AtreeRegisterMigrator) convertStorageDomain( } func (m *AtreeRegisterMigrator) validateChangesAndCreateNewRegisters( - mr *migratorRuntime, + mr *AtreeRegisterMigratorRuntime, changes map[flow.RegisterID]flow.RegisterValue, storageMapIds map[string]struct{}, ) ([]*ledger.Payload, error) { @@ -389,7 +418,7 @@ func (m *AtreeRegisterMigrator) validateChangesAndCreateNewRegisters( } func (m *AtreeRegisterMigrator) cloneValue( - mr *migratorRuntime, + mr *AtreeRegisterMigratorRuntime, value interpreter.Value, ) (interpreter.Value, error) { @@ -427,25 +456,6 @@ func capturePanic(f func()) (err error) { return } -// convert all domains -var domains = []string{ - common.PathDomainStorage.Identifier(), - common.PathDomainPrivate.Identifier(), - common.PathDomainPublic.Identifier(), - runtime.StorageDomainContract, - stdlib.InboxStorageDomain, - stdlib.CapabilityControllerStorageDomain, -} - -var domainsLookupMap = map[string]struct{}{ - common.PathDomainStorage.Identifier(): {}, - common.PathDomainPrivate.Identifier(): {}, - common.PathDomainPublic.Identifier(): {}, - runtime.StorageDomainContract: {}, - stdlib.InboxStorageDomain: {}, - stdlib.CapabilityControllerStorageDomain: {}, -} - // migrationProblem is a struct for reporting errors type migrationProblem struct { Address string diff --git a/cmd/util/ledger/migrations/atree_register_migration_test.go b/cmd/util/ledger/migrations/atree_register_migration_test.go index 3eed8856fc1..0fb9466b0da 100644 --- a/cmd/util/ledger/migrations/atree_register_migration_test.go +++ b/cmd/util/ledger/migrations/atree_register_migration_test.go @@ -35,6 +35,8 @@ func TestAtreeRegisterMigration(t *testing.T) { true, false, false, + false, + false, ), }, ), diff --git a/cmd/util/ledger/migrations/atree_register_migrator_runtime.go b/cmd/util/ledger/migrations/atree_register_migrator_runtime.go new file mode 100644 index 00000000000..77f52d9198f --- /dev/null +++ b/cmd/util/ledger/migrations/atree_register_migrator_runtime.go @@ -0,0 +1,64 @@ +package migrations + +import ( + "fmt" + + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/fvm/storage/state" + "github.com/onflow/flow-go/ledger" +) + +// NewAtreeRegisterMigratorRuntime returns a new runtime to be used with the AtreeRegisterMigrator. +func NewAtreeRegisterMigratorRuntime( + address common.Address, + payloads []*ledger.Payload, +) ( + *AtreeRegisterMigratorRuntime, + error, +) { + snapshot, err := util.NewPayloadSnapshot(payloads) + if err != nil { + return nil, fmt.Errorf("failed to create payload snapshot: %w", err) + } + transactionState := state.NewTransactionState(snapshot, state.DefaultParameters()) + accounts := environment.NewAccounts(transactionState) + + accountsAtreeLedger := util.NewAccountsAtreeLedger(accounts) + storage := runtime.NewStorage(accountsAtreeLedger, nil) + + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + if err != nil { + return nil, err + } + + return &AtreeRegisterMigratorRuntime{ + Address: address, + Payloads: payloads, + Snapshot: snapshot, + TransactionState: transactionState, + Interpreter: inter, + Storage: storage, + AccountsAtreeLedger: accountsAtreeLedger, + }, nil +} + +type AtreeRegisterMigratorRuntime struct { + Snapshot *util.PayloadSnapshot + TransactionState state.NestedTransactionPreparer + Interpreter *interpreter.Interpreter + Storage *runtime.Storage + Payloads []*ledger.Payload + Address common.Address + AccountsAtreeLedger *util.AccountsAtreeLedger +} diff --git a/cmd/util/ledger/migrations/cadence_value_validation.go b/cmd/util/ledger/migrations/cadence_value_validation.go index 6d5ef42741f..61aeedc1fb6 100644 --- a/cmd/util/ledger/migrations/cadence_value_validation.go +++ b/cmd/util/ledger/migrations/cadence_value_validation.go @@ -3,15 +3,11 @@ package migrations import ( "fmt" "strings" - "time" - "github.com/onflow/atree" - "github.com/onflow/cadence" "github.com/onflow/cadence/runtime" "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" "github.com/rs/zerolog" - "go.opentelemetry.io/otel/attribute" "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" @@ -99,7 +95,12 @@ func validateStorageDomain( newValue := newStorageMap.ReadValue(nil, mapKey) - err := cadenceValueEqual(oldRuntime.Interpreter, oldValue, newRuntime.Interpreter, newValue) + err := cadenceValueEqual( + oldRuntime.Interpreter, + oldValue, + newRuntime.Interpreter, + newValue, + ) if err != nil { if verboseLogging { log.Info(). @@ -112,7 +113,13 @@ func validateStorageDomain( Msgf("failed to validate value") } - return fmt.Errorf("failed to validate value for address %s, domain %s, key %s: %s", address.Hex(), domain, key, err.Error()) + return fmt.Errorf( + "failed to validate value for address %s, domain %s, key %s: %s", + address.Hex(), + domain, + key, + err.Error(), + ) } } @@ -397,19 +404,13 @@ func newReadonlyStorageRuntime(payloads []*ledger.Payload) ( storage := runtime.NewStorage(readonlyLedger, nil) - env := runtime.NewBaseInterpreterEnvironment(runtime.Config{ - // Attachments are enabled everywhere except for Mainnet - AttachmentsEnabled: true, - }) - - env.Configure( - &NoopRuntimeInterface{}, - runtime.NewCodesAndPrograms(), - storage, + inter, err := interpreter.NewInterpreter( + nil, nil, + &interpreter.Config{ + Storage: storage, + }, ) - - inter, err := interpreter.NewInterpreter(nil, nil, env.InterpreterConfig) if err != nil { return nil, err } @@ -419,203 +420,3 @@ func newReadonlyStorageRuntime(payloads []*ledger.Payload) ( Storage: storage, }, nil } - -// NoopRuntimeInterface is a runtime interface that can be used in migrations. -type NoopRuntimeInterface struct { -} - -func (NoopRuntimeInterface) ResolveLocation(_ []runtime.Identifier, _ runtime.Location) ([]runtime.ResolvedLocation, error) { - panic("unexpected ResolveLocation call") -} - -func (NoopRuntimeInterface) GetCode(_ runtime.Location) ([]byte, error) { - panic("unexpected GetCode call") -} - -func (NoopRuntimeInterface) GetAccountContractCode(_ common.AddressLocation) ([]byte, error) { - panic("unexpected GetAccountContractCode call") -} - -func (NoopRuntimeInterface) GetOrLoadProgram(_ runtime.Location, _ func() (*interpreter.Program, error)) (*interpreter.Program, error) { - panic("unexpected GetOrLoadProgram call") -} - -func (NoopRuntimeInterface) MeterMemory(_ common.MemoryUsage) error { - return nil -} - -func (NoopRuntimeInterface) MeterComputation(_ common.ComputationKind, _ uint) error { - return nil -} - -func (NoopRuntimeInterface) GetValue(_, _ []byte) (value []byte, err error) { - panic("unexpected GetValue call") -} - -func (NoopRuntimeInterface) SetValue(_, _, _ []byte) (err error) { - panic("unexpected SetValue call") -} - -func (NoopRuntimeInterface) CreateAccount(_ runtime.Address) (address runtime.Address, err error) { - panic("unexpected CreateAccount call") -} - -func (NoopRuntimeInterface) AddEncodedAccountKey(_ runtime.Address, _ []byte) error { - panic("unexpected AddEncodedAccountKey call") -} - -func (NoopRuntimeInterface) RevokeEncodedAccountKey(_ runtime.Address, _ int) (publicKey []byte, err error) { - panic("unexpected RevokeEncodedAccountKey call") -} - -func (NoopRuntimeInterface) AddAccountKey(_ runtime.Address, _ *runtime.PublicKey, _ runtime.HashAlgorithm, _ int) (*runtime.AccountKey, error) { - panic("unexpected AddAccountKey call") -} - -func (NoopRuntimeInterface) GetAccountKey(_ runtime.Address, _ int) (*runtime.AccountKey, error) { - panic("unexpected GetAccountKey call") -} - -func (NoopRuntimeInterface) RevokeAccountKey(_ runtime.Address, _ int) (*runtime.AccountKey, error) { - panic("unexpected RevokeAccountKey call") -} - -func (NoopRuntimeInterface) UpdateAccountContractCode(_ common.AddressLocation, _ []byte) (err error) { - panic("unexpected UpdateAccountContractCode call") -} - -func (NoopRuntimeInterface) RemoveAccountContractCode(common.AddressLocation) (err error) { - panic("unexpected RemoveAccountContractCode call") -} - -func (NoopRuntimeInterface) GetSigningAccounts() ([]runtime.Address, error) { - panic("unexpected GetSigningAccounts call") -} - -func (NoopRuntimeInterface) ProgramLog(_ string) error { - panic("unexpected ProgramLog call") -} - -func (NoopRuntimeInterface) EmitEvent(_ cadence.Event) error { - panic("unexpected EmitEvent call") -} - -func (NoopRuntimeInterface) ValueExists(_, _ []byte) (exists bool, err error) { - panic("unexpected ValueExists call") -} - -func (NoopRuntimeInterface) GenerateUUID() (uint64, error) { - panic("unexpected GenerateUUID call") -} - -func (NoopRuntimeInterface) GetComputationLimit() uint64 { - panic("unexpected GetComputationLimit call") -} - -func (NoopRuntimeInterface) SetComputationUsed(_ uint64) error { - panic("unexpected SetComputationUsed call") -} - -func (NoopRuntimeInterface) DecodeArgument(_ []byte, _ cadence.Type) (cadence.Value, error) { - panic("unexpected DecodeArgument call") -} - -func (NoopRuntimeInterface) GetCurrentBlockHeight() (uint64, error) { - panic("unexpected GetCurrentBlockHeight call") -} - -func (NoopRuntimeInterface) GetBlockAtHeight(_ uint64) (block runtime.Block, exists bool, err error) { - panic("unexpected GetBlockAtHeight call") -} - -func (NoopRuntimeInterface) ReadRandom([]byte) error { - panic("unexpected ReadRandom call") -} - -func (NoopRuntimeInterface) VerifySignature(_ []byte, _ string, _ []byte, _ []byte, _ runtime.SignatureAlgorithm, _ runtime.HashAlgorithm) (bool, error) { - panic("unexpected VerifySignature call") -} - -func (NoopRuntimeInterface) Hash(_ []byte, _ string, _ runtime.HashAlgorithm) ([]byte, error) { - panic("unexpected Hash call") -} - -func (NoopRuntimeInterface) GetAccountBalance(_ common.Address) (value uint64, err error) { - panic("unexpected GetAccountBalance call") -} - -func (NoopRuntimeInterface) GetAccountAvailableBalance(_ common.Address) (value uint64, err error) { - panic("unexpected GetAccountAvailableBalance call") -} - -func (NoopRuntimeInterface) GetStorageUsed(_ runtime.Address) (value uint64, err error) { - panic("unexpected GetStorageUsed call") -} - -func (NoopRuntimeInterface) GetStorageCapacity(_ runtime.Address) (value uint64, err error) { - panic("unexpected GetStorageCapacity call") -} - -func (NoopRuntimeInterface) ImplementationDebugLog(_ string) error { - panic("unexpected ImplementationDebugLog call") -} - -func (NoopRuntimeInterface) ValidatePublicKey(_ *runtime.PublicKey) error { - panic("unexpected ValidatePublicKey call") -} - -func (NoopRuntimeInterface) GetAccountContractNames(_ runtime.Address) ([]string, error) { - panic("unexpected GetAccountContractNames call") -} - -func (NoopRuntimeInterface) AllocateStorageIndex(_ []byte) (atree.StorageIndex, error) { - panic("unexpected AllocateStorageIndex call") -} - -func (NoopRuntimeInterface) ComputationUsed() (uint64, error) { - panic("unexpected ComputationUsed call") -} - -func (NoopRuntimeInterface) MemoryUsed() (uint64, error) { - panic("unexpected MemoryUsed call") -} - -func (NoopRuntimeInterface) InteractionUsed() (uint64, error) { - panic("unexpected InteractionUsed call") -} - -func (NoopRuntimeInterface) SetInterpreterSharedState(_ *interpreter.SharedState) { - panic("unexpected SetInterpreterSharedState call") -} - -func (NoopRuntimeInterface) GetInterpreterSharedState() *interpreter.SharedState { - panic("unexpected GetInterpreterSharedState call") -} - -func (NoopRuntimeInterface) AccountKeysCount(_ runtime.Address) (uint64, error) { - panic("unexpected AccountKeysCount call") -} - -func (NoopRuntimeInterface) BLSVerifyPOP(_ *runtime.PublicKey, _ []byte) (bool, error) { - panic("unexpected BLSVerifyPOP call") -} - -func (NoopRuntimeInterface) BLSAggregateSignatures(_ [][]byte) ([]byte, error) { - panic("unexpected BLSAggregateSignatures call") -} - -func (NoopRuntimeInterface) BLSAggregatePublicKeys(_ []*runtime.PublicKey) (*runtime.PublicKey, error) { - panic("unexpected BLSAggregatePublicKeys call") -} - -func (NoopRuntimeInterface) ResourceOwnerChanged(_ *interpreter.Interpreter, _ *interpreter.CompositeValue, _ common.Address, _ common.Address) { - panic("unexpected ResourceOwnerChanged call") -} - -func (NoopRuntimeInterface) GenerateAccountID(_ common.Address) (uint64, error) { - panic("unexpected GenerateAccountID call") -} - -func (NoopRuntimeInterface) RecordTrace(_ string, _ runtime.Location, _ time.Duration, _ []attribute.KeyValue) { - panic("unexpected RecordTrace call") -} diff --git a/cmd/util/ledger/migrations/cadence_value_validation_test.go b/cmd/util/ledger/migrations/cadence_value_validation_test.go index b784755a10c..c0780c03909 100644 --- a/cmd/util/ledger/migrations/cadence_value_validation_test.go +++ b/cmd/util/ledger/migrations/cadence_value_validation_test.go @@ -9,7 +9,6 @@ import ( "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" - "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" @@ -53,11 +52,7 @@ func TestValidateCadenceValues(t *testing.T) { accountStatus.ToBytes(), ) - mr, err := NewMigratorRuntime( - address, - []*ledger.Payload{accountStatusPayload}, - util.RuntimeInterfaceConfig{}, - ) + mr, err := NewAtreeRegisterMigratorRuntime(address, []*ledger.Payload{accountStatusPayload}) require.NoError(t, err) // Create new storage map @@ -145,11 +140,7 @@ func createTestPayloads(t *testing.T, address common.Address, domain string) []* accountStatus.ToBytes(), ) - mr, err := NewMigratorRuntime( - address, - []*ledger.Payload{accountStatusPayload}, - util.RuntimeInterfaceConfig{}, - ) + mr, err := NewAtreeRegisterMigratorRuntime(address, []*ledger.Payload{accountStatusPayload}) require.NoError(t, err) // Create new storage map diff --git a/cmd/util/ledger/migrations/utils.go b/cmd/util/ledger/migrations/utils.go index e747b3dc508..ea790bf3ff1 100644 --- a/cmd/util/ledger/migrations/utils.go +++ b/cmd/util/ledger/migrations/utils.go @@ -4,8 +4,13 @@ import ( "fmt" "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/stdlib" "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/model/flow" ) @@ -59,3 +64,57 @@ func (a *AccountsAtreeLedger) AllocateStorageIndex(owner []byte) (atree.StorageI } return v, nil } + +func checkStorageHealth( + address common.Address, + storage *runtime.Storage, + payloads []*ledger.Payload, +) error { + + for _, payload := range payloads { + registerID, _, err := convert.PayloadToRegister(payload) + if err != nil { + return fmt.Errorf("failed to convert payload to register: %w", err) + } + + if !registerID.IsSlabIndex() { + continue + } + + // Convert the register ID to a storage ID. + slabID := atree.NewStorageID( + atree.Address([]byte(registerID.Owner)), + atree.StorageIndex([]byte(registerID.Key[1:]))) + + // Retrieve the slab. + _, _, err = storage.Retrieve(slabID) + if err != nil { + return fmt.Errorf("failed to retrieve slab %s: %w", slabID, err) + } + } + + for _, domain := range domains { + _ = storage.GetStorageMap(address, domain, false) + } + + return storage.CheckHealth() +} + +// convert all domains +var domains = []string{ + common.PathDomainStorage.Identifier(), + common.PathDomainPrivate.Identifier(), + common.PathDomainPublic.Identifier(), + runtime.StorageDomainContract, + stdlib.InboxStorageDomain, + stdlib.CapabilityControllerStorageDomain, +} + +var domainsLookupMap = map[string]struct{}{ + common.PathDomainStorage.Identifier(): {}, + common.PathDomainPrivate.Identifier(): {}, + common.PathDomainPublic.Identifier(): {}, + runtime.StorageDomainContract: {}, + stdlib.InboxStorageDomain: {}, + stdlib.CapabilityControllerStorageDomain: {}, +} diff --git a/engine/access/apiproxy/access_api_proxy.go b/engine/access/apiproxy/access_api_proxy.go index d57f1681700..689d91de6cd 100644 --- a/engine/access/apiproxy/access_api_proxy.go +++ b/engine/access/apiproxy/access_api_proxy.go @@ -197,12 +197,6 @@ func (h *FlowAccessAPIRouter) GetSystemTransaction(context context.Context, req } func (h *FlowAccessAPIRouter) GetSystemTransactionResult(context context.Context, req *access.GetSystemTransactionResultRequest) (*access.TransactionResultResponse, error) { - if h.useIndex { - res, err := h.local.GetSystemTransactionResult(context, req) - h.log(LocalApiService, "GetSystemTransactionResult", err) - return res, err - } - res, err := h.upstream.GetSystemTransactionResult(context, req) h.log(UpstreamApiService, "GetSystemTransactionResult", err) return res, err @@ -347,48 +341,57 @@ func (h *FlowAccessAPIRouter) GetExecutionResultByID(context context.Context, re } func (h *FlowAccessAPIRouter) SubscribeBlocksFromStartBlockID(req *access.SubscribeBlocksFromStartBlockIDRequest, server access.AccessAPI_SubscribeBlocksFromStartBlockIDServer) error { - // SubscribeBlocksFromStartBlockID is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlocksFromStartBlockID not implemented") + err := h.local.SubscribeBlocksFromStartBlockID(req, server) + h.log(LocalApiService, "SubscribeBlocksFromStartBlockID", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlocksFromStartHeight(req *access.SubscribeBlocksFromStartHeightRequest, server access.AccessAPI_SubscribeBlocksFromStartHeightServer) error { - // SubscribeBlocksFromStartHeight is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlocksFromStartHeight not implemented") + err := h.local.SubscribeBlocksFromStartHeight(req, server) + h.log(LocalApiService, "SubscribeBlocksFromStartHeight", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlocksFromLatest(req *access.SubscribeBlocksFromLatestRequest, server access.AccessAPI_SubscribeBlocksFromLatestServer) error { - // SubscribeBlocksFromLatest is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlocksFromLatest not implemented") + err := h.local.SubscribeBlocksFromLatest(req, server) + h.log(LocalApiService, "SubscribeBlocksFromLatest", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlockHeadersFromStartBlockID(req *access.SubscribeBlockHeadersFromStartBlockIDRequest, server access.AccessAPI_SubscribeBlockHeadersFromStartBlockIDServer) error { - // SubscribeBlockHeadersFromStartBlockID is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlockHeadersFromStartBlockID not implemented") + err := h.local.SubscribeBlockHeadersFromStartBlockID(req, server) + h.log(LocalApiService, "SubscribeBlockHeadersFromStartBlockID", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlockHeadersFromStartHeight(req *access.SubscribeBlockHeadersFromStartHeightRequest, server access.AccessAPI_SubscribeBlockHeadersFromStartHeightServer) error { - // SubscribeBlockHeadersFromStartHeight is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlockHeadersFromStartHeight not implemented") + err := h.local.SubscribeBlockHeadersFromStartHeight(req, server) + h.log(LocalApiService, "SubscribeBlockHeadersFromStartHeight", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlockHeadersFromLatest(req *access.SubscribeBlockHeadersFromLatestRequest, server access.AccessAPI_SubscribeBlockHeadersFromLatestServer) error { - // SubscribeBlockHeadersFromLatest is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlockHeadersFromLatest not implemented") + err := h.local.SubscribeBlockHeadersFromLatest(req, server) + h.log(LocalApiService, "SubscribeBlockHeadersFromLatest", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlockDigestsFromStartBlockID(req *access.SubscribeBlockDigestsFromStartBlockIDRequest, server access.AccessAPI_SubscribeBlockDigestsFromStartBlockIDServer) error { - // SubscribeBlockDigestsFromStartBlockID is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlockDigestsFromStartBlockID not implemented") + err := h.local.SubscribeBlockDigestsFromStartBlockID(req, server) + h.log(LocalApiService, "SubscribeBlockDigestsFromStartBlockID", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlockDigestsFromStartHeight(req *access.SubscribeBlockDigestsFromStartHeightRequest, server access.AccessAPI_SubscribeBlockDigestsFromStartHeightServer) error { - // SubscribeBlockDigestsFromStartHeight is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlockDigestsFromStartHeight not implemented") + err := h.local.SubscribeBlockDigestsFromStartHeight(req, server) + h.log(LocalApiService, "SubscribeBlockDigestsFromStartHeight", err) + return err } func (h *FlowAccessAPIRouter) SubscribeBlockDigestsFromLatest(req *access.SubscribeBlockDigestsFromLatestRequest, server access.AccessAPI_SubscribeBlockDigestsFromLatestServer) error { - // SubscribeBlockDigestsFromLatest is not implemented for observer yet - return status.Errorf(codes.Unimplemented, "method SubscribeBlockDigestsFromLatest not implemented") + err := h.local.SubscribeBlockDigestsFromLatest(req, server) + h.log(LocalApiService, "SubscribeBlockDigestsFromLatest", err) + return err } func (h *FlowAccessAPIRouter) SendAndSubscribeTransactionStatuses(req *access.SendAndSubscribeTransactionStatusesRequest, server access.AccessAPI_SendAndSubscribeTransactionStatusesServer) error { diff --git a/integration/benchmark/cmd/ci/main.go b/integration/benchmark/cmd/ci/main.go index b5f68cc0ec7..c3917f5b161 100644 --- a/integration/benchmark/cmd/ci/main.go +++ b/integration/benchmark/cmd/ci/main.go @@ -3,25 +3,22 @@ package main import ( "context" "flag" - "net" "os" "strings" "time" - "github.com/onflow/flow-go/integration/benchmark/load" - "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "gopkg.in/yaml.v3" flowsdk "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" client "github.com/onflow/flow-go-sdk/access/grpc" "github.com/onflow/flow-go/integration/benchmark" - pb "github.com/onflow/flow-go/integration/benchmark/proto" + "github.com/onflow/flow-go/integration/benchmark/load" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/utils/unittest" @@ -33,7 +30,7 @@ type BenchmarkInfo struct { // Hardcoded CI values const ( - defaultLoadType = "token-transfer" + defaultLoadType = load.TokenTransferLoadType metricport = uint(8080) accessNodeAddress = "127.0.0.1:4001" pushgateway = "127.0.0.1:9091" @@ -45,35 +42,43 @@ const ( defaultMetricCollectionInterval = 20 * time.Second // gRPC constants - defaultMaxMsgSize = 1024 * 1024 * 16 // 16 MB - defaultGRPCAddress = "127.0.0.1:4777" + defaultMaxMsgSize = 1024 * 1024 * 16 // 16 MB ) func main() { logLvl := flag.String("log-level", "info", "set log level") // CI relevant flags - grpcAddressFlag := flag.String("grpc-address", defaultGRPCAddress, "listen address for gRPC server") initialTPSFlag := flag.Int("tps-initial", 10, "starting transactions per second") maxTPSFlag := flag.Int("tps-max", *initialTPSFlag, "maximum transactions per second allowed") minTPSFlag := flag.Int("tps-min", *initialTPSFlag, "minimum transactions per second allowed") + loadTypeFlag := flag.String("load-type", string(defaultLoadType), "load type (token-transfer / const-exec / evm) from the load config file") + loadConfigFileLocationFlag := flag.String("load-config", "", "load config file location. If not provided, default config will be used.") + adjustIntervalFlag := flag.Duration("tps-adjust-interval", defaultAdjustInterval, "interval for adjusting TPS") adjustDelayFlag := flag.Duration("tps-adjust-delay", 120*time.Second, "delay before adjusting TPS") - statIntervalFlag := flag.Duration("stat-interval", defaultMetricCollectionInterval, "") durationFlag := flag.Duration("duration", 10*time.Minute, "test duration") + + statIntervalFlag := flag.Duration("stat-interval", defaultMetricCollectionInterval, "") gitRepoPathFlag := flag.String("git-repo-path", "../..", "git repo path of the filesystem") gitRepoURLFlag := flag.String("git-repo-url", "https://github.com/onflow/flow-go.git", "git repo URL") bigQueryUpload := flag.Bool("bigquery-upload", true, "whether to upload results to BigQuery (true / false)") bigQueryProjectFlag := flag.String("bigquery-project", "dapperlabs-data", "project name for the bigquery uploader") bigQueryDatasetFlag := flag.String("bigquery-dataset", "dev_src_flow_tps_metrics", "dataset name for the bigquery uploader") bigQueryRawTableFlag := flag.String("bigquery-raw-table", "rawResults", "table name for the bigquery raw results") - loadTypeFlag := flag.String("load-type", defaultLoadType, "load type (token-transfer / const-exec / evm)") flag.Parse() - loadType := *loadTypeFlag - log := setupLogger(logLvl) + loadConfig := getLoadConfig( + log, + *loadConfigFileLocationFlag, + *loadTypeFlag, + *minTPSFlag, + *maxTPSFlag, + *initialTPSFlag, + ) + if *gitRepoPathFlag == "" { flag.PrintDefaults() log.Fatal().Msg("git repo path is required") @@ -86,26 +91,6 @@ func main() { <-server.Ready() loaderMetrics := metrics.NewLoaderCollector() - grpcServerOptions := []grpc.ServerOption{ - grpc.MaxRecvMsgSize(defaultMaxMsgSize), - grpc.MaxSendMsgSize(defaultMaxMsgSize), - } - grpcServer := grpc.NewServer(grpcServerOptions...) - defer grpcServer.Stop() - - pb.RegisterBenchmarkServer(grpcServer, &benchmarkServer{}) - - grpcListener, err := net.Listen("tcp", *grpcAddressFlag) - if err != nil { - log.Fatal().Err(err).Str("address", *grpcAddressFlag).Msg("failed to listen") - } - - go func() { - if err := grpcServer.Serve(grpcListener); err != nil { - log.Fatal().Err(err).Msg("failed to serve") - } - }() - sp := benchmark.NewStatsPusher(ctx, log, pushgateway, "loader", prometheus.DefaultGatherer) defer sp.Stop() @@ -136,10 +121,7 @@ func main() { // prepare load generator log.Info(). - Str("load_type", loadType). - Int("initialTPS", *initialTPSFlag). - Int("minTPS", *minTPSFlag). - Int("maxTPS", *maxTPSFlag). + Interface("loadConfig", loadConfig). Dur("duration", *durationFlag). Msg("Running load case") @@ -164,7 +146,7 @@ func main() { }, benchmark.LoadParams{ NumberOfAccounts: maxInflight, - LoadType: load.LoadType(loadType), + LoadConfig: loadConfig, FeedbackEnabled: feedbackEnabled, }, ) @@ -187,9 +169,9 @@ func main() { AdjusterParams{ Delay: *adjustDelayFlag, Interval: *adjustIntervalFlag, - InitialTPS: uint(*initialTPSFlag), - MinTPS: uint(*minTPSFlag), - MaxTPS: uint(*maxTPSFlag), + InitialTPS: uint(loadConfig.TPSInitial), + MinTPS: uint(loadConfig.TpsMin), + MaxTPS: uint(loadConfig.TpsMax), MaxInflight: uint(maxInflight / 2), }, ) @@ -218,7 +200,7 @@ func main() { // only upload valid data if *bigQueryUpload { repoInfo := MustGetRepoInfo(log, *gitRepoURLFlag, *gitRepoPathFlag) - mustUploadData(ctx, log, recorder, repoInfo, *bigQueryProjectFlag, *bigQueryDatasetFlag, *bigQueryRawTableFlag, loadType) + mustUploadData(ctx, log, recorder, repoInfo, *bigQueryProjectFlag, *bigQueryDatasetFlag, *bigQueryRawTableFlag, loadConfig.LoadName) } else { log.Info().Int("raw_tps_size", len(recorder.BenchmarkResults.RawTPS)).Msg("logging tps results locally") // log results locally when not uploading to BigQuery @@ -228,6 +210,55 @@ func main() { } } +func getLoadConfig( + log zerolog.Logger, + loadConfigLocation string, + load string, + minTPS int, + maxTPS int, + initialTPS int, +) benchmark.LoadConfig { + if loadConfigLocation == "" { + lc := benchmark.LoadConfig{ + LoadName: load, + LoadType: load, + TpsMax: maxTPS, + TpsMin: minTPS, + TPSInitial: initialTPS, + } + + log.Info(). + Interface("loadConfig", lc). + Msg("Load config file not provided, using parameters supplied in TPS flags") + return lc + } + + var loadConfigs map[string]benchmark.LoadConfig + + // check if the file exists + if _, err := os.Stat(loadConfigLocation); os.IsNotExist(err) { + log.Fatal().Err(err).Str("loadConfigLocation", loadConfigLocation).Msg("load config file not found") + } + + yamlFile, err := os.ReadFile(loadConfigLocation) + if err != nil { + log.Fatal().Err(err).Str("loadConfigLocation", loadConfigLocation).Msg("failed to read load config file") + } + + err = yaml.Unmarshal(yamlFile, &loadConfigs) + if err != nil { + log.Fatal().Err(err).Str("loadConfigLocation", loadConfigLocation).Msg("failed to unmarshal load config file") + } + + lc, ok := loadConfigs[load] + if !ok { + log.Fatal().Str("load", load).Msg("load not found in load config file") + } + lc.LoadName = load + + return lc +} + // setupLogger parses log level and apply to logger func setupLogger(logLvl *string) zerolog.Logger { log := zerolog.New(os.Stderr). @@ -252,7 +283,7 @@ func mustUploadData( bigQueryProject string, bigQueryDataset string, bigQueryRawTable string, - loadType string, + loadName string, ) { log.Info().Msg("Initializing BigQuery") db, err := NewDB(ctx, log, bigQueryProject) @@ -278,7 +309,7 @@ func mustUploadData( bigQueryRawTable, recorder.BenchmarkResults, *repoInfo, - BenchmarkInfo{BenchmarkType: loadType}, + BenchmarkInfo{BenchmarkType: loadName}, MustGetDefaultEnvironment(), ) if err != nil { diff --git a/integration/benchmark/cmd/ci/server.go b/integration/benchmark/cmd/ci/server.go deleted file mode 100644 index ba72e856ed4..00000000000 --- a/integration/benchmark/cmd/ci/server.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "context" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/emptypb" - - pb "github.com/onflow/flow-go/integration/benchmark/proto" -) - -type benchmarkServer struct { - pb.UnimplementedBenchmarkServer -} - -func (s *benchmarkServer) StartMacroBenchmark(*pb.StartMacroBenchmarkRequest, pb.Benchmark_StartMacroBenchmarkServer) error { - return status.Errorf(codes.Unimplemented, "method StartMacroBenchmark not implemented") -} -func (s *benchmarkServer) GetMacroBenchmark(context.Context, *pb.GetMacroBenchmarkRequest) (*pb.GetMacroBenchmarkResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetMacroBenchmark not implemented") -} -func (s *benchmarkServer) ListMacroBenchmarks(context.Context, *emptypb.Empty) (*pb.ListMacroBenchmarksResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ListMacroBenchmarks not implemented") -} -func (s *benchmarkServer) Status(context.Context, *emptypb.Empty) (*pb.StatusResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") -} diff --git a/integration/benchmark/cmd/manual/main.go b/integration/benchmark/cmd/manual/main.go index f42e21ef894..ffaa9615570 100644 --- a/integration/benchmark/cmd/manual/main.go +++ b/integration/benchmark/cmd/manual/main.go @@ -9,8 +9,6 @@ import ( "strings" "time" - "github.com/onflow/flow-go/integration/benchmark/load" - "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -132,8 +130,14 @@ func main() { }, benchmark.LoadParams{ NumberOfAccounts: int(maxTPS) * *accountMultiplierFlag, - LoadType: load.LoadType(*loadTypeFlag), - FeedbackEnabled: *feedbackEnabled, + LoadConfig: benchmark.LoadConfig{ + LoadName: *loadTypeFlag, + LoadType: *loadTypeFlag, + TpsMax: int(maxTPS), + TpsMin: int(maxTPS), + TPSInitial: int(maxTPS), + }, + FeedbackEnabled: *feedbackEnabled, }, ) if err != nil { diff --git a/integration/benchmark/contLoadGenerator.go b/integration/benchmark/contLoadGenerator.go index df78ca74c0a..4b5c147b8ff 100644 --- a/integration/benchmark/contLoadGenerator.go +++ b/integration/benchmark/contLoadGenerator.go @@ -46,9 +46,20 @@ type NetworkParams struct { ChainId flow.ChainID } +type LoadConfig struct { + // LoadName is the name of the load. This can be different from the LoadType + // and is used to identify the load in the results. The use case is when a single + // load type is used to run multiple loads with different parameters. + LoadName string `yaml:"-"` + LoadType string `yaml:"load_type"` + TpsMax int `yaml:"tps_max"` + TpsMin int `yaml:"tps_min"` + TPSInitial int `yaml:"tps_initial"` +} + type LoadParams struct { NumberOfAccounts int - LoadType load.LoadType + LoadConfig LoadConfig // TODO(rbtz): inject a TxFollower FeedbackEnabled bool @@ -157,7 +168,7 @@ func New( Proposer: servAcc, } - l := load.CreateLoadType(log, loadParams.LoadType) + l := load.CreateLoadType(log, load.LoadType(loadParams.LoadConfig.LoadType)) err = l.Setup(log, lc) if err != nil { diff --git a/integration/benchmark/mocksiface/mocks.go b/integration/benchmark/mocksiface/mocks.go deleted file mode 100644 index 0068b5676c2..00000000000 --- a/integration/benchmark/mocksiface/mocks.go +++ /dev/null @@ -1,10 +0,0 @@ -package mocksiface_test - -import ( - "github.com/onflow/flow-go-sdk/access" -) - -// This is a proxy for the real access.Client for mockery to use. -type Client interface { - access.Client -} diff --git a/integration/benchmark/proto/generate.go b/integration/benchmark/proto/generate.go deleted file mode 100644 index b36797e4592..00000000000 --- a/integration/benchmark/proto/generate.go +++ /dev/null @@ -1,3 +0,0 @@ -//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative macro_benchmark.proto - -package proto diff --git a/integration/benchmark/proto/macro_benchmark.pb.go b/integration/benchmark/proto/macro_benchmark.pb.go deleted file mode 100644 index 15fdb7b4cf9..00000000000 --- a/integration/benchmark/proto/macro_benchmark.pb.go +++ /dev/null @@ -1,435 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.9 -// source: macro_benchmark.proto - -package proto - -import ( - reflect "reflect" - sync "sync" - - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type StartMacroBenchmarkRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *StartMacroBenchmarkRequest) Reset() { - *x = StartMacroBenchmarkRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_macro_benchmark_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StartMacroBenchmarkRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StartMacroBenchmarkRequest) ProtoMessage() {} - -func (x *StartMacroBenchmarkRequest) ProtoReflect() protoreflect.Message { - mi := &file_macro_benchmark_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StartMacroBenchmarkRequest.ProtoReflect.Descriptor instead. -func (*StartMacroBenchmarkRequest) Descriptor() ([]byte, []int) { - return file_macro_benchmark_proto_rawDescGZIP(), []int{0} -} - -type StartMacroBenchmarkResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *StartMacroBenchmarkResponse) Reset() { - *x = StartMacroBenchmarkResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_macro_benchmark_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StartMacroBenchmarkResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StartMacroBenchmarkResponse) ProtoMessage() {} - -func (x *StartMacroBenchmarkResponse) ProtoReflect() protoreflect.Message { - mi := &file_macro_benchmark_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StartMacroBenchmarkResponse.ProtoReflect.Descriptor instead. -func (*StartMacroBenchmarkResponse) Descriptor() ([]byte, []int) { - return file_macro_benchmark_proto_rawDescGZIP(), []int{1} -} - -type GetMacroBenchmarkRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *GetMacroBenchmarkRequest) Reset() { - *x = GetMacroBenchmarkRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_macro_benchmark_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetMacroBenchmarkRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetMacroBenchmarkRequest) ProtoMessage() {} - -func (x *GetMacroBenchmarkRequest) ProtoReflect() protoreflect.Message { - mi := &file_macro_benchmark_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetMacroBenchmarkRequest.ProtoReflect.Descriptor instead. -func (*GetMacroBenchmarkRequest) Descriptor() ([]byte, []int) { - return file_macro_benchmark_proto_rawDescGZIP(), []int{2} -} - -type GetMacroBenchmarkResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *GetMacroBenchmarkResponse) Reset() { - *x = GetMacroBenchmarkResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_macro_benchmark_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetMacroBenchmarkResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetMacroBenchmarkResponse) ProtoMessage() {} - -func (x *GetMacroBenchmarkResponse) ProtoReflect() protoreflect.Message { - mi := &file_macro_benchmark_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetMacroBenchmarkResponse.ProtoReflect.Descriptor instead. -func (*GetMacroBenchmarkResponse) Descriptor() ([]byte, []int) { - return file_macro_benchmark_proto_rawDescGZIP(), []int{3} -} - -type ListMacroBenchmarksResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *ListMacroBenchmarksResponse) Reset() { - *x = ListMacroBenchmarksResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_macro_benchmark_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ListMacroBenchmarksResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListMacroBenchmarksResponse) ProtoMessage() {} - -func (x *ListMacroBenchmarksResponse) ProtoReflect() protoreflect.Message { - mi := &file_macro_benchmark_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListMacroBenchmarksResponse.ProtoReflect.Descriptor instead. -func (*ListMacroBenchmarksResponse) Descriptor() ([]byte, []int) { - return file_macro_benchmark_proto_rawDescGZIP(), []int{4} -} - -type StatusResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *StatusResponse) Reset() { - *x = StatusResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_macro_benchmark_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StatusResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StatusResponse) ProtoMessage() {} - -func (x *StatusResponse) ProtoReflect() protoreflect.Message { - mi := &file_macro_benchmark_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead. -func (*StatusResponse) Descriptor() ([]byte, []int) { - return file_macro_benchmark_proto_rawDescGZIP(), []int{5} -} - -var File_macro_benchmark_proto protoreflect.FileDescriptor - -var file_macro_benchmark_proto_rawDesc = []byte{ - 0x0a, 0x15, 0x6d, 0x61, 0x63, 0x72, 0x6f, 0x5f, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, - 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, - 0x72, 0x6b, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x1c, 0x0a, 0x1a, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, - 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x1d, 0x0a, - 0x1b, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, - 0x6d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1a, 0x0a, 0x18, - 0x47, 0x65, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x1b, 0x0a, 0x19, 0x47, 0x65, 0x74, 0x4d, - 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1d, 0x0a, 0x1b, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x63, - 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x10, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xef, 0x02, 0x0a, 0x09, 0x42, 0x65, 0x6e, 0x63, 0x68, - 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x68, 0x0a, 0x13, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4d, 0x61, 0x63, - 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x25, 0x2e, 0x62, 0x65, - 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x4d, 0x61, 0x63, - 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x2e, 0x53, - 0x74, 0x61, 0x72, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, - 0x72, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x60, - 0x0a, 0x11, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, - 0x61, 0x72, 0x6b, 0x12, 0x23, 0x2e, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x2e, - 0x47, 0x65, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x62, 0x65, 0x6e, 0x63, 0x68, - 0x6d, 0x61, 0x72, 0x6b, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, - 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x57, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, - 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, - 0x26, 0x2e, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x2e, 0x4c, 0x69, 0x73, 0x74, - 0x4d, 0x61, 0x63, 0x72, 0x6f, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x06, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x19, 0x2e, 0x62, 0x65, - 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x6e, 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x66, 0x6c, - 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x2f, 0x62, 0x65, 0x63, 0x6e, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_macro_benchmark_proto_rawDescOnce sync.Once - file_macro_benchmark_proto_rawDescData = file_macro_benchmark_proto_rawDesc -) - -func file_macro_benchmark_proto_rawDescGZIP() []byte { - file_macro_benchmark_proto_rawDescOnce.Do(func() { - file_macro_benchmark_proto_rawDescData = protoimpl.X.CompressGZIP(file_macro_benchmark_proto_rawDescData) - }) - return file_macro_benchmark_proto_rawDescData -} - -var file_macro_benchmark_proto_msgTypes = make([]protoimpl.MessageInfo, 6) -var file_macro_benchmark_proto_goTypes = []interface{}{ - (*StartMacroBenchmarkRequest)(nil), // 0: benchmark.StartMacroBenchmarkRequest - (*StartMacroBenchmarkResponse)(nil), // 1: benchmark.StartMacroBenchmarkResponse - (*GetMacroBenchmarkRequest)(nil), // 2: benchmark.GetMacroBenchmarkRequest - (*GetMacroBenchmarkResponse)(nil), // 3: benchmark.GetMacroBenchmarkResponse - (*ListMacroBenchmarksResponse)(nil), // 4: benchmark.ListMacroBenchmarksResponse - (*StatusResponse)(nil), // 5: benchmark.StatusResponse - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty -} -var file_macro_benchmark_proto_depIdxs = []int32{ - 0, // 0: benchmark.Benchmark.StartMacroBenchmark:input_type -> benchmark.StartMacroBenchmarkRequest - 2, // 1: benchmark.Benchmark.GetMacroBenchmark:input_type -> benchmark.GetMacroBenchmarkRequest - 6, // 2: benchmark.Benchmark.ListMacroBenchmarks:input_type -> google.protobuf.Empty - 6, // 3: benchmark.Benchmark.Status:input_type -> google.protobuf.Empty - 1, // 4: benchmark.Benchmark.StartMacroBenchmark:output_type -> benchmark.StartMacroBenchmarkResponse - 3, // 5: benchmark.Benchmark.GetMacroBenchmark:output_type -> benchmark.GetMacroBenchmarkResponse - 4, // 6: benchmark.Benchmark.ListMacroBenchmarks:output_type -> benchmark.ListMacroBenchmarksResponse - 5, // 7: benchmark.Benchmark.Status:output_type -> benchmark.StatusResponse - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_macro_benchmark_proto_init() } -func file_macro_benchmark_proto_init() { - if File_macro_benchmark_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_macro_benchmark_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StartMacroBenchmarkRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_macro_benchmark_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StartMacroBenchmarkResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_macro_benchmark_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetMacroBenchmarkRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_macro_benchmark_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetMacroBenchmarkResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_macro_benchmark_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListMacroBenchmarksResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_macro_benchmark_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StatusResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_macro_benchmark_proto_rawDesc, - NumEnums: 0, - NumMessages: 6, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_macro_benchmark_proto_goTypes, - DependencyIndexes: file_macro_benchmark_proto_depIdxs, - MessageInfos: file_macro_benchmark_proto_msgTypes, - }.Build() - File_macro_benchmark_proto = out.File - file_macro_benchmark_proto_rawDesc = nil - file_macro_benchmark_proto_goTypes = nil - file_macro_benchmark_proto_depIdxs = nil -} diff --git a/integration/benchmark/proto/macro_benchmark.proto b/integration/benchmark/proto/macro_benchmark.proto deleted file mode 100644 index e461ea81892..00000000000 --- a/integration/benchmark/proto/macro_benchmark.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; - -package benchmark; -option go_package = "github.com/onflow/flow-go/integration/becnhmark/proto"; - -import "google/protobuf/empty.proto"; - -message StartMacroBenchmarkRequest {} -message StartMacroBenchmarkResponse {} - -message GetMacroBenchmarkRequest {} -message GetMacroBenchmarkResponse {} - -message ListMacroBenchmarksResponse {} - -message StatusResponse {} - -service Benchmark { - rpc StartMacroBenchmark(StartMacroBenchmarkRequest) - returns (stream StartMacroBenchmarkResponse) {} - rpc GetMacroBenchmark(GetMacroBenchmarkRequest) - returns (GetMacroBenchmarkResponse) {} - rpc ListMacroBenchmarks(google.protobuf.Empty) - returns (ListMacroBenchmarksResponse) {} - - rpc Status(google.protobuf.Empty) returns (StatusResponse) {} -} - diff --git a/integration/benchmark/proto/macro_benchmark_grpc.pb.go b/integration/benchmark/proto/macro_benchmark_grpc.pb.go deleted file mode 100644 index 065a26fcb39..00000000000 --- a/integration/benchmark/proto/macro_benchmark_grpc.pb.go +++ /dev/null @@ -1,243 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 -// source: macro_benchmark.proto - -package proto - -import ( - context "context" - - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// BenchmarkClient is the client API for Benchmark service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type BenchmarkClient interface { - StartMacroBenchmark(ctx context.Context, in *StartMacroBenchmarkRequest, opts ...grpc.CallOption) (Benchmark_StartMacroBenchmarkClient, error) - GetMacroBenchmark(ctx context.Context, in *GetMacroBenchmarkRequest, opts ...grpc.CallOption) (*GetMacroBenchmarkResponse, error) - ListMacroBenchmarks(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListMacroBenchmarksResponse, error) - Status(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*StatusResponse, error) -} - -type benchmarkClient struct { - cc grpc.ClientConnInterface -} - -func NewBenchmarkClient(cc grpc.ClientConnInterface) BenchmarkClient { - return &benchmarkClient{cc} -} - -func (c *benchmarkClient) StartMacroBenchmark(ctx context.Context, in *StartMacroBenchmarkRequest, opts ...grpc.CallOption) (Benchmark_StartMacroBenchmarkClient, error) { - stream, err := c.cc.NewStream(ctx, &Benchmark_ServiceDesc.Streams[0], "/benchmark.Benchmark/StartMacroBenchmark", opts...) - if err != nil { - return nil, err - } - x := &benchmarkStartMacroBenchmarkClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Benchmark_StartMacroBenchmarkClient interface { - Recv() (*StartMacroBenchmarkResponse, error) - grpc.ClientStream -} - -type benchmarkStartMacroBenchmarkClient struct { - grpc.ClientStream -} - -func (x *benchmarkStartMacroBenchmarkClient) Recv() (*StartMacroBenchmarkResponse, error) { - m := new(StartMacroBenchmarkResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *benchmarkClient) GetMacroBenchmark(ctx context.Context, in *GetMacroBenchmarkRequest, opts ...grpc.CallOption) (*GetMacroBenchmarkResponse, error) { - out := new(GetMacroBenchmarkResponse) - err := c.cc.Invoke(ctx, "/benchmark.Benchmark/GetMacroBenchmark", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *benchmarkClient) ListMacroBenchmarks(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListMacroBenchmarksResponse, error) { - out := new(ListMacroBenchmarksResponse) - err := c.cc.Invoke(ctx, "/benchmark.Benchmark/ListMacroBenchmarks", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *benchmarkClient) Status(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*StatusResponse, error) { - out := new(StatusResponse) - err := c.cc.Invoke(ctx, "/benchmark.Benchmark/Status", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// BenchmarkServer is the server API for Benchmark service. -// All implementations must embed UnimplementedBenchmarkServer -// for forward compatibility -type BenchmarkServer interface { - StartMacroBenchmark(*StartMacroBenchmarkRequest, Benchmark_StartMacroBenchmarkServer) error - GetMacroBenchmark(context.Context, *GetMacroBenchmarkRequest) (*GetMacroBenchmarkResponse, error) - ListMacroBenchmarks(context.Context, *emptypb.Empty) (*ListMacroBenchmarksResponse, error) - Status(context.Context, *emptypb.Empty) (*StatusResponse, error) - mustEmbedUnimplementedBenchmarkServer() -} - -// UnimplementedBenchmarkServer must be embedded to have forward compatible implementations. -type UnimplementedBenchmarkServer struct { -} - -func (UnimplementedBenchmarkServer) StartMacroBenchmark(*StartMacroBenchmarkRequest, Benchmark_StartMacroBenchmarkServer) error { - return status.Errorf(codes.Unimplemented, "method StartMacroBenchmark not implemented") -} -func (UnimplementedBenchmarkServer) GetMacroBenchmark(context.Context, *GetMacroBenchmarkRequest) (*GetMacroBenchmarkResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetMacroBenchmark not implemented") -} -func (UnimplementedBenchmarkServer) ListMacroBenchmarks(context.Context, *emptypb.Empty) (*ListMacroBenchmarksResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ListMacroBenchmarks not implemented") -} -func (UnimplementedBenchmarkServer) Status(context.Context, *emptypb.Empty) (*StatusResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") -} -func (UnimplementedBenchmarkServer) mustEmbedUnimplementedBenchmarkServer() {} - -// UnsafeBenchmarkServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to BenchmarkServer will -// result in compilation errors. -type UnsafeBenchmarkServer interface { - mustEmbedUnimplementedBenchmarkServer() -} - -func RegisterBenchmarkServer(s grpc.ServiceRegistrar, srv BenchmarkServer) { - s.RegisterService(&Benchmark_ServiceDesc, srv) -} - -func _Benchmark_StartMacroBenchmark_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(StartMacroBenchmarkRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(BenchmarkServer).StartMacroBenchmark(m, &benchmarkStartMacroBenchmarkServer{stream}) -} - -type Benchmark_StartMacroBenchmarkServer interface { - Send(*StartMacroBenchmarkResponse) error - grpc.ServerStream -} - -type benchmarkStartMacroBenchmarkServer struct { - grpc.ServerStream -} - -func (x *benchmarkStartMacroBenchmarkServer) Send(m *StartMacroBenchmarkResponse) error { - return x.ServerStream.SendMsg(m) -} - -func _Benchmark_GetMacroBenchmark_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(GetMacroBenchmarkRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BenchmarkServer).GetMacroBenchmark(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/benchmark.Benchmark/GetMacroBenchmark", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BenchmarkServer).GetMacroBenchmark(ctx, req.(*GetMacroBenchmarkRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Benchmark_ListMacroBenchmarks_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BenchmarkServer).ListMacroBenchmarks(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/benchmark.Benchmark/ListMacroBenchmarks", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BenchmarkServer).ListMacroBenchmarks(ctx, req.(*emptypb.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _Benchmark_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(emptypb.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BenchmarkServer).Status(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/benchmark.Benchmark/Status", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BenchmarkServer).Status(ctx, req.(*emptypb.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -// Benchmark_ServiceDesc is the grpc.ServiceDesc for Benchmark service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Benchmark_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "benchmark.Benchmark", - HandlerType: (*BenchmarkServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GetMacroBenchmark", - Handler: _Benchmark_GetMacroBenchmark_Handler, - }, - { - MethodName: "ListMacroBenchmarks", - Handler: _Benchmark_ListMacroBenchmarks_Handler, - }, - { - MethodName: "Status", - Handler: _Benchmark_Status_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "StartMacroBenchmark", - Handler: _Benchmark_StartMacroBenchmark_Handler, - ServerStreams: true, - }, - }, - Metadata: "macro_benchmark.proto", -} diff --git a/integration/benchmark/server/bench.sh b/integration/benchmark/server/bench.sh index 778cac6279d..161549aba0f 100755 --- a/integration/benchmark/server/bench.sh +++ b/integration/benchmark/server/bench.sh @@ -7,7 +7,7 @@ set -o pipefail # this will keep the TPS automation code separate from the code that's being tested so we won't run into issues # of having old versions of automation code just because we happen to be testing an older version flow-go git clone https://github.com/onflow/flow-go.git -cd flow-go/integration/localnet +cd flow-go/integration/localnet || exit git fetch git fetch --tags @@ -37,7 +37,7 @@ while read -r input; do # sleep is workaround for slow initialization of some node types, so that benchmark does not quit immediately with "connection refused" sleep 30; - go run ../benchmark/cmd/ci -log-level debug -git-repo-path ../../ -tps-initial 800 -tps-min 1 -tps-max 1200 -duration 30m -load-type "$load" + go run ../benchmark/cmd/ci -log-level info -git-repo-path ../../ -tps-initial 800 -tps-min 1 -tps-max 1200 -duration 30m -load-type "$load" -load-config "../benchmark/server/load-config.yml" # instead of running "make stop" which uses docker-compose for a lot of older versions, # we explicitly run the command here with "docker compose" diff --git a/integration/benchmark/server/branches.recent b/integration/benchmark/server/branches.recent deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/integration/benchmark/server/commits.recent b/integration/benchmark/server/commits.recent deleted file mode 100644 index 538b5965dcc..00000000000 --- a/integration/benchmark/server/commits.recent +++ /dev/null @@ -1 +0,0 @@ -janez/tps-benchmark-evm-load:894151a2390b11e3d9a399b41746d1c112f745fa:evm diff --git a/integration/benchmark/server/flow-go b/integration/benchmark/server/flow-go deleted file mode 160000 index 894151a2390..00000000000 --- a/integration/benchmark/server/flow-go +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 894151a2390b11e3d9a399b41746d1c112f745fa diff --git a/integration/benchmark/server/load-config.yml b/integration/benchmark/server/load-config.yml new file mode 100644 index 00000000000..f7c62d31729 --- /dev/null +++ b/integration/benchmark/server/load-config.yml @@ -0,0 +1,20 @@ +token-transfer: + load_type: token-transfer + tps_initial: 800 + tps_min: 1 + tps_max: 1200 +create-account: + load_type: create-account + tps_initial: 600 + tps_min: 1 + tps_max: 1200 +ledger-heavy: + load_type: ledger-heavy + tps_initial: 3 + tps_min: 1 + tps_max: 1200 +evm-transfer: + load_type: evm-transfer + tps_initial: 500 + tps_min: 1 + tps_max: 1200 diff --git a/integration/benchmark/worker_stats_tracker.go b/integration/benchmark/worker_stats_tracker.go index d2a0f60f92e..cd582a2c2bf 100644 --- a/integration/benchmark/worker_stats_tracker.go +++ b/integration/benchmark/worker_stats_tracker.go @@ -133,7 +133,7 @@ func NewPeriodicStatsLogger( w := NewWorker( ctx, 0, - 1*time.Second, + 3*time.Second, func(workerID int) { stats := st.GetStats() log.Info(). diff --git a/integration/go.mod b/integration/go.mod index 703cf893b19..69816fe19ef 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -42,6 +42,7 @@ require ( golang.org/x/sync v0.6.0 google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.32.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -351,7 +352,6 @@ require ( google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect modernc.org/libc v1.37.6 // indirect modernc.org/mathutil v1.6.0 // indirect diff --git a/integration/tests/access/cohort2/observer_indexer_enabled_test.go b/integration/tests/access/cohort2/observer_indexer_enabled_test.go index ef6f15e43f2..f36126cf0c7 100644 --- a/integration/tests/access/cohort2/observer_indexer_enabled_test.go +++ b/integration/tests/access/cohort2/observer_indexer_enabled_test.go @@ -18,6 +18,7 @@ import ( sdkcrypto "github.com/onflow/flow-go-sdk/crypto" "github.com/onflow/flow-go-sdk/templates" "github.com/onflow/flow-go/engine/access/rpc/backend" + "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/integration/testnet" "github.com/onflow/flow-go/integration/tests/lib" "github.com/onflow/flow-go/model/flow" @@ -42,8 +43,8 @@ type ObserverIndexerEnabledSuite struct { ObserverSuite } -// SetupTest sets up the test suite by starting the network and preparing the observer client. -// By overriding this function, we can ensure that the observer is started with correct parameters and select +// SetupTest sets up the test suite by starting the network and preparing the observers client. +// By overriding this function, we can ensure that the observers are started with correct parameters and select // the RPCs and REST endpoints that are tested. func (s *ObserverIndexerEnabledSuite) SetupTest() { s.localRpc = map[string]struct{}{ @@ -110,17 +111,23 @@ func (s *ObserverIndexerEnabledSuite) SetupTest() { testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel)), } - observers := []testnet.ObserverConfig{{ - LogLevel: zerolog.InfoLevel, - AdditionalFlags: []string{ - fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), - fmt.Sprintf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), - "--execution-data-sync-enabled=true", - "--execution-data-indexing-enabled=true", - "--local-service-api-enabled=true", - "--event-query-mode=execution-nodes-only", + observers := []testnet.ObserverConfig{ + { + LogLevel: zerolog.InfoLevel, + AdditionalFlags: []string{ + fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), + fmt.Sprintf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), + "--execution-data-sync-enabled=true", + "--execution-data-indexing-enabled=true", + "--local-service-api-enabled=true", + "--event-query-mode=execution-nodes-only", + }, }, - }} + { + ContainerName: "observer_2", + LogLevel: zerolog.InfoLevel, + }, + } // prepare the network conf := testnet.NewNetworkConfig("observer_indexing_enabled_test", nodeConfigs, testnet.WithObservers(observers...)) @@ -134,9 +141,6 @@ func (s *ObserverIndexerEnabledSuite) SetupTest() { } // TestObserverIndexedRPCsHappyPath tests RPCs that are handled by the observer by using a dedicated indexer for the events. -// For now the observer only supports the following RPCs: -// - GetEventsForHeightRange -// - GetEventsForBlockIDs // To ensure that the observer is handling these RPCs, we stop the upstream access node and verify that the observer client // returns success for valid requests and errors for invalid ones. func (s *ObserverIndexerEnabledSuite) TestObserverIndexedRPCsHappyPath() { @@ -264,7 +268,277 @@ func (s *ObserverIndexerEnabledSuite) TestObserverIndexedRPCsHappyPath() { } } require.True(t, found) +} + +// TestAllObserverIndexedRPCsHappyPath tests the observer with the indexer enabled, +// observer configured to proxy requests to an access node and access node itself. All responses are compared +// to ensure all of the endpoints are working as expected. +// For now the observer only supports the following RPCs: +// -GetAccountAtBlockHeight +// -GetEventsForHeightRange +// -GetEventsForBlockIDs +// -GetSystemTransaction +// -GetTransactionsByBlockID +// -GetTransactionResultsByBlockID +// -ExecuteScriptAtBlockID +// -ExecuteScriptAtBlockHeight +// -GetExecutionResultByID +// -GetCollectionByID +// -GetTransaction +// -GetTransactionResult +// -GetTransactionResultByIndex +func (s *ObserverIndexerEnabledSuite) TestAllObserverIndexedRPCsHappyPath() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t := s.T() + + // prepare environment to create a new account + serviceAccountClient, err := s.net.ContainerByName(testnet.PrimaryAN).TestnetClient() + require.NoError(t, err) + + latestBlockID, err := serviceAccountClient.GetLatestBlockID(ctx) + require.NoError(t, err) + + // create new account to deploy Counter to + accountPrivateKey := lib.RandomPrivateKey() + + accountKey := sdk.NewAccountKey(). + FromPrivateKey(accountPrivateKey). + SetHashAlgo(sdkcrypto.SHA3_256). + SetWeight(sdk.AccountKeyWeightThreshold) + + serviceAddress := sdk.Address(serviceAccountClient.Chain.ServiceAddress()) + + // Generate the account creation transaction + createAccountTx, err := templates.CreateAccount( + []*sdk.AccountKey{accountKey}, + []templates.Contract{ + { + Name: lib.CounterContract.Name, + Source: lib.CounterContract.ToCadence(), + }, + }, serviceAddress) + require.NoError(t, err) + + createAccountTx. + SetReferenceBlockID(sdk.Identifier(latestBlockID)). + SetProposalKey(serviceAddress, 0, serviceAccountClient.GetSeqNumber()). + SetPayer(serviceAddress). + SetComputeLimit(9999) + + // send the create account tx + childCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + err = serviceAccountClient.SignAndSendTransaction(childCtx, createAccountTx) + require.NoError(t, err) + + cancel() + + // wait for account to be created + var accountCreationTxRes *sdk.TransactionResult + unittest.RequireReturnsBefore(t, func() { + accountCreationTxRes, err = serviceAccountClient.WaitForSealed(context.Background(), createAccountTx.ID()) + require.NoError(t, err) + }, 20*time.Second, "has to seal before timeout") + + // obtain the account address + var accountCreatedPayload []byte + var newAccountAddress sdk.Address + for _, event := range accountCreationTxRes.Events { + if event.Type == sdk.EventAccountCreated { + accountCreatedEvent := sdk.AccountCreatedEvent(event) + accountCreatedPayload = accountCreatedEvent.Payload + newAccountAddress = accountCreatedEvent.Address() + break + } + } + require.NotEqual(t, sdk.EmptyAddress, newAccountAddress) + + // now we can query events using observerLocal to data which has to be locally indexed + + // get an access node client + accessNode, err := s.getClient(s.net.ContainerByName(testnet.PrimaryAN).Addr(testnet.GRPCPort)) + require.NoError(t, err) + + // get an observer with indexer enabled client + observerLocal, err := s.getObserverClient() + require.NoError(t, err) + + // get an upstream observer client + observerUpstream, err := s.getClient(s.net.ContainerByName("observer_2").Addr(testnet.GRPCPort)) + require.NoError(t, err) + + // wait for data to be synced by observerLocal + require.Eventually(t, func() bool { + _, err := observerLocal.GetAccountAtBlockHeight(ctx, &accessproto.GetAccountAtBlockHeightRequest{ + Address: newAccountAddress.Bytes(), + BlockHeight: accountCreationTxRes.BlockHeight, + }) + statusErr, ok := status.FromError(err) + if !ok || err == nil { + return true + } + return statusErr.Code() != codes.OutOfRange + }, 30*time.Second, 1*time.Second) + + blockWithAccount, err := observerLocal.GetBlockByID(ctx, &accessproto.GetBlockByIDRequest{ + Id: accountCreationTxRes.BlockID[:], + FullBlockResponse: true, + }) + require.NoError(t, err) + + checkRPC := func(rpcCall func(client accessproto.AccessAPIClient) (any, error)) { + observerRes, err := rpcCall(observerLocal) + require.NoError(s.T(), err) + observerUpstreamRes, err := rpcCall(observerUpstream) + require.NoError(s.T(), err) + accessRes, err := rpcCall(accessNode) + require.NoError(s.T(), err) + + require.Equal(s.T(), observerRes, observerUpstreamRes) + require.Equal(s.T(), observerRes, accessRes) + } + + // GetEventsForBlockIDs + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetEventsForBlockIDs(ctx, &accessproto.GetEventsForBlockIDsRequest{ + Type: sdk.EventAccountCreated, + BlockIds: [][]byte{blockWithAccount.Block.Id}, + EventEncodingVersion: entities.EventEncodingVersion_JSON_CDC_V0, + }) + return res.Results, err + }) + + var txIndex uint32 + found := false + + // GetEventsForHeightRange + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetEventsForHeightRange(ctx, &accessproto.GetEventsForHeightRangeRequest{ + Type: sdk.EventAccountCreated, + StartHeight: blockWithAccount.Block.Height, + EndHeight: blockWithAccount.Block.Height, + EventEncodingVersion: entities.EventEncodingVersion_JSON_CDC_V0, + }) + + // Iterating through response Results to get txIndex of event + for _, eventsInBlock := range res.Results { + for _, event := range eventsInBlock.Events { + if event.Type == sdk.EventAccountCreated { + if bytes.Equal(event.Payload, accountCreatedPayload) { + found = true + txIndex = event.TransactionIndex + } + } + } + } + require.True(t, found) + return res.Results, err + }) + // GetSystemTransaction + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetSystemTransaction(ctx, &accessproto.GetSystemTransactionRequest{ + BlockId: blockWithAccount.Block.Id, + }) + return res.Transaction, err + }) + + // GetExecutionResultByID + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + converted, err := convert.MessageToBlock(blockWithAccount.Block) + require.NoError(t, err) + + resultId := converted.Payload.Results[0].ID() + res, err := client.GetExecutionResultByID(ctx, &accessproto.GetExecutionResultByIDRequest{ + Id: convert.IdentifierToMessage(resultId), + }) + return res.ExecutionResult, err + }) + + // GetTransaction + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetTransaction(ctx, &accessproto.GetTransactionRequest{ + Id: accountCreationTxRes.TransactionID.Bytes(), + BlockId: blockWithAccount.Block.Id, + CollectionId: nil, + }) + return res.Transaction, err + }) + + // GetTransactionResult + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetTransactionResult(ctx, &accessproto.GetTransactionRequest{ + Id: accountCreationTxRes.TransactionID.Bytes(), + BlockId: blockWithAccount.Block.Id, + CollectionId: accountCreationTxRes.CollectionID.Bytes(), + }) + return res.Events, err + }) + + // GetTransactionResultByIndex + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetTransactionResultByIndex(ctx, &accessproto.GetTransactionByIndexRequest{ + BlockId: blockWithAccount.Block.Id, + Index: txIndex, + EventEncodingVersion: entities.EventEncodingVersion_JSON_CDC_V0, + }) + return res.Events, err + }) + + // GetTransactionResultsByBlockID + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetTransactionResultsByBlockID(ctx, &accessproto.GetTransactionsByBlockIDRequest{ + BlockId: blockWithAccount.Block.Id, + EventEncodingVersion: entities.EventEncodingVersion_JSON_CDC_V0, + }) + return res.TransactionResults, err + }) + + // GetTransactionsByBlockID + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetTransactionsByBlockID(ctx, &accessproto.GetTransactionsByBlockIDRequest{ + BlockId: blockWithAccount.Block.Id, + }) + return res.Transactions, err + }) + + // GetCollectionByID + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetCollectionByID(ctx, &accessproto.GetCollectionByIDRequest{ + Id: accountCreationTxRes.CollectionID.Bytes(), + }) + return res.Collection, err + }) + + // ExecuteScriptAtBlockHeight + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.ExecuteScriptAtBlockHeight(ctx, &accessproto.ExecuteScriptAtBlockHeightRequest{ + BlockHeight: blockWithAccount.Block.Height, + Script: []byte(simpleScript), + Arguments: make([][]byte, 0), + }) + return res.Value, err + }) + + // ExecuteScriptAtBlockID + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.ExecuteScriptAtBlockID(ctx, &accessproto.ExecuteScriptAtBlockIDRequest{ + BlockId: blockWithAccount.Block.Id, + Script: []byte(simpleScript), + Arguments: make([][]byte, 0), + }) + return res.Value, err + }) + + // GetAccountAtBlockHeight + checkRPC(func(client accessproto.AccessAPIClient) (any, error) { + res, err := client.GetAccountAtBlockHeight(ctx, &accessproto.GetAccountAtBlockHeightRequest{ + Address: newAccountAddress.Bytes(), + BlockHeight: accountCreationTxRes.BlockHeight, + }) + return res.Account, err + }) } func (s *ObserverIndexerEnabledSuite) getRPCs() []RPCTest { diff --git a/integration/tests/access/cohort3/grpc_state_stream_test.go b/integration/tests/access/cohort3/grpc_state_stream_test.go index 1691aa0ef6c..be6f0840b99 100644 --- a/integration/tests/access/cohort3/grpc_state_stream_test.go +++ b/integration/tests/access/cohort3/grpc_state_stream_test.go @@ -8,9 +8,9 @@ import ( "log" "sync" "testing" - "time" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "google.golang.org/grpc" @@ -21,7 +21,9 @@ import ( "github.com/onflow/flow-go-sdk/test" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/engine/ghost/client" "github.com/onflow/flow-go/integration/testnet" + "github.com/onflow/flow-go/integration/tests/lib" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/counters" "github.com/onflow/flow-go/utils/unittest" @@ -48,6 +50,7 @@ func TestGrpcStateStream(t *testing.T) { type GrpcStateStreamSuite struct { suite.Suite + lib.TestnetStateTracker log zerolog.Logger @@ -58,7 +61,9 @@ type GrpcStateStreamSuite struct { net *testnet.FlowNetwork // RPC methods to test - testedRPCs func() []RPCTest + testedRPCs func() []subscribeEventsRPCTest + + ghostID flow.Identifier } func (s *GrpcStateStreamSuite) TearDownTest() { @@ -99,6 +104,14 @@ func (s *GrpcStateStreamSuite) SetupTest() { testnet.WithAdditionalFlag("--event-query-mode=execution-nodes-only"), ) + // add the ghost (access) node config + s.ghostID = unittest.IdentifierFixture() + ghostNode := testnet.NewNodeConfig( + flow.RoleAccess, + testnet.WithID(s.ghostID), + testnet.WithLogLevel(zerolog.FatalLevel), + testnet.AsGhost()) + consensusConfigs := []func(config *testnet.NodeConfig){ testnet.WithAdditionalFlag("--cruise-ctl-fallback-proposal-duration=400ms"), testnet.WithAdditionalFlag(fmt.Sprintf("--required-verification-seal-approvals=%d", 1)), @@ -117,6 +130,7 @@ func (s *GrpcStateStreamSuite) SetupTest() { testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel)), testANConfig, // access_1 controlANConfig, // access_2 + ghostNode, // access ghost } // add the observer node config @@ -142,6 +156,13 @@ func (s *GrpcStateStreamSuite) SetupTest() { s.testedRPCs = s.getRPCs s.net.Start(s.ctx) + s.Track(s.T(), s.ctx, s.Ghost()) +} + +func (s *GrpcStateStreamSuite) Ghost() *client.GhostClient { + client, err := s.net.ContainerByID(s.ghostID).GhostClient() + require.NoError(s.T(), err, "could not get ghost client") + return client } // TestRestEventStreaming tests gRPC event streaming @@ -158,12 +179,17 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { sdkClientTestON, err := getClient(testONURL) s.Require().NoError(err) + // get the first block height + currentFinalized := s.BlockState.HighestFinalizedHeight() + blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized) + + // Let the network run for this many blocks + blockCount := uint64(5) + // wait for the requested number of sealed blocks + s.BlockState.WaitForSealed(s.T(), blockA.Header.Height+blockCount) + txGenerator, err := s.net.ContainerByName(testnet.PrimaryAN).TestnetClient() s.Require().NoError(err) - header, err := txGenerator.GetLatestSealedBlockHeader(s.ctx) - s.Require().NoError(err) - - time.Sleep(20 * time.Second) var startValue interface{} txCount := 10 @@ -171,24 +197,21 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { for _, rpc := range s.testedRPCs() { s.T().Run(rpc.name, func(t *testing.T) { if rpc.name == "SubscribeEventsFromStartBlockID" { - startValue = header.ID.Bytes() + startValue = convert.IdentifierToMessage(blockA.ID()) } else { - startValue = header.Height + startValue = blockA.Header.Height } - testANStream, err := rpc.call(s.ctx, sdkClientTestAN, startValue, &executiondata.EventFilter{}) - s.Require().NoError(err) - testANEvents, testANErrs, err := SubscribeEventsHandler(s.ctx, testANStream) + testANRecv := rpc.call(s.ctx, sdkClientTestAN, startValue, &executiondata.EventFilter{}) + testANEvents, testANErrs, err := SubscribeHandler(s.ctx, testANRecv, eventsResponseHandler) s.Require().NoError(err) - controlANStream, err := rpc.call(s.ctx, sdkClientControlAN, startValue, &executiondata.EventFilter{}) - s.Require().NoError(err) - controlANEvents, controlANErrs, err := SubscribeEventsHandler(s.ctx, controlANStream) + controlANRecv := rpc.call(s.ctx, sdkClientControlAN, startValue, &executiondata.EventFilter{}) + controlANEvents, controlANErrs, err := SubscribeHandler(s.ctx, controlANRecv, eventsResponseHandler) s.Require().NoError(err) - testONStream, err := rpc.call(s.ctx, sdkClientTestON, startValue, &executiondata.EventFilter{}) - s.Require().NoError(err) - testONEvents, testONErrs, err := SubscribeEventsHandler(s.ctx, testONStream) + testONRecv := rpc.call(s.ctx, sdkClientTestON, startValue, &executiondata.EventFilter{}) + testONEvents, testONErrs, err := SubscribeHandler(s.ctx, testONRecv, eventsResponseHandler) s.Require().NoError(err) if rpc.generateEvents { @@ -213,7 +236,7 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { foundONTxCount := 0 messageIndex := counters.NewMonotonousCounter(0) - r := newResponseTracker() + r := NewResponseTracker(compareEventsResponse, 3) for { select { @@ -226,7 +249,7 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { case event := <-testANEvents: if has(event.Events, targetEvent) { s.T().Logf("adding access test events: %d %d %v", event.Height, len(event.Events), event.Events) - r.Add(s.T(), event.Height, "access_test", &event) + r.Add(s.T(), event.Height, "access_test", event) foundANTxCount++ } case event := <-controlANEvents: @@ -236,12 +259,12 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { } s.T().Logf("adding control events: %d %d %v", event.Height, len(event.Events), event.Events) - r.Add(s.T(), event.Height, "access_control", &event) + r.Add(s.T(), event.Height, "access_control", event) } case event := <-testONEvents: if has(event.Events, targetEvent) { s.T().Logf("adding observer test events: %d %d %v", event.Height, len(event.Events), event.Events) - r.Add(s.T(), event.Height, "observer_test", &event) + r.Add(s.T(), event.Height, "observer_test", event) foundONTxCount++ } } @@ -250,6 +273,8 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { break } } + + r.AssertAllResponsesHandled(t, txCount) }) } } @@ -270,104 +295,161 @@ func (s *GrpcStateStreamSuite) generateEvents(client *testnet.Client, txCount in } } -type RPCTest struct { +type subscribeEventsRPCTest struct { name string - call func(ctx context.Context, client executiondata.ExecutionDataAPIClient, startValue interface{}, filter *executiondata.EventFilter) (executiondata.ExecutionDataAPI_SubscribeEventsClient, error) + call func(ctx context.Context, client executiondata.ExecutionDataAPIClient, startValue interface{}, filter *executiondata.EventFilter) func() (*executiondata.SubscribeEventsResponse, error) generateEvents bool // add ability to integration test generate new events or use old events to decrease running test time } -func (s *GrpcStateStreamSuite) getRPCs() []RPCTest { - return []RPCTest{ +func (s *GrpcStateStreamSuite) getRPCs() []subscribeEventsRPCTest { + return []subscribeEventsRPCTest{ { name: "SubscribeEventsFromLatest", - call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, _ interface{}, filter *executiondata.EventFilter) (executiondata.ExecutionDataAPI_SubscribeEventsClient, error) { - return client.SubscribeEventsFromLatest(ctx, &executiondata.SubscribeEventsFromLatestRequest{ + call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, _ interface{}, filter *executiondata.EventFilter) func() (*executiondata.SubscribeEventsResponse, error) { + stream, err := client.SubscribeEventsFromLatest(ctx, &executiondata.SubscribeEventsFromLatestRequest{ EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, Filter: filter, HeartbeatInterval: 1, }) + s.Require().NoError(err) + return stream.Recv }, generateEvents: true, }, { name: "SubscribeEvents", - call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, _ interface{}, filter *executiondata.EventFilter) (executiondata.ExecutionDataAPI_SubscribeEventsClient, error) { + call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, _ interface{}, filter *executiondata.EventFilter) func() (*executiondata.SubscribeEventsResponse, error) { //nolint: staticcheck - return client.SubscribeEvents(ctx, &executiondata.SubscribeEventsRequest{ + stream, err := client.SubscribeEvents(ctx, &executiondata.SubscribeEventsRequest{ StartBlockId: convert.IdentifierToMessage(flow.ZeroID), StartBlockHeight: 0, EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, Filter: filter, HeartbeatInterval: 1, }) + s.Require().NoError(err) + return stream.Recv }, generateEvents: true, }, { name: "SubscribeEventsFromStartBlockID", - call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, startValue interface{}, filter *executiondata.EventFilter) (executiondata.ExecutionDataAPI_SubscribeEventsClient, error) { - return client.SubscribeEventsFromStartBlockID(ctx, &executiondata.SubscribeEventsFromStartBlockIDRequest{ + call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, startValue interface{}, filter *executiondata.EventFilter) func() (*executiondata.SubscribeEventsResponse, error) { + stream, err := client.SubscribeEventsFromStartBlockID(ctx, &executiondata.SubscribeEventsFromStartBlockIDRequest{ StartBlockId: startValue.([]byte), EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, Filter: filter, HeartbeatInterval: 1, }) + s.Require().NoError(err) + return stream.Recv }, generateEvents: false, // use previous events }, { name: "SubscribeEventsFromStartHeight", - call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, startValue interface{}, filter *executiondata.EventFilter) (executiondata.ExecutionDataAPI_SubscribeEventsClient, error) { - return client.SubscribeEventsFromStartHeight(ctx, &executiondata.SubscribeEventsFromStartHeightRequest{ + call: func(ctx context.Context, client executiondata.ExecutionDataAPIClient, startValue interface{}, filter *executiondata.EventFilter) func() (*executiondata.SubscribeEventsResponse, error) { + stream, err := client.SubscribeEventsFromStartHeight(ctx, &executiondata.SubscribeEventsFromStartHeightRequest{ StartBlockHeight: startValue.(uint64), EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, Filter: filter, HeartbeatInterval: 1, }) + s.Require().NoError(err) + return stream.Recv }, generateEvents: false, // use previous events }, } } -type ResponseTracker struct { - r map[uint64]map[string]SubscribeEventsResponse - mu sync.RWMutex +// ResponseTracker is a generic tracker for responses. +type ResponseTracker[T any] struct { + r map[uint64]map[string]T + mu sync.RWMutex + compare func(t *testing.T, responses map[uint64]map[string]T, blockHeight uint64) error + checkCount int // actual common count of responses we want to check + responsesCountToCompare int // count of responses that we want to compare with each other } -func newResponseTracker() *ResponseTracker { - return &ResponseTracker{ - r: make(map[uint64]map[string]SubscribeEventsResponse), +// NewResponseTracker creates a new ResponseTracker. +func NewResponseTracker[T any]( + compare func(t *testing.T, responses map[uint64]map[string]T, blockHeight uint64) error, + responsesCountToCompare int, +) *ResponseTracker[T] { + return &ResponseTracker[T]{ + r: make(map[uint64]map[string]T), + compare: compare, + responsesCountToCompare: responsesCountToCompare, + } +} + +func (r *ResponseTracker[T]) AssertAllResponsesHandled(t *testing.T, expectedCheckCount int) { + assert.Equal(t, expectedCheckCount, r.checkCount) + + // we check if response tracker has some responses which were not checked, but should be checked + hasNotComparedResponses := false + for _, valueMap := range r.r { + if len(valueMap) == r.responsesCountToCompare { + hasNotComparedResponses = true + break + } } + assert.False(t, hasNotComparedResponses) } -func (r *ResponseTracker) Add(t *testing.T, blockHeight uint64, name string, events *SubscribeEventsResponse) { +func (r *ResponseTracker[T]) Add(t *testing.T, blockHeight uint64, name string, response T) { r.mu.Lock() defer r.mu.Unlock() if _, ok := r.r[blockHeight]; !ok { - r.r[blockHeight] = make(map[string]SubscribeEventsResponse) + r.r[blockHeight] = make(map[string]T) } - r.r[blockHeight][name] = *events + r.r[blockHeight][name] = response - if len(r.r[blockHeight]) != 3 { + if len(r.r[blockHeight]) != r.responsesCountToCompare { return } - err := r.compare(t, r.r[blockHeight]["access_control"], r.r[blockHeight]["access_test"]) + r.checkCount += 1 + err := r.compare(t, r.r, blockHeight) if err != nil { - log.Fatalf("failure comparing access and access data %d: %v", blockHeight, err) - } - - err = r.compare(t, r.r[blockHeight]["access_control"], r.r[blockHeight]["observer_test"]) - if err != nil { - log.Fatalf("failure comparing access and observer data %d: %v", blockHeight, err) + log.Fatalf("comparison error at block height %d: %v", blockHeight, err) } delete(r.r, blockHeight) } -func (r *ResponseTracker) compare(t *testing.T, controlData SubscribeEventsResponse, testData SubscribeEventsResponse) error { +func eventsResponseHandler(msg *executiondata.SubscribeEventsResponse) (*SubscribeEventsResponse, error) { + events := convert.MessagesToEvents(msg.GetEvents()) + + return &SubscribeEventsResponse{ + EventsResponse: backend.EventsResponse{ + Height: msg.GetBlockHeight(), + BlockID: convert.MessageToIdentifier(msg.GetBlockId()), + Events: events, + BlockTimestamp: msg.GetBlockTimestamp().AsTime(), + }, + MessageIndex: msg.MessageIndex, + }, nil +} + +func compareEventsResponse(t *testing.T, responses map[uint64]map[string]*SubscribeEventsResponse, blockHeight uint64) error { + + accessControlData := responses[blockHeight]["access_control"] + accessTestData := responses[blockHeight]["access_test"] + observerTestData := responses[blockHeight]["observer_test"] + + // Compare access_control with access_test + compareEvents(t, accessControlData, accessTestData) + + // Compare access_control with observer_test + compareEvents(t, accessControlData, observerTestData) + + return nil +} + +func compareEvents(t *testing.T, controlData, testData *SubscribeEventsResponse) { require.Equal(t, controlData.BlockID, testData.BlockID) require.Equal(t, controlData.Height, testData.Height) require.Equal(t, controlData.BlockTimestamp, testData.BlockTimestamp) @@ -381,8 +463,6 @@ func (r *ResponseTracker) compare(t *testing.T, controlData SubscribeEventsRespo require.Equal(t, controlData.Events[i].EventIndex, testData.Events[i].EventIndex) require.True(t, bytes.Equal(controlData.Events[i].Payload, testData.Events[i].Payload)) } - - return nil } // TODO: switch to SDK versions once crypto library is fixed to support the latest SDK version @@ -396,11 +476,12 @@ func getClient(address string) (executiondata.ExecutionDataAPIClient, error) { return executiondata.NewExecutionDataAPIClient(conn), nil } -func SubscribeEventsHandler( +func SubscribeHandler[T any, V any]( ctx context.Context, - stream executiondata.ExecutionDataAPI_SubscribeEventsClient, -) (<-chan SubscribeEventsResponse, <-chan error, error) { - sub := make(chan SubscribeEventsResponse) + recv func() (T, error), + responseHandler func(T) (V, error), +) (<-chan V, <-chan error, error) { + sub := make(chan V) errChan := make(chan error) sendErr := func(err error) { @@ -415,26 +496,20 @@ func SubscribeEventsHandler( defer close(errChan) for { - resp, err := stream.Recv() + resp, err := recv() if err != nil { if err == io.EOF { return } - sendErr(fmt.Errorf("error receiving event: %w", err)) + sendErr(fmt.Errorf("error receiving response: %w", err)) return } - events := convert.MessagesToEvents(resp.GetEvents()) - - response := SubscribeEventsResponse{ - EventsResponse: backend.EventsResponse{ - Height: resp.GetBlockHeight(), - BlockID: convert.MessageToIdentifier(resp.GetBlockId()), - Events: events, - BlockTimestamp: resp.GetBlockTimestamp().AsTime(), - }, - MessageIndex: resp.MessageIndex, + response, err := responseHandler(resp) + if err != nil { + sendErr(fmt.Errorf("error converting response: %w", err)) + return } select { diff --git a/integration/tests/access/cohort3/grpc_streaming_blocks_test.go b/integration/tests/access/cohort3/grpc_streaming_blocks_test.go new file mode 100644 index 00000000000..82e1c23cf28 --- /dev/null +++ b/integration/tests/access/cohort3/grpc_streaming_blocks_test.go @@ -0,0 +1,278 @@ +package cohort3 + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/engine/ghost/client" + "github.com/onflow/flow-go/integration/testnet" + "github.com/onflow/flow-go/integration/tests/lib" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" +) + +func TestGrpcBlocksStream(t *testing.T) { + suite.Run(t, new(GrpcBlocksStreamSuite)) +} + +type GrpcBlocksStreamSuite struct { + suite.Suite + lib.TestnetStateTracker + + log zerolog.Logger + + // root context for the current test + ctx context.Context + cancel context.CancelFunc + + net *testnet.FlowNetwork + + // RPC methods to test + testedRPCs func() []subscribeBlocksRPCTest + + ghostID flow.Identifier +} + +func (s *GrpcBlocksStreamSuite) TearDownTest() { + s.log.Info().Msg("================> Start TearDownTest") + s.net.Remove() + s.cancel() + s.log.Info().Msg("================> Finish TearDownTest") +} + +func (s *GrpcBlocksStreamSuite) SetupTest() { + s.log = unittest.LoggerForTest(s.Suite.T(), zerolog.InfoLevel) + s.log.Info().Msg("================> SetupTest") + defer func() { + s.log.Info().Msg("================> Finish SetupTest") + }() + + // access node + accessConfig := testnet.NewNodeConfig( + flow.RoleAccess, + testnet.WithLogLevel(zerolog.InfoLevel), + testnet.WithAdditionalFlag("--execution-data-sync-enabled=true"), + testnet.WithAdditionalFlagf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), + testnet.WithAdditionalFlag("--execution-data-retry-delay=1s"), + testnet.WithAdditionalFlag("--execution-data-indexing-enabled=true"), + testnet.WithAdditionalFlagf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), + testnet.WithAdditionalFlag("--event-query-mode=local-only"), + testnet.WithAdditionalFlag("--supports-observer=true"), + testnet.WithAdditionalFlagf("--public-network-execution-data-sync-enabled=true"), + ) + + consensusConfigs := []func(config *testnet.NodeConfig){ + testnet.WithAdditionalFlag("--cruise-ctl-fallback-proposal-duration=400ms"), + testnet.WithAdditionalFlag(fmt.Sprintf("--required-verification-seal-approvals=%d", 1)), + testnet.WithAdditionalFlag(fmt.Sprintf("--required-construction-seal-approvals=%d", 1)), + testnet.WithLogLevel(zerolog.FatalLevel), + } + + // add the ghost (access) node config + s.ghostID = unittest.IdentifierFixture() + ghostNode := testnet.NewNodeConfig( + flow.RoleAccess, + testnet.WithID(s.ghostID), + testnet.WithLogLevel(zerolog.FatalLevel), + testnet.AsGhost()) + + nodeConfigs := []testnet.NodeConfig{ + testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel)), + testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...), + testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...), + testnet.NewNodeConfig(flow.RoleConsensus, consensusConfigs...), + testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel)), + accessConfig, + ghostNode, // access ghost + } + + // add the observer node config + observers := []testnet.ObserverConfig{{ + ContainerName: testnet.PrimaryON, + LogLevel: zerolog.DebugLevel, + AdditionalFlags: []string{ + fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), + fmt.Sprintf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), + "--execution-data-sync-enabled=true", + "--event-query-mode=execution-nodes-only", + "--execution-data-indexing-enabled=true", + }, + }} + + conf := testnet.NewNetworkConfig("access_blocks_streaming_test", nodeConfigs, testnet.WithObservers(observers...)) + s.net = testnet.PrepareFlowNetwork(s.T(), conf, flow.Localnet) + + // start the network + s.T().Logf("starting flow network with docker containers") + s.ctx, s.cancel = context.WithCancel(context.Background()) + + s.testedRPCs = s.getRPCs + + s.net.Start(s.ctx) + s.Track(s.T(), s.ctx, s.Ghost()) +} + +func (s *GrpcBlocksStreamSuite) Ghost() *client.GhostClient { + client, err := s.net.ContainerByID(s.ghostID).GhostClient() + require.NoError(s.T(), err, "could not get ghost client") + return client +} + +// TestRestEventStreaming tests gRPC event streaming +func (s *GrpcBlocksStreamSuite) TestHappyPath() { + accessUrl := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryAN).Port(testnet.GRPCPort)) + accessClient, err := getAccessAPIClient(accessUrl) + s.Require().NoError(err) + + observerURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryON).Port(testnet.GRPCPort)) + observerClient, err := getAccessAPIClient(observerURL) + s.Require().NoError(err) + + // get the first block height + currentFinalized := s.BlockState.HighestFinalizedHeight() + blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized) + + // Let the network run for this many blocks + blockCount := uint64(5) + // wait for the requested number of sealed blocks + s.BlockState.WaitForSealed(s.T(), blockA.Header.Height+blockCount) + + var startValue interface{} + txCount := 10 + + for _, rpc := range s.testedRPCs() { + s.T().Run(rpc.name, func(t *testing.T) { + if rpc.name == "SubscribeBlocksFromStartBlockID" { + startValue = convert.IdentifierToMessage(blockA.ID()) + } else { + startValue = blockA.Header.Height + } + + accessRecv := rpc.call(s.ctx, accessClient, startValue) + accessBlocks, accessBlockErrs, err := SubscribeHandler(s.ctx, accessRecv, blockResponseHandler) + s.Require().NoError(err) + + observerRecv := rpc.call(s.ctx, observerClient, startValue) + observerBlocks, observerBlockErrs, err := SubscribeHandler(s.ctx, observerRecv, blockResponseHandler) + s.Require().NoError(err) + + foundANTxCount := 0 + foundONTxCount := 0 + + r := NewResponseTracker(compareBlocksResponse, 2) + + for { + select { + case err := <-accessBlockErrs: + s.Require().NoErrorf(err, "unexpected AN error") + case err := <-observerBlockErrs: + s.Require().NoErrorf(err, "unexpected ON error") + case block := <-accessBlocks: + s.T().Logf("AN block received: height: %d", block.Header.Height) + r.Add(s.T(), block.Header.Height, "access", block) + foundANTxCount++ + case block := <-observerBlocks: + s.T().Logf("ON block received: height: %d", block.Header.Height) + r.Add(s.T(), block.Header.Height, "observer", block) + foundONTxCount++ + } + + if foundANTxCount >= txCount && foundONTxCount >= txCount { + break + } + } + + r.AssertAllResponsesHandled(t, txCount) + }) + } +} + +func blockResponseHandler(msg *accessproto.SubscribeBlocksResponse) (*flow.Block, error) { + return convert.MessageToBlock(msg.GetBlock()) +} + +func compareBlocksResponse(t *testing.T, responses map[uint64]map[string]*flow.Block, blockHeight uint64) error { + accessData := responses[blockHeight]["access"] + observerData := responses[blockHeight]["observer"] + + // Compare access with observer + compareBlocks(t, accessData, observerData) + + return nil +} + +func compareBlocks(t *testing.T, accessBlock *flow.Block, observerBlock *flow.Block) { + require.Equal(t, accessBlock.ID(), observerBlock.ID()) + require.Equal(t, accessBlock.Header.Height, observerBlock.Header.Height) + require.Equal(t, accessBlock.Header.Timestamp, observerBlock.Header.Timestamp) + require.Equal(t, accessBlock.Payload.Hash(), observerBlock.Payload.Hash()) +} + +type subscribeBlocksRPCTest struct { + name string + call func(ctx context.Context, client accessproto.AccessAPIClient, startValue interface{}) func() (*accessproto.SubscribeBlocksResponse, error) +} + +func (s *GrpcBlocksStreamSuite) getRPCs() []subscribeBlocksRPCTest { + return []subscribeBlocksRPCTest{ + { + name: "SubscribeBlocksFromLatest", + call: func(ctx context.Context, client accessproto.AccessAPIClient, _ interface{}) func() (*accessproto.SubscribeBlocksResponse, error) { + stream, err := client.SubscribeBlocksFromLatest(ctx, &accessproto.SubscribeBlocksFromLatestRequest{ + BlockStatus: entities.BlockStatus_BLOCK_FINALIZED, + FullBlockResponse: true, + }) + s.Require().NoError(err) + return stream.Recv + }, + }, + { + name: "SubscribeBlocksFromStartBlockID", + call: func(ctx context.Context, client accessproto.AccessAPIClient, startValue interface{}) func() (*accessproto.SubscribeBlocksResponse, error) { + stream, err := client.SubscribeBlocksFromStartBlockID(ctx, &accessproto.SubscribeBlocksFromStartBlockIDRequest{ + StartBlockId: startValue.([]byte), + BlockStatus: entities.BlockStatus_BLOCK_FINALIZED, + FullBlockResponse: true, + }) + s.Require().NoError(err) + return stream.Recv + }, + }, + { + name: "SubscribeBlocksFromStartHeight", + call: func(ctx context.Context, client accessproto.AccessAPIClient, startValue interface{}) func() (*accessproto.SubscribeBlocksResponse, error) { + stream, err := client.SubscribeBlocksFromStartHeight(ctx, &accessproto.SubscribeBlocksFromStartHeightRequest{ + StartBlockHeight: startValue.(uint64), + BlockStatus: entities.BlockStatus_BLOCK_FINALIZED, + FullBlockResponse: true, + }) + s.Require().NoError(err) + return stream.Recv + }, + }, + } +} + +func getAccessAPIClient(address string) (accessproto.AccessAPIClient, error) { + conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + + return accessproto.NewAccessAPIClient(conn), nil +} diff --git a/ledger/complete/wal/checkpoint_v6_reader.go b/ledger/complete/wal/checkpoint_v6_reader.go index 460343c49b4..8408b2a1683 100644 --- a/ledger/complete/wal/checkpoint_v6_reader.go +++ b/ledger/complete/wal/checkpoint_v6_reader.go @@ -20,8 +20,17 @@ import ( // ErrEOFNotReached for indicating end of file not reached error var ErrEOFNotReached = errors.New("expect to reach EOF, but actually didn't") -// TODO: validate the header file and the sub file that contains the root hashes -var ReadTriesRootHash = readTriesRootHash +func ReadTriesRootHash(logger zerolog.Logger, dir string, fileName string) ( + []ledger.RootHash, + error, +) { + err := validateCheckpointFile(logger, dir, fileName) + if err != nil { + return nil, err + } + return readTriesRootHash(logger, dir, fileName) +} + var CheckpointHasRootHash = checkpointHasRootHash // readCheckpointV6 reads checkpoint file from a main file and 17 file parts. @@ -849,3 +858,58 @@ func ensureReachedEOF(reader io.Reader) error { return fmt.Errorf("fail to check if reached EOF: %w", err) } + +func validateCheckpointFile(logger zerolog.Logger, dir, fileName string) error { + headerPath := filePathCheckpointHeader(dir, fileName) + // validate header file + subtrieChecksums, topTrieChecksum, err := readCheckpointHeader(headerPath, logger) + if err != nil { + return err + } + + // validate subtrie files + for index, expectedSum := range subtrieChecksums { + filepath, _, err := filePathSubTries(dir, fileName, index) + if err != nil { + return err + } + err = withFile(logger, filepath, func(f *os.File) error { + _, checksum, err := readSubTriesFooter(f) + if err != nil { + return fmt.Errorf("cannot read sub trie node count: %w", err) + } + + if checksum != expectedSum { + return fmt.Errorf("mismatch checksum in subtrie file. checksum from checkpoint header %v does not "+ + "match with the checksum in subtrie file %v", checksum, expectedSum) + } + return nil + }) + + if err != nil { + return err + } + } + + // validate top trie file + filepath, _ := filePathTopTries(dir, fileName) + err = withFile(logger, filepath, func(file *os.File) error { + // read subtrie Node count and validate + _, _, checkSum, err := readTopTriesFooter(file) + if err != nil { + return err + } + + if topTrieChecksum != checkSum { + return fmt.Errorf("mismatch top trie checksum, header file has %v, toptrie file has %v", + topTrieChecksum, checkSum) + } + + return nil + }) + if err != nil { + return err + } + + return nil +} diff --git a/ledger/complete/wal/checkpoint_v6_test.go b/ledger/complete/wal/checkpoint_v6_test.go index 1bf95e17419..ded3acf3e13 100644 --- a/ledger/complete/wal/checkpoint_v6_test.go +++ b/ledger/complete/wal/checkpoint_v6_test.go @@ -608,6 +608,33 @@ func TestReadCheckpointRootHash(t *testing.T) { }) } +func TestReadCheckpointRootHashValidateChecksum(t *testing.T) { + unittest.RunWithTempDir(t, func(dir string) { + tries := createSimpleTrie(t) + fileName := "checkpoint" + logger := unittest.Logger() + require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, logger), "fail to store checkpoint") + + // add a wrong checksum to top trie file + topTrieFilePath, _ := filePathTopTries(dir, fileName) + file, err := os.OpenFile(topTrieFilePath, os.O_RDWR, 0644) + require.NoError(t, err) + + fileInfo, err := file.Stat() + require.NoError(t, err) + fileSize := fileInfo.Size() + + invalidSum := encodeCRC32Sum(10) + _, err = file.WriteAt(invalidSum, fileSize-crc32SumSize) + require.NoError(t, err) + require.NoError(t, file.Close()) + + // ReadTriesRootHash will first validate the checksum and detect the error + _, err = ReadTriesRootHash(logger, dir, fileName) + require.Error(t, err) + }) +} + func TestReadCheckpointRootHashMulti(t *testing.T) { unittest.RunWithTempDir(t, func(dir string) { tries := createMultipleRandomTries(t) diff --git a/network/p2p/tracer/internal/rpc_sent_cache.go b/network/p2p/tracer/internal/rpc_sent_cache.go index 655ddf2179f..d1f5de9c294 100644 --- a/network/p2p/tracer/internal/rpc_sent_cache.go +++ b/network/p2p/tracer/internal/rpc_sent_cache.go @@ -1,8 +1,6 @@ package internal import ( - "fmt" - "github.com/rs/zerolog" "github.com/onflow/flow-go/model/flow" @@ -79,5 +77,5 @@ func (r *rpcSentCache) size() uint { // Returns: // - flow.Identifier: the entity ID. func (r *rpcSentCache) rpcSentEntityID(messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier { - return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s", messageId, controlMsgType))) + return flow.MakeIDFromFingerPrint([]byte(messageId + string(controlMsgType))) }