From ec82a844bed4fbe2c901963bb179c7033d827c7d Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Mon, 26 Jun 2023 17:48:40 -0700 Subject: [PATCH 1/7] [Access] Refactor converters into separate files --- engine/common/rpc/convert/accounts.go | 92 ++ engine/common/rpc/convert/accounts_test.go | 29 + engine/common/rpc/convert/blocks.go | 155 +++ engine/common/rpc/convert/collections.go | 86 ++ engine/common/rpc/convert/convert.go | 1124 +---------------- engine/common/rpc/convert/convert_test.go | 215 ---- engine/common/rpc/convert/events.go | 165 +++ engine/common/rpc/convert/events_test.go | 98 ++ engine/common/rpc/convert/execution_data.go | 281 +++++ .../common/rpc/convert/execution_data_test.go | 99 ++ .../common/rpc/convert/execution_results.go | 174 +++ engine/common/rpc/convert/headers.go | 97 ++ engine/common/rpc/convert/shapshots.go | 35 + engine/common/rpc/convert/transactions.go | 123 ++ .../common/rpc/convert/transactions_test.go | 22 + 15 files changed, 1493 insertions(+), 1302 deletions(-) create mode 100644 engine/common/rpc/convert/accounts.go create mode 100644 engine/common/rpc/convert/accounts_test.go create mode 100644 engine/common/rpc/convert/blocks.go create mode 100644 engine/common/rpc/convert/collections.go delete mode 100644 engine/common/rpc/convert/convert_test.go create mode 100644 engine/common/rpc/convert/events.go create mode 100644 engine/common/rpc/convert/events_test.go create mode 100644 engine/common/rpc/convert/execution_data.go create mode 100644 engine/common/rpc/convert/execution_data_test.go create mode 100644 engine/common/rpc/convert/execution_results.go create mode 100644 engine/common/rpc/convert/headers.go create mode 100644 engine/common/rpc/convert/shapshots.go create mode 100644 engine/common/rpc/convert/transactions.go create mode 100644 engine/common/rpc/convert/transactions_test.go diff --git a/engine/common/rpc/convert/accounts.go b/engine/common/rpc/convert/accounts.go new file mode 100644 index 00000000000..0440d3c0685 --- /dev/null +++ b/engine/common/rpc/convert/accounts.go @@ -0,0 +1,92 @@ +package convert + +import ( + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/crypto" + "github.com/onflow/flow-go/crypto/hash" + "github.com/onflow/flow-go/model/flow" +) + +// AccountToMessage converts a flow.Account to a protobuf message +func AccountToMessage(a *flow.Account) (*entities.Account, error) { + keys := make([]*entities.AccountKey, len(a.Keys)) + for i, k := range a.Keys { + messageKey, err := AccountKeyToMessage(k) + if err != nil { + return nil, err + } + keys[i] = messageKey + } + + return &entities.Account{ + Address: a.Address.Bytes(), + Balance: a.Balance, + Code: nil, + Keys: keys, + Contracts: a.Contracts, + }, nil +} + +// MessageToAccount converts a protobuf message to a flow.Account +func MessageToAccount(m *entities.Account) (*flow.Account, error) { + if m == nil { + return nil, ErrEmptyMessage + } + + accountKeys := make([]flow.AccountPublicKey, len(m.GetKeys())) + for i, key := range m.GetKeys() { + accountKey, err := MessageToAccountKey(key) + if err != nil { + return nil, err + } + + accountKeys[i] = *accountKey + } + + return &flow.Account{ + Address: flow.BytesToAddress(m.GetAddress()), + Balance: m.GetBalance(), + Keys: accountKeys, + Contracts: m.Contracts, + }, nil +} + +// AccountKeyToMessage converts a flow.AccountPublicKey to a protobuf message +func AccountKeyToMessage(a flow.AccountPublicKey) (*entities.AccountKey, error) { + publicKey := a.PublicKey.Encode() + return &entities.AccountKey{ + Index: uint32(a.Index), + PublicKey: publicKey, + SignAlgo: uint32(a.SignAlgo), + HashAlgo: uint32(a.HashAlgo), + Weight: uint32(a.Weight), + SequenceNumber: uint32(a.SeqNumber), + Revoked: a.Revoked, + }, nil +} + +// MessageToAccountKey converts a protobuf message to a flow.AccountPublicKey +func MessageToAccountKey(m *entities.AccountKey) (*flow.AccountPublicKey, error) { + if m == nil { + return nil, ErrEmptyMessage + } + + sigAlgo := crypto.SigningAlgorithm(m.GetSignAlgo()) + hashAlgo := hash.HashingAlgorithm(m.GetHashAlgo()) + + publicKey, err := crypto.DecodePublicKey(sigAlgo, m.GetPublicKey()) + if err != nil { + return nil, err + } + + return &flow.AccountPublicKey{ + Index: int(m.GetIndex()), + PublicKey: publicKey, + SignAlgo: sigAlgo, + HashAlgo: hashAlgo, + Weight: int(m.GetWeight()), + SeqNumber: uint64(m.GetSequenceNumber()), + Revoked: m.GetRevoked(), + }, nil +} diff --git a/engine/common/rpc/convert/accounts_test.go b/engine/common/rpc/convert/accounts_test.go new file mode 100644 index 00000000000..2ccbe72733f --- /dev/null +++ b/engine/common/rpc/convert/accounts_test.go @@ -0,0 +1,29 @@ +package convert_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/fvm" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestConvertAccountKey(t *testing.T) { + privateKey, _ := unittest.AccountKeyDefaultFixture() + accountKey := privateKey.PublicKey(fvm.AccountKeyWeightThreshold) + + // Explicitly test if Revoked is properly converted + accountKey.Revoked = true + + msg, err := convert.AccountKeyToMessage(accountKey) + assert.Nil(t, err) + + converted, err := convert.MessageToAccountKey(msg) + assert.Nil(t, err) + + assert.Equal(t, accountKey, *converted) + assert.Equal(t, accountKey.PublicKey, converted.PublicKey) + assert.Equal(t, accountKey.Revoked, converted.Revoked) +} diff --git a/engine/common/rpc/convert/blocks.go b/engine/common/rpc/convert/blocks.go new file mode 100644 index 00000000000..9633c86ddfa --- /dev/null +++ b/engine/common/rpc/convert/blocks.go @@ -0,0 +1,155 @@ +package convert + +import ( + "fmt" + + "github.com/onflow/flow/protobuf/go/flow/entities" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/onflow/flow-go/model/flow" +) + +// BlockToMessage converts a flow.Block to a protobuf Block message. +// signerIDs is a precomputed list of signer IDs for the block based on the block's signer indicies. +func BlockToMessage(h *flow.Block, signerIDs flow.IdentifierList) ( + *entities.Block, + error, +) { + + id := h.ID() + + parentID := h.Header.ParentID + t := timestamppb.New(h.Header.Timestamp) + cg := CollectionGuaranteesToMessages(h.Payload.Guarantees) + + seals := BlockSealsToMessages(h.Payload.Seals) + + execResults, err := ExecutionResultsToMessages(h.Payload.Results) + if err != nil { + return nil, err + } + + blockHeader, err := BlockHeaderToMessage(h.Header, signerIDs) + if err != nil { + return nil, err + } + + bh := entities.Block{ + Id: id[:], + Height: h.Header.Height, + ParentId: parentID[:], + Timestamp: t, + CollectionGuarantees: cg, + BlockSeals: seals, + Signatures: [][]byte{h.Header.ParentVoterSigData}, + ExecutionReceiptMetaList: ExecutionResultMetaListToMessages(h.Payload.Receipts), + ExecutionResultList: execResults, + BlockHeader: blockHeader, + } + + return &bh, nil +} + +// BlockToMessageLight converts a flow.Block to the light form of a protobuf Block message. +func BlockToMessageLight(h *flow.Block) *entities.Block { + id := h.ID() + + parentID := h.Header.ParentID + t := timestamppb.New(h.Header.Timestamp) + cg := CollectionGuaranteesToMessages(h.Payload.Guarantees) + + return &entities.Block{ + Id: id[:], + Height: h.Header.Height, + ParentId: parentID[:], + Timestamp: t, + CollectionGuarantees: cg, + Signatures: [][]byte{h.Header.ParentVoterSigData}, + } +} + +// MessageToBlock converts a protobuf Block message to a flow.Block. +func MessageToBlock(m *entities.Block) (*flow.Block, error) { + payload, err := PayloadFromMessage(m) + if err != nil { + return nil, fmt.Errorf("failed to extract payload data from message: %w", err) + } + header, err := MessageToBlockHeader(m.BlockHeader) + if err != nil { + return nil, fmt.Errorf("failed to convert block header: %w", err) + } + return &flow.Block{ + Header: header, + Payload: payload, + }, nil +} + +// BlockSealToMessage converts a flow.Seal to a protobuf BlockSeal message. +func BlockSealToMessage(s *flow.Seal) *entities.BlockSeal { + id := s.BlockID + result := s.ResultID + return &entities.BlockSeal{ + BlockId: id[:], + ExecutionReceiptId: result[:], + ExecutionReceiptSignatures: [][]byte{}, // filling seals signature with zero + FinalState: StateCommitmentToMessage(s.FinalState), + AggregatedApprovalSigs: AggregatedSignaturesToMessages(s.AggregatedApprovalSigs), + ResultId: IdentifierToMessage(s.ResultID), + } +} + +// MessageToBlockSeal converts a protobuf BlockSeal message to a flow.Seal. +func MessageToBlockSeal(m *entities.BlockSeal) (*flow.Seal, error) { + finalState, err := MessageToStateCommitment(m.FinalState) + if err != nil { + return nil, fmt.Errorf("failed to convert message to block seal: %w", err) + } + return &flow.Seal{ + BlockID: MessageToIdentifier(m.BlockId), + ResultID: MessageToIdentifier(m.ResultId), + FinalState: finalState, + AggregatedApprovalSigs: MessagesToAggregatedSignatures(m.AggregatedApprovalSigs), + }, nil +} + +// BlockSealsToMessages converts a slice of flow.Seal to a slice of protobuf BlockSeal messages. +func BlockSealsToMessages(b []*flow.Seal) []*entities.BlockSeal { + seals := make([]*entities.BlockSeal, len(b)) + for i, s := range b { + seals[i] = BlockSealToMessage(s) + } + return seals +} + +// MessagesToBlockSeals converts a slice of protobuf BlockSeal messages to a slice of flow.Seal. +func MessagesToBlockSeals(m []*entities.BlockSeal) ([]*flow.Seal, error) { + seals := make([]*flow.Seal, len(m)) + for i, s := range m { + msg, err := MessageToBlockSeal(s) + if err != nil { + return nil, err + } + seals[i] = msg + } + return seals, nil +} + +// PayloadFromMessage converts a protobuf Block message to a flow.Payload. +func PayloadFromMessage(m *entities.Block) (*flow.Payload, error) { + cgs := MessagesToCollectionGuarantees(m.CollectionGuarantees) + seals, err := MessagesToBlockSeals(m.BlockSeals) + if err != nil { + return nil, err + } + receipts := MessagesToExecutionResultMetaList(m.ExecutionReceiptMetaList) + results, err := MessagesToExecutionResults(m.ExecutionResultList) + if err != nil { + return nil, err + } + return &flow.Payload{ + Guarantees: cgs, + Seals: seals, + Receipts: receipts, + Results: results, + }, nil +} diff --git a/engine/common/rpc/convert/collections.go b/engine/common/rpc/convert/collections.go new file mode 100644 index 00000000000..00f3f477ccb --- /dev/null +++ b/engine/common/rpc/convert/collections.go @@ -0,0 +1,86 @@ +package convert + +import ( + "fmt" + + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/model/flow" +) + +// CollectionToMessage converts a collection to a protobuf message +func CollectionToMessage(c *flow.Collection) (*entities.Collection, error) { + if c == nil || c.Transactions == nil { + return nil, fmt.Errorf("invalid collection") + } + + transactionsIDs := make([][]byte, len(c.Transactions)) + for i, t := range c.Transactions { + id := t.ID() + transactionsIDs[i] = id[:] + } + + collectionID := c.ID() + + ce := &entities.Collection{ + Id: collectionID[:], + TransactionIds: transactionsIDs, + } + + return ce, nil +} + +// LightCollectionToMessage converts a light collection to a protobuf message +func LightCollectionToMessage(c *flow.LightCollection) (*entities.Collection, error) { + if c == nil || c.Transactions == nil { + return nil, fmt.Errorf("invalid collection") + } + + collectionID := c.ID() + + return &entities.Collection{ + Id: collectionID[:], + TransactionIds: IdentifiersToMessages(c.Transactions), + }, nil +} + +// CollectionGuaranteeToMessage converts a collection guarantee to a protobuf message +func CollectionGuaranteeToMessage(g *flow.CollectionGuarantee) *entities.CollectionGuarantee { + id := g.ID() + + return &entities.CollectionGuarantee{ + CollectionId: id[:], + Signatures: [][]byte{g.Signature}, + ReferenceBlockId: IdentifierToMessage(g.ReferenceBlockID), + Signature: g.Signature, + SignerIndices: g.SignerIndices, + } +} + +// MessageToCollectionGuarantee converts a protobuf message to a collection guarantee +func MessageToCollectionGuarantee(m *entities.CollectionGuarantee) *flow.CollectionGuarantee { + return &flow.CollectionGuarantee{ + CollectionID: MessageToIdentifier(m.CollectionId), + ReferenceBlockID: MessageToIdentifier(m.ReferenceBlockId), + SignerIndices: m.SignerIndices, + Signature: MessageToSignature(m.Signature), + } +} + +// CollectionGuaranteesToMessages converts a slice of collection guarantees to a slice of protobuf messages +func CollectionGuaranteesToMessages(c []*flow.CollectionGuarantee) []*entities.CollectionGuarantee { + cg := make([]*entities.CollectionGuarantee, len(c)) + for i, g := range c { + cg[i] = CollectionGuaranteeToMessage(g) + } + return cg +} + +// MessagesToCollectionGuarantees converts a slice of protobuf messages to a slice of collection guarantees +func MessagesToCollectionGuarantees(m []*entities.CollectionGuarantee) []*flow.CollectionGuarantee { + cg := make([]*flow.CollectionGuarantee, len(m)) + for i, g := range m { + cg[i] = MessageToCollectionGuarantee(g) + } + return cg +} diff --git a/engine/common/rpc/convert/convert.go b/engine/common/rpc/convert/convert.go index 2baa49e7b6b..3419e997def 100644 --- a/engine/common/rpc/convert/convert.go +++ b/engine/common/rpc/convert/convert.go @@ -1,25 +1,13 @@ package convert import ( - "encoding/json" "errors" "fmt" - "github.com/onflow/cadence/encoding/ccf" - jsoncdc "github.com/onflow/cadence/encoding/json" "github.com/onflow/flow/protobuf/go/flow/entities" - execproto "github.com/onflow/flow/protobuf/go/flow/execution" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/onflow/flow-go/crypto" - "github.com/onflow/flow-go/crypto/hash" - "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/executiondatasync/execution_data" - "github.com/onflow/flow-go/state/protocol" - "github.com/onflow/flow-go/state/protocol/inmem" ) var ErrEmptyMessage = errors.New("protobuf message is empty") @@ -34,204 +22,8 @@ var ValidChainIds = map[string]bool{ flow.MonotonicEmulator.String(): true, } -func MessageToTransaction( - m *entities.Transaction, - chain flow.Chain, -) (flow.TransactionBody, error) { - if m == nil { - return flow.TransactionBody{}, ErrEmptyMessage - } - - t := flow.NewTransactionBody() - - proposalKey := m.GetProposalKey() - if proposalKey != nil { - proposalAddress, err := Address(proposalKey.GetAddress(), chain) - if err != nil { - return *t, err - } - t.SetProposalKey(proposalAddress, uint64(proposalKey.GetKeyId()), proposalKey.GetSequenceNumber()) - } - - payer := m.GetPayer() - if payer != nil { - payerAddress, err := Address(payer, chain) - if err != nil { - return *t, err - } - t.SetPayer(payerAddress) - } - - for _, authorizer := range m.GetAuthorizers() { - authorizerAddress, err := Address(authorizer, chain) - if err != nil { - return *t, err - } - t.AddAuthorizer(authorizerAddress) - } - - for _, sig := range m.GetPayloadSignatures() { - addr, err := Address(sig.GetAddress(), chain) - if err != nil { - return *t, err - } - t.AddPayloadSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) - } - - for _, sig := range m.GetEnvelopeSignatures() { - addr, err := Address(sig.GetAddress(), chain) - if err != nil { - return *t, err - } - t.AddEnvelopeSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) - } - - t.SetScript(m.GetScript()) - t.SetArguments(m.GetArguments()) - t.SetReferenceBlockID(flow.HashToID(m.GetReferenceBlockId())) - t.SetGasLimit(m.GetGasLimit()) - - return *t, nil -} - -func TransactionsToMessages(transactions []*flow.TransactionBody) []*entities.Transaction { - transactionMessages := make([]*entities.Transaction, len(transactions)) - for i, t := range transactions { - transactionMessages[i] = TransactionToMessage(*t) - } - return transactionMessages -} - -func TransactionToMessage(tb flow.TransactionBody) *entities.Transaction { - proposalKeyMessage := &entities.Transaction_ProposalKey{ - Address: tb.ProposalKey.Address.Bytes(), - KeyId: uint32(tb.ProposalKey.KeyIndex), - SequenceNumber: tb.ProposalKey.SequenceNumber, - } - - authMessages := make([][]byte, len(tb.Authorizers)) - for i, auth := range tb.Authorizers { - authMessages[i] = auth.Bytes() - } - - payloadSigMessages := make([]*entities.Transaction_Signature, len(tb.PayloadSignatures)) - - for i, sig := range tb.PayloadSignatures { - payloadSigMessages[i] = &entities.Transaction_Signature{ - Address: sig.Address.Bytes(), - KeyId: uint32(sig.KeyIndex), - Signature: sig.Signature, - } - } - - envelopeSigMessages := make([]*entities.Transaction_Signature, len(tb.EnvelopeSignatures)) - - for i, sig := range tb.EnvelopeSignatures { - envelopeSigMessages[i] = &entities.Transaction_Signature{ - Address: sig.Address.Bytes(), - KeyId: uint32(sig.KeyIndex), - Signature: sig.Signature, - } - } - - return &entities.Transaction{ - Script: tb.Script, - Arguments: tb.Arguments, - ReferenceBlockId: tb.ReferenceBlockID[:], - GasLimit: tb.GasLimit, - ProposalKey: proposalKeyMessage, - Payer: tb.Payer.Bytes(), - Authorizers: authMessages, - PayloadSignatures: payloadSigMessages, - EnvelopeSignatures: envelopeSigMessages, - } -} - -func BlockHeaderToMessage( - h *flow.Header, - signerIDs flow.IdentifierList, -) (*entities.BlockHeader, error) { - id := h.ID() - - t := timestamppb.New(h.Timestamp) - var lastViewTC *entities.TimeoutCertificate - if h.LastViewTC != nil { - newestQC := h.LastViewTC.NewestQC - lastViewTC = &entities.TimeoutCertificate{ - View: h.LastViewTC.View, - HighQcViews: h.LastViewTC.NewestQCViews, - SignerIndices: h.LastViewTC.SignerIndices, - SigData: h.LastViewTC.SigData, - HighestQc: &entities.QuorumCertificate{ - View: newestQC.View, - BlockId: newestQC.BlockID[:], - SignerIndices: newestQC.SignerIndices, - SigData: newestQC.SigData, - }, - } - } - parentVoterIds := IdentifiersToMessages(signerIDs) - - return &entities.BlockHeader{ - Id: id[:], - ParentId: h.ParentID[:], - Height: h.Height, - PayloadHash: h.PayloadHash[:], - Timestamp: t, - View: h.View, - ParentView: h.ParentView, - ParentVoterIndices: h.ParentVoterIndices, - ParentVoterIds: parentVoterIds, - ParentVoterSigData: h.ParentVoterSigData, - ProposerId: h.ProposerID[:], - ProposerSigData: h.ProposerSigData, - ChainId: h.ChainID.String(), - LastViewTc: lastViewTC, - }, nil -} - -func MessageToBlockHeader(m *entities.BlockHeader) (*flow.Header, error) { - chainId, err := MessageToChainId(m.ChainId) - if err != nil { - return nil, fmt.Errorf("failed to convert ChainId: %w", err) - } - var lastViewTC *flow.TimeoutCertificate - if m.LastViewTc != nil { - newestQC := m.LastViewTc.HighestQc - if newestQC == nil { - return nil, fmt.Errorf("invalid structure newest QC should be present") - } - lastViewTC = &flow.TimeoutCertificate{ - View: m.LastViewTc.View, - NewestQCViews: m.LastViewTc.HighQcViews, - SignerIndices: m.LastViewTc.SignerIndices, - SigData: m.LastViewTc.SigData, - NewestQC: &flow.QuorumCertificate{ - View: newestQC.View, - BlockID: MessageToIdentifier(newestQC.BlockId), - SignerIndices: newestQC.SignerIndices, - SigData: newestQC.SigData, - }, - } - } - - return &flow.Header{ - ParentID: MessageToIdentifier(m.ParentId), - Height: m.Height, - PayloadHash: MessageToIdentifier(m.PayloadHash), - Timestamp: m.Timestamp.AsTime(), - View: m.View, - ParentView: m.ParentView, - ParentVoterIndices: m.ParentVoterIndices, - ParentVoterSigData: m.ParentVoterSigData, - ProposerID: MessageToIdentifier(m.ProposerId), - ProposerSigData: m.ProposerSigData, - ChainID: *chainId, - LastViewTC: lastViewTC, - }, nil -} - -// MessageToChainId checks chainId from enumeration to prevent a panic on Chain() being called +// MessageToChainId converts the chainID from a protobuf message to a flow.ChainID +// It returns an error if the value is not a valid chainId func MessageToChainId(m string) (*flow.ChainID, error) { if !ValidChainIds[m] { return nil, fmt.Errorf("invalid chainId %s: ", m) @@ -240,209 +32,21 @@ func MessageToChainId(m string) (*flow.ChainID, error) { return &chainId, nil } -func CollectionGuaranteesToMessages(c []*flow.CollectionGuarantee) []*entities.CollectionGuarantee { - cg := make([]*entities.CollectionGuarantee, len(c)) - for i, g := range c { - cg[i] = CollectionGuaranteeToMessage(g) - } - return cg -} - -func MessagesToCollectionGuarantees(m []*entities.CollectionGuarantee) []*flow.CollectionGuarantee { - cg := make([]*flow.CollectionGuarantee, len(m)) - for i, g := range m { - cg[i] = MessageToCollectionGuarantee(g) - } - return cg -} - -func BlockSealsToMessages(b []*flow.Seal) []*entities.BlockSeal { - seals := make([]*entities.BlockSeal, len(b)) - for i, s := range b { - seals[i] = BlockSealToMessage(s) - } - return seals -} - -func MessagesToBlockSeals(m []*entities.BlockSeal) ([]*flow.Seal, error) { - seals := make([]*flow.Seal, len(m)) - for i, s := range m { - msg, err := MessageToBlockSeal(s) - if err != nil { - return nil, err - } - seals[i] = msg - } - return seals, nil -} - -func ExecutionResultsToMessages(e []*flow.ExecutionResult) ( - []*entities.ExecutionResult, - error, -) { - execResults := make([]*entities.ExecutionResult, len(e)) - for i, execRes := range e { - parsedExecResult, err := ExecutionResultToMessage(execRes) - if err != nil { - return nil, err - } - execResults[i] = parsedExecResult - } - return execResults, nil -} - -func MessagesToExecutionResults(m []*entities.ExecutionResult) ( - []*flow.ExecutionResult, - error, -) { - execResults := make([]*flow.ExecutionResult, len(m)) - for i, e := range m { - parsedExecResult, err := MessageToExecutionResult(e) - if err != nil { - return nil, fmt.Errorf("failed to convert message at index %d to execution result: %w", i, err) - } - execResults[i] = parsedExecResult - } - return execResults, nil -} - -func BlockToMessage(h *flow.Block, signerIDs flow.IdentifierList) ( - *entities.Block, - error, -) { - - id := h.ID() - - parentID := h.Header.ParentID - t := timestamppb.New(h.Header.Timestamp) - cg := CollectionGuaranteesToMessages(h.Payload.Guarantees) - - seals := BlockSealsToMessages(h.Payload.Seals) - - execResults, err := ExecutionResultsToMessages(h.Payload.Results) - if err != nil { - return nil, err - } - - blockHeader, err := BlockHeaderToMessage(h.Header, signerIDs) - if err != nil { - return nil, err - } - - bh := entities.Block{ - Id: id[:], - Height: h.Header.Height, - ParentId: parentID[:], - Timestamp: t, - CollectionGuarantees: cg, - BlockSeals: seals, - Signatures: [][]byte{h.Header.ParentVoterSigData}, - ExecutionReceiptMetaList: ExecutionResultMetaListToMessages(h.Payload.Receipts), - ExecutionResultList: execResults, - BlockHeader: blockHeader, - } - - return &bh, nil -} - -func BlockToMessageLight(h *flow.Block) *entities.Block { - id := h.ID() - - parentID := h.Header.ParentID - t := timestamppb.New(h.Header.Timestamp) - cg := CollectionGuaranteesToMessages(h.Payload.Guarantees) - - return &entities.Block{ - Id: id[:], - Height: h.Header.Height, - ParentId: parentID[:], - Timestamp: t, - CollectionGuarantees: cg, - Signatures: [][]byte{h.Header.ParentVoterSigData}, - } -} - -func MessageToBlock(m *entities.Block) (*flow.Block, error) { - payload, err := PayloadFromMessage(m) - if err != nil { - return nil, fmt.Errorf("failed to extract payload data from message: %w", err) - } - header, err := MessageToBlockHeader(m.BlockHeader) - if err != nil { - return nil, fmt.Errorf("failed to convert block header: %w", err) - } - return &flow.Block{ - Header: header, - Payload: payload, - }, nil -} - -func MessagesToExecutionResultMetaList(m []*entities.ExecutionReceiptMeta) flow.ExecutionReceiptMetaList { - execMetaList := make([]*flow.ExecutionReceiptMeta, len(m)) - for i, message := range m { - execMetaList[i] = &flow.ExecutionReceiptMeta{ - ExecutorID: MessageToIdentifier(message.ExecutorId), - ResultID: MessageToIdentifier(message.ResultId), - Spocks: MessagesToSignatures(message.Spocks), - ExecutorSignature: MessageToSignature(message.ExecutorSignature), - } - } - return execMetaList[:] -} - -func ExecutionResultMetaListToMessages(e flow.ExecutionReceiptMetaList) []*entities.ExecutionReceiptMeta { - messageList := make([]*entities.ExecutionReceiptMeta, len(e)) - for i, execMeta := range e { - messageList[i] = &entities.ExecutionReceiptMeta{ - ExecutorId: IdentifierToMessage(execMeta.ExecutorID), - ResultId: IdentifierToMessage(execMeta.ResultID), - Spocks: SignaturesToMessages(execMeta.Spocks), - ExecutorSignature: MessageToSignature(execMeta.ExecutorSignature), +// AggregatedSignaturesToMessages converts a slice of AggregatedSignature structs to a corresponding +// slice of protobuf messages +func AggregatedSignaturesToMessages(a []flow.AggregatedSignature) []*entities.AggregatedSignature { + parsedMessages := make([]*entities.AggregatedSignature, len(a)) + for i, sig := range a { + parsedMessages[i] = &entities.AggregatedSignature{ + SignerIds: IdentifiersToMessages(sig.SignerIDs), + VerifierSignatures: SignaturesToMessages(sig.VerifierSignatures), } } - return messageList -} - -func PayloadFromMessage(m *entities.Block) (*flow.Payload, error) { - cgs := MessagesToCollectionGuarantees(m.CollectionGuarantees) - seals, err := MessagesToBlockSeals(m.BlockSeals) - if err != nil { - return nil, err - } - receipts := MessagesToExecutionResultMetaList(m.ExecutionReceiptMetaList) - results, err := MessagesToExecutionResults(m.ExecutionResultList) - if err != nil { - return nil, err - } - return &flow.Payload{ - Guarantees: cgs, - Seals: seals, - Receipts: receipts, - Results: results, - }, nil -} - -func CollectionGuaranteeToMessage(g *flow.CollectionGuarantee) *entities.CollectionGuarantee { - id := g.ID() - - return &entities.CollectionGuarantee{ - CollectionId: id[:], - Signatures: [][]byte{g.Signature}, - ReferenceBlockId: IdentifierToMessage(g.ReferenceBlockID), - Signature: g.Signature, - SignerIndices: g.SignerIndices, - } -} - -func MessageToCollectionGuarantee(m *entities.CollectionGuarantee) *flow.CollectionGuarantee { - return &flow.CollectionGuarantee{ - CollectionID: MessageToIdentifier(m.CollectionId), - ReferenceBlockID: MessageToIdentifier(m.ReferenceBlockId), - SignerIndices: m.SignerIndices, - Signature: MessageToSignature(m.Signature), - } + return parsedMessages } +// MessagesToAggregatedSignatures converts a slice of protobuf messages to their corresponding +// AggregatedSignature structs func MessagesToAggregatedSignatures(m []*entities.AggregatedSignature) []flow.AggregatedSignature { parsedSignatures := make([]flow.AggregatedSignature, len(m)) for i, message := range m { @@ -454,29 +58,17 @@ func MessagesToAggregatedSignatures(m []*entities.AggregatedSignature) []flow.Ag return parsedSignatures } -func AggregatedSignaturesToMessages(a []flow.AggregatedSignature) []*entities.AggregatedSignature { - parsedMessages := make([]*entities.AggregatedSignature, len(a)) - for i, sig := range a { - parsedMessages[i] = &entities.AggregatedSignature{ - SignerIds: IdentifiersToMessages(sig.SignerIDs), - VerifierSignatures: SignaturesToMessages(sig.VerifierSignatures), - } - } - return parsedMessages -} - -func MessagesToSignatures(m [][]byte) []crypto.Signature { - signatures := make([]crypto.Signature, len(m)) - for i, message := range m { - signatures[i] = MessageToSignature(message) - } - return signatures +// SignatureToMessage converts a crypto.Signature to a byte slice for inclusion in a protobuf message +func SignatureToMessage(s crypto.Signature) []byte { + return s[:] } +// MessageToSignature converts a byte slice from a protobuf message to a crypto.Signature func MessageToSignature(m []byte) crypto.Signature { return m[:] } +// SignaturesToMessages converts a slice of crypto.Signatures to a slice of byte slices for inclusion in a protobuf message func SignaturesToMessages(s []crypto.Signature) [][]byte { messages := make([][]byte, len(s)) for i, sig := range s { @@ -485,269 +77,26 @@ func SignaturesToMessages(s []crypto.Signature) [][]byte { return messages } -func SignatureToMessage(s crypto.Signature) []byte { - return s[:] -} - -func BlockSealToMessage(s *flow.Seal) *entities.BlockSeal { - id := s.BlockID - result := s.ResultID - return &entities.BlockSeal{ - BlockId: id[:], - ExecutionReceiptId: result[:], - ExecutionReceiptSignatures: [][]byte{}, // filling seals signature with zero - FinalState: StateCommitmentToMessage(s.FinalState), - AggregatedApprovalSigs: AggregatedSignaturesToMessages(s.AggregatedApprovalSigs), - ResultId: IdentifierToMessage(s.ResultID), - } -} - -func MessageToBlockSeal(m *entities.BlockSeal) (*flow.Seal, error) { - finalState, err := MessageToStateCommitment(m.FinalState) - if err != nil { - return nil, fmt.Errorf("failed to convert message to block seal: %w", err) - } - return &flow.Seal{ - BlockID: MessageToIdentifier(m.BlockId), - ResultID: MessageToIdentifier(m.ResultId), - FinalState: finalState, - AggregatedApprovalSigs: MessagesToAggregatedSignatures(m.AggregatedApprovalSigs), - }, nil -} - -func CollectionToMessage(c *flow.Collection) (*entities.Collection, error) { - if c == nil || c.Transactions == nil { - return nil, fmt.Errorf("invalid collection") - } - - transactionsIDs := make([][]byte, len(c.Transactions)) - for i, t := range c.Transactions { - id := t.ID() - transactionsIDs[i] = id[:] - } - - collectionID := c.ID() - - ce := &entities.Collection{ - Id: collectionID[:], - TransactionIds: transactionsIDs, - } - - return ce, nil -} - -func LightCollectionToMessage(c *flow.LightCollection) (*entities.Collection, error) { - if c == nil || c.Transactions == nil { - return nil, fmt.Errorf("invalid collection") - } - - collectionID := c.ID() - - return &entities.Collection{ - Id: collectionID[:], - TransactionIds: IdentifiersToMessages(c.Transactions), - }, nil -} - -func EventToMessage(e flow.Event) *entities.Event { - return &entities.Event{ - Type: string(e.Type), - TransactionId: e.TransactionID[:], - TransactionIndex: e.TransactionIndex, - EventIndex: e.EventIndex, - Payload: e.Payload, - } -} - -func MessageToAccount(m *entities.Account) (*flow.Account, error) { - if m == nil { - return nil, ErrEmptyMessage - } - - accountKeys := make([]flow.AccountPublicKey, len(m.GetKeys())) - for i, key := range m.GetKeys() { - accountKey, err := MessageToAccountKey(key) - if err != nil { - return nil, err - } - - accountKeys[i] = *accountKey - } - - return &flow.Account{ - Address: flow.BytesToAddress(m.GetAddress()), - Balance: m.GetBalance(), - Keys: accountKeys, - Contracts: m.Contracts, - }, nil -} - -func AccountToMessage(a *flow.Account) (*entities.Account, error) { - keys := make([]*entities.AccountKey, len(a.Keys)) - for i, k := range a.Keys { - messageKey, err := AccountKeyToMessage(k) - if err != nil { - return nil, err - } - keys[i] = messageKey - } - - return &entities.Account{ - Address: a.Address.Bytes(), - Balance: a.Balance, - Code: nil, - Keys: keys, - Contracts: a.Contracts, - }, nil -} - -func MessageToAccountKey(m *entities.AccountKey) (*flow.AccountPublicKey, error) { - if m == nil { - return nil, ErrEmptyMessage - } - - sigAlgo := crypto.SigningAlgorithm(m.GetSignAlgo()) - hashAlgo := hash.HashingAlgorithm(m.GetHashAlgo()) - - publicKey, err := crypto.DecodePublicKey(sigAlgo, m.GetPublicKey()) - if err != nil { - return nil, err - } - - return &flow.AccountPublicKey{ - Index: int(m.GetIndex()), - PublicKey: publicKey, - SignAlgo: sigAlgo, - HashAlgo: hashAlgo, - Weight: int(m.GetWeight()), - SeqNumber: uint64(m.GetSequenceNumber()), - Revoked: m.GetRevoked(), - }, nil -} - -func AccountKeyToMessage(a flow.AccountPublicKey) (*entities.AccountKey, error) { - publicKey := a.PublicKey.Encode() - return &entities.AccountKey{ - Index: uint32(a.Index), - PublicKey: publicKey, - SignAlgo: uint32(a.SignAlgo), - HashAlgo: uint32(a.HashAlgo), - Weight: uint32(a.Weight), - SequenceNumber: uint32(a.SeqNumber), - Revoked: a.Revoked, - }, nil -} - -func MessageToEvent(m *entities.Event) flow.Event { - return flow.Event{ - Type: flow.EventType(m.GetType()), - TransactionID: flow.HashToID(m.GetTransactionId()), - TransactionIndex: m.GetTransactionIndex(), - EventIndex: m.GetEventIndex(), - Payload: m.GetPayload(), - } -} - -func MessagesToEvents(l []*entities.Event) []flow.Event { - events := make([]flow.Event, len(l)) - - for i, m := range l { - events[i] = MessageToEvent(m) - } - - return events -} - -func MessagesToEventsFromVersion(l []*entities.Event, version execproto.EventEncodingVersion) ([]flow.Event, error) { - events := make([]flow.Event, len(l)) - for i, m := range l { - event, err := MessageToEventFromVersion(m, version) - if err != nil { - return nil, fmt.Errorf("could not convert event at index %d from format %d: %w", - m.EventIndex, version, err) - } - events[i] = *event - } - return events, nil -} - -func MessageToEventFromVersion(m *entities.Event, version execproto.EventEncodingVersion) (*flow.Event, error) { - switch version { - case execproto.EventEncodingVersion_CCF_V0: - convertedPayload, err := CcfPayloadToJsonPayload(m.Payload) - if err != nil { - return nil, fmt.Errorf("could not convert event payload from CCF to Json: %w", err) - } - return &flow.Event{ - Type: flow.EventType(m.GetType()), - TransactionID: flow.HashToID(m.GetTransactionId()), - TransactionIndex: m.GetTransactionIndex(), - EventIndex: m.GetEventIndex(), - Payload: convertedPayload, - }, nil - case execproto.EventEncodingVersion_JSON_CDC_V0: - je := MessageToEvent(m) - return &je, nil - default: - return nil, fmt.Errorf("invalid encoding format %d", version) - } -} - -func CcfPayloadToJsonPayload(p []byte) ([]byte, error) { - val, err := ccf.Decode(nil, p) - if err != nil { - return nil, fmt.Errorf("unable to decode from ccf format: %w", err) - } - res, err := jsoncdc.Encode(val) - if err != nil { - return nil, fmt.Errorf("unable to encode to json-cdc format: %w", err) - } - return res, nil -} - -func CcfEventToJsonEvent(e flow.Event) (*flow.Event, error) { - convertedPayload, err := CcfPayloadToJsonPayload(e.Payload) - if err != nil { - return nil, err - } - return &flow.Event{ - Type: e.Type, - TransactionID: e.TransactionID, - TransactionIndex: e.TransactionIndex, - EventIndex: e.EventIndex, - Payload: convertedPayload, - }, nil -} - -func EventsToMessages(flowEvents []flow.Event) []*entities.Event { - events := make([]*entities.Event, len(flowEvents)) - for i, e := range flowEvents { - event := EventToMessage(e) - events[i] = event +// MessagesToSignatures converts a slice of byte slices from a protobuf message to a slice of crypto.Signatures +func MessagesToSignatures(m [][]byte) []crypto.Signature { + signatures := make([]crypto.Signature, len(m)) + for i, message := range m { + signatures[i] = MessageToSignature(message) } - return events + return signatures } +// IdentifierToMessage converts a flow.Identifier to a byte slice for inclusion in a protobuf message func IdentifierToMessage(i flow.Identifier) []byte { return i[:] } +// MessageToIdentifier converts a byte slice from a protobuf message to a flow.Identifier func MessageToIdentifier(b []byte) flow.Identifier { return flow.HashToID(b) } -func StateCommitmentToMessage(s flow.StateCommitment) []byte { - return s[:] -} - -func MessageToStateCommitment(bytes []byte) (sc flow.StateCommitment, err error) { - if len(bytes) != len(sc) { - return sc, fmt.Errorf("invalid state commitment length. got %d expected %d", len(bytes), len(sc)) - } - copy(sc[:], bytes) - return -} - +// IdentifiersToMessages converts a slice of flow.Identifiers to a slice of byte slices for inclusion in a protobuf message func IdentifiersToMessages(l []flow.Identifier) [][]byte { results := make([][]byte, len(l)) for i, item := range l { @@ -756,6 +105,7 @@ func IdentifiersToMessages(l []flow.Identifier) [][]byte { return results } +// MessagesToIdentifiers converts a slice of byte slices from a protobuf message to a slice of flow.Identifiers func MessagesToIdentifiers(l [][]byte) []flow.Identifier { results := make([]flow.Identifier, len(l)) for i, item := range l { @@ -764,416 +114,16 @@ func MessagesToIdentifiers(l [][]byte) []flow.Identifier { return results } -// SnapshotToBytes converts a `protocol.Snapshot` to bytes, encoded as JSON -func SnapshotToBytes(snapshot protocol.Snapshot) ([]byte, error) { - serializable, err := inmem.FromSnapshot(snapshot) - if err != nil { - return nil, err - } - - data, err := json.Marshal(serializable.Encodable()) - if err != nil { - return nil, err - } - - return data, nil -} - -// BytesToInmemSnapshot converts an array of bytes to `inmem.Snapshot` -func BytesToInmemSnapshot(bytes []byte) (*inmem.Snapshot, error) { - var encodable inmem.EncodableSnapshot - err := json.Unmarshal(bytes, &encodable) - if err != nil { - return nil, fmt.Errorf("could not unmarshal decoded snapshot: %w", err) - } - - return inmem.SnapshotFromEncodable(encodable), nil -} - -func MessagesToChunkList(m []*entities.Chunk) (flow.ChunkList, error) { - parsedChunks := make(flow.ChunkList, len(m)) - for i, chunk := range m { - parsedChunk, err := MessageToChunk(chunk) - if err != nil { - return nil, fmt.Errorf("failed to parse message at index %d to chunk: %w", i, err) - } - parsedChunks[i] = parsedChunk - } - return parsedChunks, nil -} - -func MessagesToServiceEventList(m []*entities.ServiceEvent) ( - flow.ServiceEventList, - error, -) { - parsedServiceEvents := make(flow.ServiceEventList, len(m)) - for i, serviceEvent := range m { - parsedServiceEvent, err := MessageToServiceEvent(serviceEvent) - if err != nil { - return nil, fmt.Errorf("failed to parse service event at index %d from message: %w", i, err) - } - parsedServiceEvents[i] = *parsedServiceEvent - } - return parsedServiceEvents, nil -} - -func MessageToExecutionResult(m *entities.ExecutionResult) ( - *flow.ExecutionResult, - error, -) { - // convert Chunks - parsedChunks, err := MessagesToChunkList(m.Chunks) - if err != nil { - return nil, fmt.Errorf("failed to parse messages to ChunkList: %w", err) - } - // convert ServiceEvents - parsedServiceEvents, err := MessagesToServiceEventList(m.ServiceEvents) - if err != nil { - return nil, err - } - return &flow.ExecutionResult{ - PreviousResultID: MessageToIdentifier(m.PreviousResultId), - BlockID: MessageToIdentifier(m.BlockId), - Chunks: parsedChunks, - ServiceEvents: parsedServiceEvents, - ExecutionDataID: MessageToIdentifier(m.ExecutionDataId), - }, nil -} - -func ExecutionResultToMessage(er *flow.ExecutionResult) ( - *entities.ExecutionResult, - error, -) { - - chunks := make([]*entities.Chunk, len(er.Chunks)) - - for i, chunk := range er.Chunks { - chunks[i] = ChunkToMessage(chunk) - } - - serviceEvents := make([]*entities.ServiceEvent, len(er.ServiceEvents)) - var err error - for i, serviceEvent := range er.ServiceEvents { - serviceEvents[i], err = ServiceEventToMessage(serviceEvent) - if err != nil { - return nil, fmt.Errorf("error while convering service event %d: %w", i, err) - } - } - - return &entities.ExecutionResult{ - PreviousResultId: IdentifierToMessage(er.PreviousResultID), - BlockId: IdentifierToMessage(er.BlockID), - Chunks: chunks, - ServiceEvents: serviceEvents, - ExecutionDataId: IdentifierToMessage(er.ExecutionDataID), - }, nil -} - -func ServiceEventToMessage(event flow.ServiceEvent) (*entities.ServiceEvent, error) { - - bytes, err := json.Marshal(event.Event) - if err != nil { - return nil, fmt.Errorf("cannot marshal service event: %w", err) - } - - return &entities.ServiceEvent{ - Type: event.Type.String(), - Payload: bytes, - }, nil -} - -func MessageToServiceEvent(m *entities.ServiceEvent) (*flow.ServiceEvent, error) { - rawEvent := m.Payload - eventType := flow.ServiceEventType(m.Type) - se, err := flow.ServiceEventJSONMarshaller.UnmarshalWithType(rawEvent, eventType) - - return &se, err -} - -func ChunkToMessage(chunk *flow.Chunk) *entities.Chunk { - return &entities.Chunk{ - CollectionIndex: uint32(chunk.CollectionIndex), - StartState: StateCommitmentToMessage(chunk.StartState), - EventCollection: IdentifierToMessage(chunk.EventCollection), - BlockId: IdentifierToMessage(chunk.BlockID), - TotalComputationUsed: chunk.TotalComputationUsed, - NumberOfTransactions: uint32(chunk.NumberOfTransactions), - Index: chunk.Index, - EndState: StateCommitmentToMessage(chunk.EndState), - } -} - -func MessageToChunk(m *entities.Chunk) (*flow.Chunk, error) { - startState, err := flow.ToStateCommitment(m.StartState) - if err != nil { - return nil, fmt.Errorf("failed to parse Message start state to Chunk: %w", err) - } - endState, err := flow.ToStateCommitment(m.EndState) - if err != nil { - return nil, fmt.Errorf("failed to parse Message end state to Chunk: %w", err) - } - chunkBody := flow.ChunkBody{ - CollectionIndex: uint(m.CollectionIndex), - StartState: startState, - EventCollection: MessageToIdentifier(m.EventCollection), - BlockID: MessageToIdentifier(m.BlockId), - TotalComputationUsed: m.TotalComputationUsed, - NumberOfTransactions: uint64(m.NumberOfTransactions), - } - return &flow.Chunk{ - ChunkBody: chunkBody, - Index: m.Index, - EndState: endState, - }, nil -} - -func BlockExecutionDataToMessage(data *execution_data.BlockExecutionData) ( - *entities.BlockExecutionData, - error, -) { - chunkExecutionDatas := make([]*entities.ChunkExecutionData, len(data.ChunkExecutionDatas)) - for i, chunk := range data.ChunkExecutionDatas { - chunkMessage, err := ChunkExecutionDataToMessage(chunk) - if err != nil { - return nil, err - } - chunkExecutionDatas[i] = chunkMessage - } - return &entities.BlockExecutionData{ - BlockId: IdentifierToMessage(data.BlockID), - ChunkExecutionData: chunkExecutionDatas, - }, nil -} - -func ChunkExecutionDataToMessage(data *execution_data.ChunkExecutionData) ( - *entities.ChunkExecutionData, - error, -) { - collection := &entities.ExecutionDataCollection{} - if data.Collection != nil { - collection = &entities.ExecutionDataCollection{ - Transactions: TransactionsToMessages(data.Collection.Transactions), - } - } - - events := EventsToMessages(data.Events) - if len(events) == 0 { - events = nil - } - - var trieUpdate *entities.TrieUpdate - if data.TrieUpdate != nil { - paths := make([][]byte, len(data.TrieUpdate.Paths)) - for i, path := range data.TrieUpdate.Paths { - paths[i] = path[:] - } - - payloads := make([]*entities.Payload, len(data.TrieUpdate.Payloads)) - for i, payload := range data.TrieUpdate.Payloads { - key, err := payload.Key() - if err != nil { - return nil, err - } - keyParts := make([]*entities.KeyPart, len(key.KeyParts)) - for j, keyPart := range key.KeyParts { - keyParts[j] = &entities.KeyPart{ - Type: uint32(keyPart.Type), - Value: keyPart.Value, - } - } - payloads[i] = &entities.Payload{ - KeyPart: keyParts, - Value: payload.Value(), - } - } - - trieUpdate = &entities.TrieUpdate{ - RootHash: data.TrieUpdate.RootHash[:], - Paths: paths, - Payloads: payloads, - } - } - - return &entities.ChunkExecutionData{ - Collection: collection, - Events: events, - TrieUpdate: trieUpdate, - }, nil -} - -func MessageToBlockExecutionData( - m *entities.BlockExecutionData, - chain flow.Chain, -) (*execution_data.BlockExecutionData, error) { - if m == nil { - return nil, ErrEmptyMessage - } - chunks := make([]*execution_data.ChunkExecutionData, len(m.ChunkExecutionData)) - for i, chunk := range m.GetChunkExecutionData() { - convertedChunk, err := MessageToChunkExecutionData(chunk, chain) - if err != nil { - return nil, err - } - chunks[i] = convertedChunk - } - - return &execution_data.BlockExecutionData{ - BlockID: MessageToIdentifier(m.GetBlockId()), - ChunkExecutionDatas: chunks, - }, nil -} - -func MessageToChunkExecutionData( - m *entities.ChunkExecutionData, - chain flow.Chain, -) (*execution_data.ChunkExecutionData, error) { - collection, err := messageToTrustedCollection(m.GetCollection(), chain) - if err != nil { - return nil, err - } - - var trieUpdate *ledger.TrieUpdate - if m.GetTrieUpdate() != nil { - trieUpdate, err = MessageToTrieUpdate(m.GetTrieUpdate()) - if err != nil { - return nil, err - } - } - - events := MessagesToEvents(m.GetEvents()) - if len(events) == 0 { - events = nil - } - - return &execution_data.ChunkExecutionData{ - Collection: collection, - Events: events, - TrieUpdate: trieUpdate, - }, nil -} - -func messageToTrustedCollection( - m *entities.ExecutionDataCollection, - chain flow.Chain, -) (*flow.Collection, error) { - messages := m.GetTransactions() - transactions := make([]*flow.TransactionBody, len(messages)) - for i, message := range messages { - transaction, err := messageToTrustedTransaction(message, chain) - if err != nil { - return nil, fmt.Errorf("could not convert transaction %d: %w", i, err) - } - transactions[i] = &transaction - } - - if len(transactions) == 0 { - return nil, nil - } - - return &flow.Collection{Transactions: transactions}, nil -} - -// messageToTrustedTransaction converts a transaction message to a transaction body. -// This is useful when converting transactions from trusted state like BlockExecutionData which -// contain service transactions that do not conform to external transaction format. -func messageToTrustedTransaction( - m *entities.Transaction, - chain flow.Chain, -) (flow.TransactionBody, error) { - if m == nil { - return flow.TransactionBody{}, ErrEmptyMessage - } - - t := flow.NewTransactionBody() - - proposalKey := m.GetProposalKey() - if proposalKey != nil { - proposalAddress, err := insecureAddress(proposalKey.GetAddress(), chain) - if err != nil { - return *t, fmt.Errorf("could not convert proposer address: %w", err) - } - t.SetProposalKey(proposalAddress, uint64(proposalKey.GetKeyId()), proposalKey.GetSequenceNumber()) - } - - payer := m.GetPayer() - if payer != nil { - payerAddress, err := insecureAddress(payer, chain) - if err != nil { - return *t, fmt.Errorf("could not convert payer address: %w", err) - } - t.SetPayer(payerAddress) - } - - for _, authorizer := range m.GetAuthorizers() { - authorizerAddress, err := Address(authorizer, chain) - if err != nil { - return *t, fmt.Errorf("could not convert authorizer address: %w", err) - } - t.AddAuthorizer(authorizerAddress) - } - - for _, sig := range m.GetPayloadSignatures() { - addr, err := Address(sig.GetAddress(), chain) - if err != nil { - return *t, fmt.Errorf("could not convert payload signature address: %w", err) - } - t.AddPayloadSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) - } - - for _, sig := range m.GetEnvelopeSignatures() { - addr, err := Address(sig.GetAddress(), chain) - if err != nil { - return *t, fmt.Errorf("could not convert envelope signature address: %w", err) - } - t.AddEnvelopeSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) - } - - t.SetScript(m.GetScript()) - t.SetArguments(m.GetArguments()) - t.SetReferenceBlockID(flow.HashToID(m.GetReferenceBlockId())) - t.SetGasLimit(m.GetGasLimit()) - - return *t, nil -} - -func MessageToTrieUpdate(m *entities.TrieUpdate) (*ledger.TrieUpdate, error) { - rootHash, err := ledger.ToRootHash(m.GetRootHash()) - if err != nil { - return nil, fmt.Errorf("could not convert root hash: %w", err) - } - - paths := make([]ledger.Path, len(m.GetPaths())) - for i, path := range m.GetPaths() { - convertedPath, err := ledger.ToPath(path) - if err != nil { - return nil, fmt.Errorf("could not convert path %d: %w", i, err) - } - paths[i] = convertedPath - } - - payloads := make([]*ledger.Payload, len(m.Payloads)) - for i, payload := range m.GetPayloads() { - keyParts := make([]ledger.KeyPart, len(payload.GetKeyPart())) - for j, keypart := range payload.GetKeyPart() { - keyParts[j] = ledger.NewKeyPart(uint16(keypart.GetType()), keypart.GetValue()) - } - payloads[i] = ledger.NewPayload(ledger.NewKey(keyParts), payload.GetValue()) - } - - return &ledger.TrieUpdate{ - RootHash: rootHash, - Paths: paths, - Payloads: payloads, - }, nil +// StateCommitmentToMessage converts a flow.StateCommitment to a byte slice for inclusion in a protobuf message +func StateCommitmentToMessage(s flow.StateCommitment) []byte { + return s[:] } -// insecureAddress converts a raw address to a flow.Address, skipping validation -// This is useful when converting transactions from trusted state like BlockExecutionData. -// This should only be used for trusted inputs -func insecureAddress(rawAddress []byte, chain flow.Chain) (flow.Address, error) { - if len(rawAddress) == 0 { - return flow.EmptyAddress, status.Error(codes.InvalidArgument, "address cannot be empty") +// MessageToStateCommitment converts a byte slice from a protobuf message to a flow.StateCommitment +func MessageToStateCommitment(bytes []byte) (sc flow.StateCommitment, err error) { + if len(bytes) != len(sc) { + return sc, fmt.Errorf("invalid state commitment length. got %d expected %d", len(bytes), len(sc)) } - - return flow.BytesToAddress(rawAddress), nil + copy(sc[:], bytes) + return } diff --git a/engine/common/rpc/convert/convert_test.go b/engine/common/rpc/convert/convert_test.go deleted file mode 100644 index f85447c905f..00000000000 --- a/engine/common/rpc/convert/convert_test.go +++ /dev/null @@ -1,215 +0,0 @@ -package convert_test - -import ( - "bytes" - "math/rand" - "testing" - - "github.com/onflow/cadence" - "github.com/onflow/cadence/encoding/ccf" - jsoncdc "github.com/onflow/cadence/encoding/json" - execproto "github.com/onflow/flow/protobuf/go/flow/execution" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/onflow/flow-go/engine/common/rpc/convert" - "github.com/onflow/flow-go/fvm" - "github.com/onflow/flow-go/ledger" - "github.com/onflow/flow-go/ledger/common/testutils" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/executiondatasync/execution_data" - "github.com/onflow/flow-go/utils/unittest" -) - -func TestConvertTransaction(t *testing.T) { - tx := unittest.TransactionBodyFixture() - - msg := convert.TransactionToMessage(tx) - converted, err := convert.MessageToTransaction(msg, flow.Testnet.Chain()) - assert.Nil(t, err) - - assert.Equal(t, tx, converted) - assert.Equal(t, tx.ID(), converted.ID()) -} - -func TestConvertAccountKey(t *testing.T) { - privateKey, _ := unittest.AccountKeyDefaultFixture() - accountKey := privateKey.PublicKey(fvm.AccountKeyWeightThreshold) - - // Explicitly test if Revoked is properly converted - accountKey.Revoked = true - - msg, err := convert.AccountKeyToMessage(accountKey) - assert.Nil(t, err) - - converted, err := convert.MessageToAccountKey(msg) - assert.Nil(t, err) - - assert.Equal(t, accountKey, *converted) - assert.Equal(t, accountKey.PublicKey, converted.PublicKey) - assert.Equal(t, accountKey.Revoked, converted.Revoked) -} - -func TestConvertEvents(t *testing.T) { - t.Run("empty", func(t *testing.T) { - messages := convert.EventsToMessages(nil) - assert.Len(t, messages, 0) - }) - - t.Run("simple", func(t *testing.T) { - - txID := unittest.IdentifierFixture() - event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) - - messages := convert.EventsToMessages([]flow.Event{event}) - - require.Len(t, messages, 1) - - message := messages[0] - - require.Equal(t, event.EventIndex, message.EventIndex) - require.Equal(t, event.TransactionIndex, message.TransactionIndex) - require.Equal(t, event.Payload, message.Payload) - require.Equal(t, event.TransactionID[:], message.TransactionId) - require.Equal(t, string(event.Type), message.Type) - }) - - t.Run("convert event from ccf format", func(t *testing.T) { - cadenceValue, err := cadence.NewValue(2) - require.NoError(t, err) - ccfPayload, err := ccf.Encode(cadenceValue) - require.NoError(t, err) - jsonPayload, err := jsoncdc.Encode(cadenceValue) - require.NoError(t, err) - txID := unittest.IdentifierFixture() - ccfEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - ccfEvent.Payload = ccfPayload - jsonEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - jsonEvent.Payload = jsonPayload - message := convert.EventToMessage(ccfEvent) - convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_CCF_V0) - assert.NoError(t, err) - assert.Equal(t, jsonEvent, *convertedEvent) - }) - - t.Run("convert event from json cdc format", func(t *testing.T) { - cadenceValue, err := cadence.NewValue(2) - require.NoError(t, err) - txID := unittest.IdentifierFixture() - jsonEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - jsonPayload, err := jsoncdc.Encode(cadenceValue) - require.NoError(t, err) - jsonEvent.Payload = jsonPayload - message := convert.EventToMessage(jsonEvent) - convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_JSON_CDC_V0) - assert.NoError(t, err) - assert.Equal(t, jsonEvent, *convertedEvent) - }) - - t.Run("convert payload from ccf to jsoncdc", func(t *testing.T) { - // Round trip conversion check - cadenceValue, err := cadence.NewValue(2) - require.NoError(t, err) - ccfPayload, err := ccf.Encode(cadenceValue) - require.NoError(t, err) - txID := unittest.IdentifierFixture() - ccfEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - ccfEvent.Payload = ccfPayload - - jsonEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - jsonPayload, err := jsoncdc.Encode(cadenceValue) - require.NoError(t, err) - jsonEvent.Payload = jsonPayload - - res, err := convert.CcfEventToJsonEvent(ccfEvent) - require.NoError(t, err) - require.Equal(t, jsonEvent, *res) - }) -} - -// TestConvertBlockExecutionData checks if conversions between BlockExecutionData and it's fields are consistent. -func TestConvertBlockExecutionData(t *testing.T) { - // Initialize the BlockExecutionData object - numChunks := 5 - ced := make([]*execution_data.ChunkExecutionData, numChunks) - bed := &execution_data.BlockExecutionData{ - BlockID: unittest.IdentifierFixture(), - ChunkExecutionDatas: ced, - } - - // Fill the chunk execution datas with trie updates, collections, and events - minSerializedSize := uint64(10 * execution_data.DefaultMaxBlobSize) - for i := 0; i < numChunks; i++ { - // the service chunk sometimes does not have any trie updates - if i == numChunks-1 { - tx1 := unittest.TransactionBodyFixture() - // proposal key and payer are empty addresses for service tx - tx1.ProposalKey.Address = flow.EmptyAddress - tx1.Payer = flow.EmptyAddress - bed.ChunkExecutionDatas[i] = &execution_data.ChunkExecutionData{ - Collection: &flow.Collection{Transactions: []*flow.TransactionBody{&tx1}}, - } - continue - } - - // Initialize collection - tx1 := unittest.TransactionBodyFixture() - tx2 := unittest.TransactionBodyFixture() - col := &flow.Collection{Transactions: []*flow.TransactionBody{&tx1, &tx2}} - - // Initialize events - header := unittest.BlockHeaderFixture() - events := unittest.BlockEventsFixture(header, 5).Events - - chunk := &execution_data.ChunkExecutionData{ - Collection: col, - Events: events, - TrieUpdate: testutils.TrieUpdateFixture(1, 1, 8), - } - size := 1 - - // Fill the TrieUpdate with data - inner: - for { - buf := &bytes.Buffer{} - require.NoError(t, execution_data.DefaultSerializer.Serialize(buf, chunk)) - - if buf.Len() >= int(minSerializedSize) { - break inner - } - - v := make([]byte, size) - _, _ = rand.Read(v) - - k, err := chunk.TrieUpdate.Payloads[0].Key() - require.NoError(t, err) - - chunk.TrieUpdate.Payloads[0] = ledger.NewPayload(k, v) - size *= 2 - } - bed.ChunkExecutionDatas[i] = chunk - } - - t.Run("chunk execution data conversions", func(t *testing.T) { - chunkMsg, err := convert.ChunkExecutionDataToMessage(bed.ChunkExecutionDatas[0]) - assert.Nil(t, err) - - chunkReConverted, err := convert.MessageToChunkExecutionData(chunkMsg, flow.Testnet.Chain()) - assert.Nil(t, err) - assert.Equal(t, bed.ChunkExecutionDatas[0], chunkReConverted) - }) - - t.Run("block execution data conversions", func(t *testing.T) { - blockMsg, err := convert.BlockExecutionDataToMessage(bed) - assert.Nil(t, err) - - bedReConverted, err := convert.MessageToBlockExecutionData(blockMsg, flow.Testnet.Chain()) - assert.Nil(t, err) - assert.Equal(t, bed, bedReConverted) - }) -} diff --git a/engine/common/rpc/convert/events.go b/engine/common/rpc/convert/events.go new file mode 100644 index 00000000000..0d0b20179e7 --- /dev/null +++ b/engine/common/rpc/convert/events.go @@ -0,0 +1,165 @@ +package convert + +import ( + "encoding/json" + "fmt" + + "github.com/onflow/cadence/encoding/ccf" + jsoncdc "github.com/onflow/cadence/encoding/json" + "github.com/onflow/flow/protobuf/go/flow/entities" + execproto "github.com/onflow/flow/protobuf/go/flow/execution" + + "github.com/onflow/flow-go/model/flow" +) + +// EventToMessage converts a flow.Event to a protobuf message +// Note: this function does not convert the payload encoding +func EventToMessage(e flow.Event) *entities.Event { + return &entities.Event{ + Type: string(e.Type), + TransactionId: e.TransactionID[:], + TransactionIndex: e.TransactionIndex, + EventIndex: e.EventIndex, + Payload: e.Payload, + } +} + +// MessageToEvent converts a protobuf message to a flow.Event +// Note: this function does not convert the payload encoding +func MessageToEvent(m *entities.Event) flow.Event { + return flow.Event{ + Type: flow.EventType(m.GetType()), + TransactionID: flow.HashToID(m.GetTransactionId()), + TransactionIndex: m.GetTransactionIndex(), + EventIndex: m.GetEventIndex(), + Payload: m.GetPayload(), + } +} + +// EventsToMessages converts a slice of flow.Events to a slice of protobuf messages +// Note: this function does not convert the payload encoding +func EventsToMessages(flowEvents []flow.Event) []*entities.Event { + events := make([]*entities.Event, len(flowEvents)) + for i, e := range flowEvents { + event := EventToMessage(e) + events[i] = event + } + return events +} + +// MessagesToEvents converts a slice of protobuf messages to a slice of flow.Events +// Note: this function does not convert the payload encoding +func MessagesToEvents(l []*entities.Event) []flow.Event { + events := make([]flow.Event, len(l)) + + for i, m := range l { + events[i] = MessageToEvent(m) + } + + return events +} + +// MessageToEventFromVersion converts a protobuf message to a flow.Event, and converts the payload +// encoding from CCF to JSON if the input version is CCF +func MessageToEventFromVersion(m *entities.Event, inputVersion execproto.EventEncodingVersion) (*flow.Event, error) { + switch inputVersion { + case execproto.EventEncodingVersion_CCF_V0: + convertedPayload, err := CcfPayloadToJsonPayload(m.Payload) + if err != nil { + return nil, fmt.Errorf("could not convert event payload from CCF to Json: %w", err) + } + return &flow.Event{ + Type: flow.EventType(m.GetType()), + TransactionID: flow.HashToID(m.GetTransactionId()), + TransactionIndex: m.GetTransactionIndex(), + EventIndex: m.GetEventIndex(), + Payload: convertedPayload, + }, nil + case execproto.EventEncodingVersion_JSON_CDC_V0: + je := MessageToEvent(m) + return &je, nil + default: + return nil, fmt.Errorf("invalid encoding format %d", inputVersion) + } +} + +// MessagesToEventsFromVersion converts a slice of protobuf messages to a slice of flow.Events, converting +// the payload encoding from CCF to JSON if the input version is CCF +func MessagesToEventsFromVersion(l []*entities.Event, version execproto.EventEncodingVersion) ([]flow.Event, error) { + events := make([]flow.Event, len(l)) + for i, m := range l { + event, err := MessageToEventFromVersion(m, version) + if err != nil { + return nil, fmt.Errorf("could not convert event at index %d from format %d: %w", + m.EventIndex, version, err) + } + events[i] = *event + } + return events, nil +} + +// ServiceEventToMessage converts a flow.ServiceEvent to a protobuf message +func ServiceEventToMessage(event flow.ServiceEvent) (*entities.ServiceEvent, error) { + bytes, err := json.Marshal(event.Event) + if err != nil { + return nil, fmt.Errorf("cannot marshal service event: %w", err) + } + + return &entities.ServiceEvent{ + Type: event.Type.String(), + Payload: bytes, + }, nil +} + +// MessageToServiceEvent converts a protobuf message to a flow.ServiceEvent +func MessageToServiceEvent(m *entities.ServiceEvent) (*flow.ServiceEvent, error) { + rawEvent := m.Payload + eventType := flow.ServiceEventType(m.Type) + se, err := flow.ServiceEventJSONMarshaller.UnmarshalWithType(rawEvent, eventType) + + return &se, err +} + +// ServiceEventsToMessages converts a slice of flow.ServiceEvents to a slice of protobuf messages +func MessagesToServiceEventList(m []*entities.ServiceEvent) ( + flow.ServiceEventList, + error, +) { + parsedServiceEvents := make(flow.ServiceEventList, len(m)) + for i, serviceEvent := range m { + parsedServiceEvent, err := MessageToServiceEvent(serviceEvent) + if err != nil { + return nil, fmt.Errorf("failed to parse service event at index %d from message: %w", i, err) + } + parsedServiceEvents[i] = *parsedServiceEvent + } + return parsedServiceEvents, nil +} + +// CcfPayloadToJsonPayload converts a CCF-encoded payload to a JSON-encoded payload +func CcfPayloadToJsonPayload(p []byte) ([]byte, error) { + val, err := ccf.Decode(nil, p) + if err != nil { + return nil, fmt.Errorf("unable to decode from ccf format: %w", err) + } + res, err := jsoncdc.Encode(val) + if err != nil { + return nil, fmt.Errorf("unable to encode to json-cdc format: %w", err) + } + return res, nil +} + +// CcfEventToJsonEvent returns a new event with the payload converted from CCF to JSON +func CcfEventToJsonEvent(e flow.Event) (*flow.Event, error) { + convertedPayload, err := CcfPayloadToJsonPayload(e.Payload) + if err != nil { + return nil, err + } + return &flow.Event{ + Type: e.Type, + TransactionID: e.TransactionID, + TransactionIndex: e.TransactionIndex, + EventIndex: e.EventIndex, + Payload: convertedPayload, + }, nil +} diff --git a/engine/common/rpc/convert/events_test.go b/engine/common/rpc/convert/events_test.go new file mode 100644 index 00000000000..f04ee6a3e5b --- /dev/null +++ b/engine/common/rpc/convert/events_test.go @@ -0,0 +1,98 @@ +package convert_test + +import ( + "testing" + + "github.com/onflow/cadence" + "github.com/onflow/cadence/encoding/ccf" + jsoncdc "github.com/onflow/cadence/encoding/json" + execproto "github.com/onflow/flow/protobuf/go/flow/execution" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestConvertEvents(t *testing.T) { + t.Run("empty", func(t *testing.T) { + messages := convert.EventsToMessages(nil) + assert.Len(t, messages, 0) + }) + + t.Run("simple", func(t *testing.T) { + + txID := unittest.IdentifierFixture() + event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + + messages := convert.EventsToMessages([]flow.Event{event}) + + require.Len(t, messages, 1) + + message := messages[0] + + require.Equal(t, event.EventIndex, message.EventIndex) + require.Equal(t, event.TransactionIndex, message.TransactionIndex) + require.Equal(t, event.Payload, message.Payload) + require.Equal(t, event.TransactionID[:], message.TransactionId) + require.Equal(t, string(event.Type), message.Type) + }) + + t.Run("convert event from ccf format", func(t *testing.T) { + cadenceValue, err := cadence.NewValue(2) + require.NoError(t, err) + ccfPayload, err := ccf.Encode(cadenceValue) + require.NoError(t, err) + jsonPayload, err := jsoncdc.Encode(cadenceValue) + require.NoError(t, err) + txID := unittest.IdentifierFixture() + ccfEvent := unittest.EventFixture( + flow.EventAccountCreated, 2, 3, txID, 0) + ccfEvent.Payload = ccfPayload + jsonEvent := unittest.EventFixture( + flow.EventAccountCreated, 2, 3, txID, 0) + jsonEvent.Payload = jsonPayload + message := convert.EventToMessage(ccfEvent) + convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_CCF_V0) + assert.NoError(t, err) + assert.Equal(t, jsonEvent, *convertedEvent) + }) + + t.Run("convert event from json cdc format", func(t *testing.T) { + cadenceValue, err := cadence.NewValue(2) + require.NoError(t, err) + txID := unittest.IdentifierFixture() + jsonEvent := unittest.EventFixture( + flow.EventAccountCreated, 2, 3, txID, 0) + jsonPayload, err := jsoncdc.Encode(cadenceValue) + require.NoError(t, err) + jsonEvent.Payload = jsonPayload + message := convert.EventToMessage(jsonEvent) + convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_JSON_CDC_V0) + assert.NoError(t, err) + assert.Equal(t, jsonEvent, *convertedEvent) + }) + + t.Run("convert payload from ccf to jsoncdc", func(t *testing.T) { + // Round trip conversion check + cadenceValue, err := cadence.NewValue(2) + require.NoError(t, err) + ccfPayload, err := ccf.Encode(cadenceValue) + require.NoError(t, err) + txID := unittest.IdentifierFixture() + ccfEvent := unittest.EventFixture( + flow.EventAccountCreated, 2, 3, txID, 0) + ccfEvent.Payload = ccfPayload + + jsonEvent := unittest.EventFixture( + flow.EventAccountCreated, 2, 3, txID, 0) + jsonPayload, err := jsoncdc.Encode(cadenceValue) + require.NoError(t, err) + jsonEvent.Payload = jsonPayload + + res, err := convert.CcfEventToJsonEvent(ccfEvent) + require.NoError(t, err) + require.Equal(t, jsonEvent, *res) + }) +} diff --git a/engine/common/rpc/convert/execution_data.go b/engine/common/rpc/convert/execution_data.go new file mode 100644 index 00000000000..1e9c6031b2c --- /dev/null +++ b/engine/common/rpc/convert/execution_data.go @@ -0,0 +1,281 @@ +package convert + +import ( + "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/executiondatasync/execution_data" +) + +// BlockExecutionDataToMessage converts a BlockExecutionData to a protobuf message +func BlockExecutionDataToMessage(data *execution_data.BlockExecutionData) ( + *entities.BlockExecutionData, + error, +) { + chunkExecutionDatas := make([]*entities.ChunkExecutionData, len(data.ChunkExecutionDatas)) + for i, chunk := range data.ChunkExecutionDatas { + chunkMessage, err := ChunkExecutionDataToMessage(chunk) + if err != nil { + return nil, err + } + chunkExecutionDatas[i] = chunkMessage + } + return &entities.BlockExecutionData{ + BlockId: IdentifierToMessage(data.BlockID), + ChunkExecutionData: chunkExecutionDatas, + }, nil +} + +// MessageToBlockExecutionData converts a protobuf message to a BlockExecutionData +func MessageToBlockExecutionData( + m *entities.BlockExecutionData, + chain flow.Chain, +) (*execution_data.BlockExecutionData, error) { + if m == nil { + return nil, ErrEmptyMessage + } + chunks := make([]*execution_data.ChunkExecutionData, len(m.ChunkExecutionData)) + for i, chunk := range m.GetChunkExecutionData() { + convertedChunk, err := MessageToChunkExecutionData(chunk, chain) + if err != nil { + return nil, err + } + chunks[i] = convertedChunk + } + + return &execution_data.BlockExecutionData{ + BlockID: MessageToIdentifier(m.GetBlockId()), + ChunkExecutionDatas: chunks, + }, nil +} + +// ChunkExecutionDataToMessage converts a ChunkExecutionData to a protobuf message +func ChunkExecutionDataToMessage(data *execution_data.ChunkExecutionData) ( + *entities.ChunkExecutionData, + error, +) { + collection := &entities.ExecutionDataCollection{} + if data.Collection != nil { + collection = &entities.ExecutionDataCollection{ + Transactions: TransactionsToMessages(data.Collection.Transactions), + } + } + + events := EventsToMessages(data.Events) + if len(events) == 0 { + events = nil + } + + trieUpdate, err := TrieUpdateToMessage(data.TrieUpdate) + if err != nil { + return nil, err + } + + return &entities.ChunkExecutionData{ + Collection: collection, + Events: events, + TrieUpdate: trieUpdate, + }, nil +} + +// MessageToChunkExecutionData converts a protobuf message to a ChunkExecutionData +func MessageToChunkExecutionData( + m *entities.ChunkExecutionData, + chain flow.Chain, +) (*execution_data.ChunkExecutionData, error) { + collection, err := messageToTrustedCollection(m.GetCollection(), chain) + if err != nil { + return nil, err + } + + var trieUpdate *ledger.TrieUpdate + if m.GetTrieUpdate() != nil { + trieUpdate, err = MessageToTrieUpdate(m.GetTrieUpdate()) + if err != nil { + return nil, err + } + } + + events := MessagesToEvents(m.GetEvents()) + if len(events) == 0 { + events = nil + } + + return &execution_data.ChunkExecutionData{ + Collection: collection, + Events: events, + TrieUpdate: trieUpdate, + }, nil +} + +// MessageToTrieUpdate converts a protobuf message to a TrieUpdate +func MessageToTrieUpdate(m *entities.TrieUpdate) (*ledger.TrieUpdate, error) { + rootHash, err := ledger.ToRootHash(m.GetRootHash()) + if err != nil { + return nil, fmt.Errorf("could not convert root hash: %w", err) + } + + paths := make([]ledger.Path, len(m.GetPaths())) + for i, path := range m.GetPaths() { + convertedPath, err := ledger.ToPath(path) + if err != nil { + return nil, fmt.Errorf("could not convert path %d: %w", i, err) + } + paths[i] = convertedPath + } + + payloads := make([]*ledger.Payload, len(m.Payloads)) + for i, payload := range m.GetPayloads() { + keyParts := make([]ledger.KeyPart, len(payload.GetKeyPart())) + for j, keypart := range payload.GetKeyPart() { + keyParts[j] = ledger.NewKeyPart(uint16(keypart.GetType()), keypart.GetValue()) + } + payloads[i] = ledger.NewPayload(ledger.NewKey(keyParts), payload.GetValue()) + } + + return &ledger.TrieUpdate{ + RootHash: rootHash, + Paths: paths, + Payloads: payloads, + }, nil +} + +// TrieUpdateToMessage converts a TrieUpdate to a protobuf message +func TrieUpdateToMessage(t *ledger.TrieUpdate) (*entities.TrieUpdate, error) { + if t == nil { + return nil, nil + } + + paths := make([][]byte, len(t.Paths)) + for i, path := range t.Paths { + paths[i] = path[:] + } + + payloads := make([]*entities.Payload, len(t.Payloads)) + for i, payload := range t.Payloads { + key, err := payload.Key() + if err != nil { + return nil, fmt.Errorf("could not convert payload %d: %w", i, err) + } + keyParts := make([]*entities.KeyPart, len(key.KeyParts)) + for j, keyPart := range key.KeyParts { + keyParts[j] = &entities.KeyPart{ + Type: uint32(keyPart.Type), + Value: keyPart.Value, + } + } + payloads[i] = &entities.Payload{ + KeyPart: keyParts, + Value: payload.Value(), + } + } + + return &entities.TrieUpdate{ + RootHash: t.RootHash[:], + Paths: paths, + Payloads: payloads, + }, nil +} + +// messageToTrustedCollection converts a protobuf message to a collection using the +// messageToTrustedTransaction converter to support service transactions. +func messageToTrustedCollection( + m *entities.ExecutionDataCollection, + chain flow.Chain, +) (*flow.Collection, error) { + messages := m.GetTransactions() + transactions := make([]*flow.TransactionBody, len(messages)) + for i, message := range messages { + transaction, err := messageToTrustedTransaction(message, chain) + if err != nil { + return nil, fmt.Errorf("could not convert transaction %d: %w", i, err) + } + transactions[i] = &transaction + } + + if len(transactions) == 0 { + return nil, nil + } + + return &flow.Collection{Transactions: transactions}, nil +} + +// messageToTrustedTransaction converts a transaction message to a transaction body. +// This is useful when converting transactions from trusted state like BlockExecutionData which +// contain service transactions that do not conform to external transaction format. +func messageToTrustedTransaction( + m *entities.Transaction, + chain flow.Chain, +) (flow.TransactionBody, error) { + if m == nil { + return flow.TransactionBody{}, ErrEmptyMessage + } + + t := flow.NewTransactionBody() + + proposalKey := m.GetProposalKey() + if proposalKey != nil { + proposalAddress, err := insecureAddress(proposalKey.GetAddress(), chain) + if err != nil { + return *t, fmt.Errorf("could not convert proposer address: %w", err) + } + t.SetProposalKey(proposalAddress, uint64(proposalKey.GetKeyId()), proposalKey.GetSequenceNumber()) + } + + payer := m.GetPayer() + if payer != nil { + payerAddress, err := insecureAddress(payer, chain) + if err != nil { + return *t, fmt.Errorf("could not convert payer address: %w", err) + } + t.SetPayer(payerAddress) + } + + for _, authorizer := range m.GetAuthorizers() { + authorizerAddress, err := Address(authorizer, chain) + if err != nil { + return *t, fmt.Errorf("could not convert authorizer address: %w", err) + } + t.AddAuthorizer(authorizerAddress) + } + + for _, sig := range m.GetPayloadSignatures() { + addr, err := Address(sig.GetAddress(), chain) + if err != nil { + return *t, fmt.Errorf("could not convert payload signature address: %w", err) + } + t.AddPayloadSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) + } + + for _, sig := range m.GetEnvelopeSignatures() { + addr, err := Address(sig.GetAddress(), chain) + if err != nil { + return *t, fmt.Errorf("could not convert envelope signature address: %w", err) + } + t.AddEnvelopeSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) + } + + t.SetScript(m.GetScript()) + t.SetArguments(m.GetArguments()) + t.SetReferenceBlockID(flow.HashToID(m.GetReferenceBlockId())) + t.SetGasLimit(m.GetGasLimit()) + + return *t, nil +} + +// insecureAddress converts a raw address to a flow.Address, skipping validation +// This is useful when converting transactions from trusted state like BlockExecutionData. +// This should only be used for trusted inputs +func insecureAddress(rawAddress []byte, chain flow.Chain) (flow.Address, error) { + if len(rawAddress) == 0 { + return flow.EmptyAddress, status.Error(codes.InvalidArgument, "address cannot be empty") + } + + return flow.BytesToAddress(rawAddress), nil +} diff --git a/engine/common/rpc/convert/execution_data_test.go b/engine/common/rpc/convert/execution_data_test.go new file mode 100644 index 00000000000..a0060d56c43 --- /dev/null +++ b/engine/common/rpc/convert/execution_data_test.go @@ -0,0 +1,99 @@ +package convert_test + +import ( + "bytes" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/testutils" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestConvertBlockExecutionData checks if conversions between BlockExecutionData and it's fields are consistent. +func TestConvertBlockExecutionData(t *testing.T) { + // Initialize the BlockExecutionData object + numChunks := 5 + ced := make([]*execution_data.ChunkExecutionData, numChunks) + bed := &execution_data.BlockExecutionData{ + BlockID: unittest.IdentifierFixture(), + ChunkExecutionDatas: ced, + } + + // Fill the chunk execution datas with trie updates, collections, and events + minSerializedSize := uint64(10 * execution_data.DefaultMaxBlobSize) + for i := 0; i < numChunks; i++ { + // the service chunk sometimes does not have any trie updates + if i == numChunks-1 { + tx1 := unittest.TransactionBodyFixture() + // proposal key and payer are empty addresses for service tx + tx1.ProposalKey.Address = flow.EmptyAddress + tx1.Payer = flow.EmptyAddress + bed.ChunkExecutionDatas[i] = &execution_data.ChunkExecutionData{ + Collection: &flow.Collection{Transactions: []*flow.TransactionBody{&tx1}}, + } + continue + } + + // Initialize collection + tx1 := unittest.TransactionBodyFixture() + tx2 := unittest.TransactionBodyFixture() + col := &flow.Collection{Transactions: []*flow.TransactionBody{&tx1, &tx2}} + + // Initialize events + header := unittest.BlockHeaderFixture() + events := unittest.BlockEventsFixture(header, 5).Events + + chunk := &execution_data.ChunkExecutionData{ + Collection: col, + Events: events, + TrieUpdate: testutils.TrieUpdateFixture(1, 1, 8), + } + size := 1 + + // Fill the TrieUpdate with data + inner: + for { + buf := &bytes.Buffer{} + require.NoError(t, execution_data.DefaultSerializer.Serialize(buf, chunk)) + + if buf.Len() >= int(minSerializedSize) { + break inner + } + + v := make([]byte, size) + _, _ = rand.Read(v) + + k, err := chunk.TrieUpdate.Payloads[0].Key() + require.NoError(t, err) + + chunk.TrieUpdate.Payloads[0] = ledger.NewPayload(k, v) + size *= 2 + } + bed.ChunkExecutionDatas[i] = chunk + } + + t.Run("chunk execution data conversions", func(t *testing.T) { + chunkMsg, err := convert.ChunkExecutionDataToMessage(bed.ChunkExecutionDatas[0]) + assert.Nil(t, err) + + chunkReConverted, err := convert.MessageToChunkExecutionData(chunkMsg, flow.Testnet.Chain()) + assert.Nil(t, err) + assert.Equal(t, bed.ChunkExecutionDatas[0], chunkReConverted) + }) + + t.Run("block execution data conversions", func(t *testing.T) { + blockMsg, err := convert.BlockExecutionDataToMessage(bed) + assert.Nil(t, err) + + bedReConverted, err := convert.MessageToBlockExecutionData(blockMsg, flow.Testnet.Chain()) + assert.Nil(t, err) + assert.Equal(t, bed, bedReConverted) + }) +} diff --git a/engine/common/rpc/convert/execution_results.go b/engine/common/rpc/convert/execution_results.go new file mode 100644 index 00000000000..bbe7541edeb --- /dev/null +++ b/engine/common/rpc/convert/execution_results.go @@ -0,0 +1,174 @@ +package convert + +import ( + "fmt" + + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/model/flow" +) + +// ExecutionResultToMessage converts an execution result to a protobuf message +func ExecutionResultToMessage(er *flow.ExecutionResult) ( + *entities.ExecutionResult, + error, +) { + chunks := make([]*entities.Chunk, len(er.Chunks)) + + for i, chunk := range er.Chunks { + chunks[i] = ChunkToMessage(chunk) + } + + serviceEvents := make([]*entities.ServiceEvent, len(er.ServiceEvents)) + var err error + for i, serviceEvent := range er.ServiceEvents { + serviceEvents[i], err = ServiceEventToMessage(serviceEvent) + if err != nil { + return nil, fmt.Errorf("error while convering service event %d: %w", i, err) + } + } + + return &entities.ExecutionResult{ + PreviousResultId: IdentifierToMessage(er.PreviousResultID), + BlockId: IdentifierToMessage(er.BlockID), + Chunks: chunks, + ServiceEvents: serviceEvents, + ExecutionDataId: IdentifierToMessage(er.ExecutionDataID), + }, nil +} + +// MessageToExecutionResult converts a protobuf message to an execution result +func MessageToExecutionResult(m *entities.ExecutionResult) ( + *flow.ExecutionResult, + error, +) { + // convert Chunks + parsedChunks, err := MessagesToChunkList(m.Chunks) + if err != nil { + return nil, fmt.Errorf("failed to parse messages to ChunkList: %w", err) + } + // convert ServiceEvents + parsedServiceEvents, err := MessagesToServiceEventList(m.ServiceEvents) + if err != nil { + return nil, err + } + return &flow.ExecutionResult{ + PreviousResultID: MessageToIdentifier(m.PreviousResultId), + BlockID: MessageToIdentifier(m.BlockId), + Chunks: parsedChunks, + ServiceEvents: parsedServiceEvents, + ExecutionDataID: MessageToIdentifier(m.ExecutionDataId), + }, nil +} + +// ExecutionResultsToMessages converts a slice of execution results to a slice of protobuf messages +func ExecutionResultsToMessages(e []*flow.ExecutionResult) ( + []*entities.ExecutionResult, + error, +) { + execResults := make([]*entities.ExecutionResult, len(e)) + for i, execRes := range e { + parsedExecResult, err := ExecutionResultToMessage(execRes) + if err != nil { + return nil, err + } + execResults[i] = parsedExecResult + } + return execResults, nil +} + +// MessagesToExecutionResults converts a slice of protobuf messages to a slice of execution results +func MessagesToExecutionResults(m []*entities.ExecutionResult) ( + []*flow.ExecutionResult, + error, +) { + execResults := make([]*flow.ExecutionResult, len(m)) + for i, e := range m { + parsedExecResult, err := MessageToExecutionResult(e) + if err != nil { + return nil, fmt.Errorf("failed to convert message at index %d to execution result: %w", i, err) + } + execResults[i] = parsedExecResult + } + return execResults, nil +} + +// ExecutionResultMetaListToMessages converts an execution result meta list to a slice of protobuf messages +func ExecutionResultMetaListToMessages(e flow.ExecutionReceiptMetaList) []*entities.ExecutionReceiptMeta { + messageList := make([]*entities.ExecutionReceiptMeta, len(e)) + for i, execMeta := range e { + messageList[i] = &entities.ExecutionReceiptMeta{ + ExecutorId: IdentifierToMessage(execMeta.ExecutorID), + ResultId: IdentifierToMessage(execMeta.ResultID), + Spocks: SignaturesToMessages(execMeta.Spocks), + ExecutorSignature: MessageToSignature(execMeta.ExecutorSignature), + } + } + return messageList +} + +// MessagesToExecutionResultMetaList converts a slice of protobuf messages to an execution result meta list +func MessagesToExecutionResultMetaList(m []*entities.ExecutionReceiptMeta) flow.ExecutionReceiptMetaList { + execMetaList := make([]*flow.ExecutionReceiptMeta, len(m)) + for i, message := range m { + execMetaList[i] = &flow.ExecutionReceiptMeta{ + ExecutorID: MessageToIdentifier(message.ExecutorId), + ResultID: MessageToIdentifier(message.ResultId), + Spocks: MessagesToSignatures(message.Spocks), + ExecutorSignature: MessageToSignature(message.ExecutorSignature), + } + } + return execMetaList[:] +} + +// ChunkToMessage converts a chunk to a protobuf message +func ChunkToMessage(chunk *flow.Chunk) *entities.Chunk { + return &entities.Chunk{ + CollectionIndex: uint32(chunk.CollectionIndex), + StartState: StateCommitmentToMessage(chunk.StartState), + EventCollection: IdentifierToMessage(chunk.EventCollection), + BlockId: IdentifierToMessage(chunk.BlockID), + TotalComputationUsed: chunk.TotalComputationUsed, + NumberOfTransactions: uint32(chunk.NumberOfTransactions), + Index: chunk.Index, + EndState: StateCommitmentToMessage(chunk.EndState), + } +} + +// MessageToChunk converts a protobuf message to a chunk +func MessageToChunk(m *entities.Chunk) (*flow.Chunk, error) { + startState, err := flow.ToStateCommitment(m.StartState) + if err != nil { + return nil, fmt.Errorf("failed to parse Message start state to Chunk: %w", err) + } + endState, err := flow.ToStateCommitment(m.EndState) + if err != nil { + return nil, fmt.Errorf("failed to parse Message end state to Chunk: %w", err) + } + chunkBody := flow.ChunkBody{ + CollectionIndex: uint(m.CollectionIndex), + StartState: startState, + EventCollection: MessageToIdentifier(m.EventCollection), + BlockID: MessageToIdentifier(m.BlockId), + TotalComputationUsed: m.TotalComputationUsed, + NumberOfTransactions: uint64(m.NumberOfTransactions), + } + return &flow.Chunk{ + ChunkBody: chunkBody, + Index: m.Index, + EndState: endState, + }, nil +} + +// MessagesToChunkList converts a slice of protobuf messages to a chunk list +func MessagesToChunkList(m []*entities.Chunk) (flow.ChunkList, error) { + parsedChunks := make(flow.ChunkList, len(m)) + for i, chunk := range m { + parsedChunk, err := MessageToChunk(chunk) + if err != nil { + return nil, fmt.Errorf("failed to parse message at index %d to chunk: %w", i, err) + } + parsedChunks[i] = parsedChunk + } + return parsedChunks, nil +} diff --git a/engine/common/rpc/convert/headers.go b/engine/common/rpc/convert/headers.go new file mode 100644 index 00000000000..b45686c853d --- /dev/null +++ b/engine/common/rpc/convert/headers.go @@ -0,0 +1,97 @@ +package convert + +import ( + "fmt" + + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/model/flow" +) + +// BlockHeaderToMessage converts a flow.Header to a protobuf message +func BlockHeaderToMessage( + h *flow.Header, + signerIDs flow.IdentifierList, +) (*entities.BlockHeader, error) { + id := h.ID() + + t := timestamppb.New(h.Timestamp) + var lastViewTC *entities.TimeoutCertificate + if h.LastViewTC != nil { + newestQC := h.LastViewTC.NewestQC + lastViewTC = &entities.TimeoutCertificate{ + View: h.LastViewTC.View, + HighQcViews: h.LastViewTC.NewestQCViews, + SignerIndices: h.LastViewTC.SignerIndices, + SigData: h.LastViewTC.SigData, + HighestQc: &entities.QuorumCertificate{ + View: newestQC.View, + BlockId: newestQC.BlockID[:], + SignerIndices: newestQC.SignerIndices, + SigData: newestQC.SigData, + }, + } + } + parentVoterIds := IdentifiersToMessages(signerIDs) + + return &entities.BlockHeader{ + Id: id[:], + ParentId: h.ParentID[:], + Height: h.Height, + PayloadHash: h.PayloadHash[:], + Timestamp: t, + View: h.View, + ParentView: h.ParentView, + ParentVoterIndices: h.ParentVoterIndices, + ParentVoterIds: parentVoterIds, + ParentVoterSigData: h.ParentVoterSigData, + ProposerId: h.ProposerID[:], + ProposerSigData: h.ProposerSigData, + ChainId: h.ChainID.String(), + LastViewTc: lastViewTC, + }, nil +} + +// MessageToBlockHeader converts a protobuf message to a flow.Header +func MessageToBlockHeader(m *entities.BlockHeader) (*flow.Header, error) { + chainId, err := MessageToChainId(m.ChainId) + if err != nil { + return nil, fmt.Errorf("failed to convert ChainId: %w", err) + } + var lastViewTC *flow.TimeoutCertificate + if m.LastViewTc != nil { + newestQC := m.LastViewTc.HighestQc + if newestQC == nil { + return nil, fmt.Errorf("invalid structure newest QC should be present") + } + lastViewTC = &flow.TimeoutCertificate{ + View: m.LastViewTc.View, + NewestQCViews: m.LastViewTc.HighQcViews, + SignerIndices: m.LastViewTc.SignerIndices, + SigData: m.LastViewTc.SigData, + NewestQC: &flow.QuorumCertificate{ + View: newestQC.View, + BlockID: MessageToIdentifier(newestQC.BlockId), + SignerIndices: newestQC.SignerIndices, + SigData: newestQC.SigData, + }, + } + } + + return &flow.Header{ + ParentID: MessageToIdentifier(m.ParentId), + Height: m.Height, + PayloadHash: MessageToIdentifier(m.PayloadHash), + Timestamp: m.Timestamp.AsTime(), + View: m.View, + ParentView: m.ParentView, + ParentVoterIndices: m.ParentVoterIndices, + ParentVoterSigData: m.ParentVoterSigData, + ProposerID: MessageToIdentifier(m.ProposerId), + ProposerSigData: m.ProposerSigData, + ChainID: *chainId, + LastViewTC: lastViewTC, + }, nil +} diff --git a/engine/common/rpc/convert/shapshots.go b/engine/common/rpc/convert/shapshots.go new file mode 100644 index 00000000000..963f95dbd09 --- /dev/null +++ b/engine/common/rpc/convert/shapshots.go @@ -0,0 +1,35 @@ +package convert + +import ( + "encoding/json" + "fmt" + + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/state/protocol/inmem" +) + +// SnapshotToBytes converts a `protocol.Snapshot` to bytes, encoded as JSON +func SnapshotToBytes(snapshot protocol.Snapshot) ([]byte, error) { + serializable, err := inmem.FromSnapshot(snapshot) + if err != nil { + return nil, err + } + + data, err := json.Marshal(serializable.Encodable()) + if err != nil { + return nil, err + } + + return data, nil +} + +// BytesToInmemSnapshot converts an array of bytes to `inmem.Snapshot` +func BytesToInmemSnapshot(bytes []byte) (*inmem.Snapshot, error) { + var encodable inmem.EncodableSnapshot + err := json.Unmarshal(bytes, &encodable) + if err != nil { + return nil, fmt.Errorf("could not unmarshal decoded snapshot: %w", err) + } + + return inmem.SnapshotFromEncodable(encodable), nil +} diff --git a/engine/common/rpc/convert/transactions.go b/engine/common/rpc/convert/transactions.go new file mode 100644 index 00000000000..ce94b5bae1c --- /dev/null +++ b/engine/common/rpc/convert/transactions.go @@ -0,0 +1,123 @@ +package convert + +import ( + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/model/flow" +) + +// TransactionToMessage converts a flow.TransactionBody to a protobuf message +func TransactionToMessage(tb flow.TransactionBody) *entities.Transaction { + proposalKeyMessage := &entities.Transaction_ProposalKey{ + Address: tb.ProposalKey.Address.Bytes(), + KeyId: uint32(tb.ProposalKey.KeyIndex), + SequenceNumber: tb.ProposalKey.SequenceNumber, + } + + authMessages := make([][]byte, len(tb.Authorizers)) + for i, auth := range tb.Authorizers { + authMessages[i] = auth.Bytes() + } + + payloadSigMessages := make([]*entities.Transaction_Signature, len(tb.PayloadSignatures)) + + for i, sig := range tb.PayloadSignatures { + payloadSigMessages[i] = &entities.Transaction_Signature{ + Address: sig.Address.Bytes(), + KeyId: uint32(sig.KeyIndex), + Signature: sig.Signature, + } + } + + envelopeSigMessages := make([]*entities.Transaction_Signature, len(tb.EnvelopeSignatures)) + + for i, sig := range tb.EnvelopeSignatures { + envelopeSigMessages[i] = &entities.Transaction_Signature{ + Address: sig.Address.Bytes(), + KeyId: uint32(sig.KeyIndex), + Signature: sig.Signature, + } + } + + return &entities.Transaction{ + Script: tb.Script, + Arguments: tb.Arguments, + ReferenceBlockId: tb.ReferenceBlockID[:], + GasLimit: tb.GasLimit, + ProposalKey: proposalKeyMessage, + Payer: tb.Payer.Bytes(), + Authorizers: authMessages, + PayloadSignatures: payloadSigMessages, + EnvelopeSignatures: envelopeSigMessages, + } +} + +// MessageToTransaction converts a protobuf message to a flow.TransactionBody +func MessageToTransaction( + m *entities.Transaction, + chain flow.Chain, +) (flow.TransactionBody, error) { + if m == nil { + return flow.TransactionBody{}, ErrEmptyMessage + } + + t := flow.NewTransactionBody() + + proposalKey := m.GetProposalKey() + if proposalKey != nil { + proposalAddress, err := Address(proposalKey.GetAddress(), chain) + if err != nil { + return *t, err + } + t.SetProposalKey(proposalAddress, uint64(proposalKey.GetKeyId()), proposalKey.GetSequenceNumber()) + } + + payer := m.GetPayer() + if payer != nil { + payerAddress, err := Address(payer, chain) + if err != nil { + return *t, err + } + t.SetPayer(payerAddress) + } + + for _, authorizer := range m.GetAuthorizers() { + authorizerAddress, err := Address(authorizer, chain) + if err != nil { + return *t, err + } + t.AddAuthorizer(authorizerAddress) + } + + for _, sig := range m.GetPayloadSignatures() { + addr, err := Address(sig.GetAddress(), chain) + if err != nil { + return *t, err + } + t.AddPayloadSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) + } + + for _, sig := range m.GetEnvelopeSignatures() { + addr, err := Address(sig.GetAddress(), chain) + if err != nil { + return *t, err + } + t.AddEnvelopeSignature(addr, uint64(sig.GetKeyId()), sig.GetSignature()) + } + + t.SetScript(m.GetScript()) + t.SetArguments(m.GetArguments()) + t.SetReferenceBlockID(flow.HashToID(m.GetReferenceBlockId())) + t.SetGasLimit(m.GetGasLimit()) + + return *t, nil +} + +// TransactionsToMessages converts a slice of flow.TransactionBody to a slice of protobuf messages +func TransactionsToMessages(transactions []*flow.TransactionBody) []*entities.Transaction { + transactionMessages := make([]*entities.Transaction, len(transactions)) + for i, t := range transactions { + transactionMessages[i] = TransactionToMessage(*t) + } + return transactionMessages +} diff --git a/engine/common/rpc/convert/transactions_test.go b/engine/common/rpc/convert/transactions_test.go new file mode 100644 index 00000000000..39c8abf1b5c --- /dev/null +++ b/engine/common/rpc/convert/transactions_test.go @@ -0,0 +1,22 @@ +package convert_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestConvertTransaction(t *testing.T) { + tx := unittest.TransactionBodyFixture() + + msg := convert.TransactionToMessage(tx) + converted, err := convert.MessageToTransaction(msg, flow.Testnet.Chain()) + assert.Nil(t, err) + + assert.Equal(t, tx, converted) + assert.Equal(t, tx.ID(), converted.ID()) +} From 9b9f929409ca7f1d48f98e0080be4247b57d75f6 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Mon, 26 Jun 2023 21:23:34 -0700 Subject: [PATCH 2/7] add unit tests for blocks and headers --- engine/common/rpc/convert/accounts_test.go | 29 ++++++++++ engine/common/rpc/convert/blocks.go | 1 - engine/common/rpc/convert/blocks_test.go | 65 ++++++++++++++++++++++ engine/common/rpc/convert/headers_test.go | 29 ++++++++++ utils/unittest/fixtures.go | 3 +- 5 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 engine/common/rpc/convert/blocks_test.go create mode 100644 engine/common/rpc/convert/headers_test.go diff --git a/engine/common/rpc/convert/accounts_test.go b/engine/common/rpc/convert/accounts_test.go index 2ccbe72733f..a1b3d80e5c7 100644 --- a/engine/common/rpc/convert/accounts_test.go +++ b/engine/common/rpc/convert/accounts_test.go @@ -4,13 +4,42 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/crypto" + "github.com/onflow/flow-go/crypto/hash" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/utils/unittest" ) +// TestConvertAccount tests that converting an account to and from a protobuf message results in +// the same account +func TestConvertAccount(t *testing.T) { + t.Parallel() + + a, err := unittest.AccountFixture() + require.NoError(t, err) + + key2, err := unittest.AccountKeyFixture(128, crypto.ECDSAP256, hash.SHA3_256) + require.NoError(t, err) + + a.Keys = append(a.Keys, key2.PublicKey(500)) + + msg, err := convert.AccountToMessage(a) + require.NoError(t, err) + + converted, err := convert.MessageToAccount(msg) + require.NoError(t, err) + + assert.Equal(t, a, converted) +} + +// TestConvertAccountKey tests that converting an account key to and from a protobuf message results +// in the same account key func TestConvertAccountKey(t *testing.T) { + t.Parallel() + privateKey, _ := unittest.AccountKeyDefaultFixture() accountKey := privateKey.PublicKey(fvm.AccountKeyWeightThreshold) diff --git a/engine/common/rpc/convert/blocks.go b/engine/common/rpc/convert/blocks.go index 9633c86ddfa..3c42fffb4c0 100644 --- a/engine/common/rpc/convert/blocks.go +++ b/engine/common/rpc/convert/blocks.go @@ -15,7 +15,6 @@ func BlockToMessage(h *flow.Block, signerIDs flow.IdentifierList) ( *entities.Block, error, ) { - id := h.ID() parentID := h.Header.ParentID diff --git a/engine/common/rpc/convert/blocks_test.go b/engine/common/rpc/convert/blocks_test.go new file mode 100644 index 00000000000..87af2eabd85 --- /dev/null +++ b/engine/common/rpc/convert/blocks_test.go @@ -0,0 +1,65 @@ +package convert_test + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestConvertBlock tests that converting a block to and from a protobuf message results in the same +// block +func TestConvertBlock(t *testing.T) { + t.Parallel() + + block := unittest.FullBlockFixture() + block.SetPayload(unittest.PayloadFixture(unittest.WithAllTheFixins)) + + signerIDs := unittest.IdentifierListFixture(5) + + msg, err := convert.BlockToMessage(&block, signerIDs) + require.NoError(t, err) + + converted, err := convert.MessageToBlock(msg) + require.NoError(t, err) + + assert.Equal(t, block, *converted) +} + +// TestConvertBlockLight tests that converting a block to its light form results in only the correct +// fields being set +func TestConvertBlockLight(t *testing.T) { + t.Parallel() + + block := unittest.FullBlockFixture() + block.SetPayload(unittest.PayloadFixture(unittest.WithAllTheFixins)) + + msg := convert.BlockToMessageLight(&block) + + // required fields are set + blockID := block.ID() + assert.Equal(t, 0, bytes.Compare(blockID[:], msg.Id)) + assert.Equal(t, block.Header.Height, msg.Height) + assert.Equal(t, 0, bytes.Compare(block.Header.ParentID[:], msg.ParentId)) + assert.Equal(t, block.Header.Timestamp, msg.Timestamp.AsTime()) + assert.Equal(t, 0, bytes.Compare(block.Header.ParentVoterSigData, msg.Signatures[0])) + + guarantees := []*flow.CollectionGuarantee{} + for _, g := range msg.CollectionGuarantees { + guarantee := convert.MessageToCollectionGuarantee(g) + guarantees = append(guarantees, guarantee) + } + + assert.Equal(t, block.Payload.Guarantees, guarantees) + + // all other fields are not + assert.Nil(t, msg.BlockHeader) + assert.Len(t, msg.BlockSeals, 0) + assert.Len(t, msg.ExecutionReceiptMetaList, 0) + assert.Len(t, msg.ExecutionResultList, 0) +} diff --git a/engine/common/rpc/convert/headers_test.go b/engine/common/rpc/convert/headers_test.go new file mode 100644 index 00000000000..d74b930b307 --- /dev/null +++ b/engine/common/rpc/convert/headers_test.go @@ -0,0 +1,29 @@ +package convert_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestConvertBlockHeader tests that converting a header to and from a protobuf message results in the same +// header +func TestConvertBlockHeader(t *testing.T) { + t.Parallel() + + header := unittest.BlockHeaderFixture() + + signerIDs := unittest.IdentifierListFixture(5) + + msg, err := convert.BlockHeaderToMessage(header, signerIDs) + require.NoError(t, err) + + converted, err := convert.MessageToBlockHeader(msg) + require.NoError(t, err) + + assert.Equal(t, header, converted) +} diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 884bdd5abf7..489187fc86b 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -717,7 +717,7 @@ func ExecutionReceiptFixture(opts ...func(*flow.ExecutionReceipt)) *flow.Executi receipt := &flow.ExecutionReceipt{ ExecutorID: IdentifierFixture(), ExecutionResult: *ExecutionResultFixture(), - Spocks: nil, + Spocks: SignaturesFixture(5), ExecutorSignature: SignatureFixture(), } @@ -849,6 +849,7 @@ func ExecutionResultFixture(opts ...func(*flow.ExecutionResult)) *flow.Execution PreviousResultID: IdentifierFixture(), BlockID: IdentifierFixture(), Chunks: ChunkListFixture(2, blockID), + ServiceEvents: ServiceEventsFixture(3), ExecutionDataID: IdentifierFixture(), } From ba6cc1e1c859690d66826ab434bcab6be196ab25 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Tue, 27 Jun 2023 20:32:52 -0700 Subject: [PATCH 3/7] add more unittests --- engine/common/rpc/convert/collections_test.go | 71 ++++++ engine/common/rpc/convert/events.go | 36 ++- engine/common/rpc/convert/events_test.go | 218 ++++++++++++++---- .../common/rpc/convert/execution_data_test.go | 103 +++------ .../rpc/convert/execution_results_test.go | 55 +++++ .../convert/{shapshots.go => snapshots.go} | 0 engine/common/rpc/convert/snapshots_test.go | 27 +++ .../common/rpc/convert/transactions_test.go | 13 +- utils/unittest/fixtures.go | 23 +- 9 files changed, 403 insertions(+), 143 deletions(-) create mode 100644 engine/common/rpc/convert/collections_test.go create mode 100644 engine/common/rpc/convert/execution_results_test.go rename engine/common/rpc/convert/{shapshots.go => snapshots.go} (100%) create mode 100644 engine/common/rpc/convert/snapshots_test.go diff --git a/engine/common/rpc/convert/collections_test.go b/engine/common/rpc/convert/collections_test.go new file mode 100644 index 00000000000..a95f712a7b8 --- /dev/null +++ b/engine/common/rpc/convert/collections_test.go @@ -0,0 +1,71 @@ +package convert_test + +import ( + "testing" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestConvertCollection tests that converting a collection to a protobuf message results in the correct +// set of transaction IDs +func TestConvertCollection(t *testing.T) { + t.Parallel() + + collection := unittest.CollectionFixture(5) + txIDs := make([]flow.Identifier, 0, len(collection.Transactions)) + for _, tx := range collection.Transactions { + txIDs = append(txIDs, tx.ID()) + } + + t.Run("convert collection to message", func(t *testing.T) { + msg, err := convert.CollectionToMessage(&collection) + require.NoError(t, err) + + assert.Len(t, msg.TransactionIds, len(txIDs)) + for i, txID := range txIDs { + assert.Equal(t, txID[:], msg.TransactionIds[i]) + } + }) + + t.Run("convert light collection to message", func(t *testing.T) { + lightCollection := flow.LightCollection{Transactions: txIDs} + + msg, err := convert.LightCollectionToMessage(&lightCollection) + require.NoError(t, err) + + assert.Len(t, msg.TransactionIds, len(txIDs)) + for i, txID := range txIDs { + assert.Equal(t, txID[:], msg.TransactionIds[i]) + } + }) +} + +// TestConvertCollectionGuarantee tests that converting a collection guarantee to and from a protobuf +// message results in the same collection guarantee +func TestConvertCollectionGuarantee(t *testing.T) { + t.Parallel() + + guarantee := unittest.CollectionGuaranteeFixture(unittest.WithCollRef(unittest.IdentifierFixture())) + + msg := convert.CollectionGuaranteeToMessage(guarantee) + converted := convert.MessageToCollectionGuarantee(msg) + + assert.Equal(t, guarantee, converted) +} + +// TestConvertCollectionGuarantees tests that converting a collection guarantee to and from a protobuf +// message results in the same collection guarantee +func TestConvertCollectionGuarantees(t *testing.T) { + t.Parallel() + + guarantees := unittest.CollectionGuaranteesFixture(5, unittest.WithCollRef(unittest.IdentifierFixture())) + + msg := convert.CollectionGuaranteesToMessages(guarantees) + converted := convert.MessagesToCollectionGuarantees(msg) + + assert.Equal(t, guarantees, converted) +} diff --git a/engine/common/rpc/convert/events.go b/engine/common/rpc/convert/events.go index 0d0b20179e7..cdcce9d1e7a 100644 --- a/engine/common/rpc/convert/events.go +++ b/engine/common/rpc/convert/events.go @@ -51,33 +51,26 @@ func EventsToMessages(flowEvents []flow.Event) []*entities.Event { // Note: this function does not convert the payload encoding func MessagesToEvents(l []*entities.Event) []flow.Event { events := make([]flow.Event, len(l)) - for i, m := range l { events[i] = MessageToEvent(m) } - return events } // MessageToEventFromVersion converts a protobuf message to a flow.Event, and converts the payload // encoding from CCF to JSON if the input version is CCF func MessageToEventFromVersion(m *entities.Event, inputVersion execproto.EventEncodingVersion) (*flow.Event, error) { + event := MessageToEvent(m) switch inputVersion { case execproto.EventEncodingVersion_CCF_V0: - convertedPayload, err := CcfPayloadToJsonPayload(m.Payload) + convertedPayload, err := CcfPayloadToJsonPayload(event.Payload) if err != nil { return nil, fmt.Errorf("could not convert event payload from CCF to Json: %w", err) } - return &flow.Event{ - Type: flow.EventType(m.GetType()), - TransactionID: flow.HashToID(m.GetTransactionId()), - TransactionIndex: m.GetTransactionIndex(), - EventIndex: m.GetEventIndex(), - Payload: convertedPayload, - }, nil + event.Payload = convertedPayload + return &event, nil case execproto.EventEncodingVersion_JSON_CDC_V0: - je := MessageToEvent(m) - return &je, nil + return &event, nil default: return nil, fmt.Errorf("invalid encoding format %d", inputVersion) } @@ -120,6 +113,22 @@ func MessageToServiceEvent(m *entities.ServiceEvent) (*flow.ServiceEvent, error) return &se, err } +// ServiceEventListToMessages converts a slice of flow.ServiceEvents to a slice of protobuf messages +func ServiceEventListToMessages(list flow.ServiceEventList) ( + []*entities.ServiceEvent, + error, +) { + entities := make([]*entities.ServiceEvent, len(list)) + for i, serviceEvent := range list { + m, err := ServiceEventToMessage(serviceEvent) + if err != nil { + return nil, fmt.Errorf("failed to convert service event at index %d to message: %w", i, err) + } + entities[i] = m + } + return entities, nil +} + // ServiceEventsToMessages converts a slice of flow.ServiceEvents to a slice of protobuf messages func MessagesToServiceEventList(m []*entities.ServiceEvent) ( flow.ServiceEventList, @@ -138,6 +147,9 @@ func MessagesToServiceEventList(m []*entities.ServiceEvent) ( // CcfPayloadToJsonPayload converts a CCF-encoded payload to a JSON-encoded payload func CcfPayloadToJsonPayload(p []byte) ([]byte, error) { + if len(p) == 0 { + return p, nil + } val, err := ccf.Decode(nil, p) if err != nil { return nil, fmt.Errorf("unable to decode from ccf format: %w", err) diff --git a/engine/common/rpc/convert/events_test.go b/engine/common/rpc/convert/events_test.go index f04ee6a3e5b..f0d5a7f857d 100644 --- a/engine/common/rpc/convert/events_test.go +++ b/engine/common/rpc/convert/events_test.go @@ -15,84 +15,200 @@ import ( "github.com/onflow/flow-go/utils/unittest" ) -func TestConvertEvents(t *testing.T) { - t.Run("empty", func(t *testing.T) { - messages := convert.EventsToMessages(nil) - assert.Len(t, messages, 0) - }) +// TestConvertEventWithoutPayloadConversion tests converting events to and from protobuf messages +// with no payload modification +func TestConvertEventWithoutPayloadConversion(t *testing.T) { + t.Parallel() - t.Run("simple", func(t *testing.T) { + txID := unittest.IdentifierFixture() + cadenceValue, err := cadence.NewValue(2) + require.NoError(t, err) - txID := unittest.IdentifierFixture() + t.Run("convert empty event", func(t *testing.T) { event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) - messages := convert.EventsToMessages([]flow.Event{event}) - - require.Len(t, messages, 1) + msg := convert.EventToMessage(event) + converted := convert.MessageToEvent(msg) - message := messages[0] - - require.Equal(t, event.EventIndex, message.EventIndex) - require.Equal(t, event.TransactionIndex, message.TransactionIndex) - require.Equal(t, event.Payload, message.Payload) - require.Equal(t, event.TransactionID[:], message.TransactionId) - require.Equal(t, string(event.Type), message.Type) + assert.Equal(t, event, converted) }) - t.Run("convert event from ccf format", func(t *testing.T) { - cadenceValue, err := cadence.NewValue(2) - require.NoError(t, err) + t.Run("convert json cdc encoded event", func(t *testing.T) { ccfPayload, err := ccf.Encode(cadenceValue) require.NoError(t, err) + + event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + event.Payload = ccfPayload + + msg := convert.EventToMessage(event) + converted := convert.MessageToEvent(msg) + + assert.Equal(t, event, converted) + }) + + t.Run("convert json cdc encoded event", func(t *testing.T) { jsonPayload, err := jsoncdc.Encode(cadenceValue) require.NoError(t, err) - txID := unittest.IdentifierFixture() - ccfEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - ccfEvent.Payload = ccfPayload - jsonEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - jsonEvent.Payload = jsonPayload + + event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + event.Payload = jsonPayload + + msg := convert.EventToMessage(event) + converted := convert.MessageToEvent(msg) + + assert.Equal(t, event.Type, converted.Type) + }) +} + +// TestConvertEventWithPayloadConversion tests converting events to and from protobuf messages +// with payload modification +func TestConvertEventWithPayloadConversion(t *testing.T) { + t.Parallel() + + txID := unittest.IdentifierFixture() + cadenceValue, err := cadence.NewValue(2) + require.NoError(t, err) + + ccfEvent := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + ccfEvent.Payload, err = ccf.Encode(cadenceValue) + require.NoError(t, err) + + jsonEvent := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + jsonEvent.Payload, err = jsoncdc.Encode(cadenceValue) + require.NoError(t, err) + + t.Run("convert empty ccf payload", func(t *testing.T) { + event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + + msg := convert.EventToMessage(event) + convertedEvent, err := convert.MessageToEventFromVersion(msg, execproto.EventEncodingVersion_CCF_V0) + assert.NoError(t, err) + + assert.Equal(t, event, *convertedEvent) + }) + + t.Run("convert empty jsoncdc payload", func(t *testing.T) { + event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) + + msg := convert.EventToMessage(event) + convertedEvent, err := convert.MessageToEventFromVersion(msg, execproto.EventEncodingVersion_JSON_CDC_V0) + assert.NoError(t, err) + + assert.Equal(t, event, *convertedEvent) + }) + + t.Run("convert payload from ccf to jsoncdc", func(t *testing.T) { message := convert.EventToMessage(ccfEvent) convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_CCF_V0) assert.NoError(t, err) + assert.Equal(t, jsonEvent, *convertedEvent) }) - t.Run("convert event from json cdc format", func(t *testing.T) { - cadenceValue, err := cadence.NewValue(2) - require.NoError(t, err) - txID := unittest.IdentifierFixture() - jsonEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - jsonPayload, err := jsoncdc.Encode(cadenceValue) - require.NoError(t, err) - jsonEvent.Payload = jsonPayload + t.Run("convert payload from jsoncdc to jsoncdc", func(t *testing.T) { message := convert.EventToMessage(jsonEvent) convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_JSON_CDC_V0) assert.NoError(t, err) + assert.Equal(t, jsonEvent, *convertedEvent) }) +} - t.Run("convert payload from ccf to jsoncdc", func(t *testing.T) { - // Round trip conversion check - cadenceValue, err := cadence.NewValue(2) - require.NoError(t, err) - ccfPayload, err := ccf.Encode(cadenceValue) +func TestConvertEvents(t *testing.T) { + t.Parallel() + + eventCount := 3 + txID := unittest.IdentifierFixture() + + events := make([]flow.Event, eventCount) + ccfEvents := make([]flow.Event, eventCount) + jsonEvents := make([]flow.Event, eventCount) + for i := 0; i < eventCount; i++ { + cadenceValue, err := cadence.NewValue(i) require.NoError(t, err) - txID := unittest.IdentifierFixture() - ccfEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - ccfEvent.Payload = ccfPayload - jsonEvent := unittest.EventFixture( - flow.EventAccountCreated, 2, 3, txID, 0) - jsonPayload, err := jsoncdc.Encode(cadenceValue) + eventIndex := 3 + uint32(i) + + event := unittest.EventFixture(flow.EventAccountCreated, 2, eventIndex, txID, 0) + ccfEvent := unittest.EventFixture(flow.EventAccountCreated, 2, eventIndex, txID, 0) + jsonEvent := unittest.EventFixture(flow.EventAccountCreated, 2, eventIndex, txID, 0) + + ccfEvent.Payload, err = ccf.Encode(cadenceValue) require.NoError(t, err) - jsonEvent.Payload = jsonPayload - res, err := convert.CcfEventToJsonEvent(ccfEvent) + jsonEvent.Payload, err = jsoncdc.Encode(cadenceValue) require.NoError(t, err) - require.Equal(t, jsonEvent, *res) + + events[i] = event + ccfEvents[i] = ccfEvent + jsonEvents[i] = jsonEvent + } + + t.Run("empty", func(t *testing.T) { + messages := convert.EventsToMessages(nil) + assert.Len(t, messages, 0) + }) + + t.Run("convert with passthrough payload conversion", func(t *testing.T) { + messages := convert.EventsToMessages(events) + require.Len(t, messages, len(events)) + + for i, message := range messages { + event := events[i] + require.Equal(t, event.EventIndex, message.EventIndex) + require.Equal(t, event.TransactionIndex, message.TransactionIndex) + require.Equal(t, event.Payload, message.Payload) + require.Equal(t, event.TransactionID[:], message.TransactionId) + require.Equal(t, string(event.Type), message.Type) + } + + converted := convert.MessagesToEvents(messages) + assert.Equal(t, events, converted) }) + + t.Run("convert event from ccf to jsoncdc", func(t *testing.T) { + messages := convert.EventsToMessages(ccfEvents) + converted, err := convert.MessagesToEventsFromVersion(messages, execproto.EventEncodingVersion_CCF_V0) + assert.NoError(t, err) + + assert.Equal(t, jsonEvents, converted) + }) + + t.Run("convert event from jsoncdc", func(t *testing.T) { + messages := convert.EventsToMessages(jsonEvents) + converted, err := convert.MessagesToEventsFromVersion(messages, execproto.EventEncodingVersion_JSON_CDC_V0) + assert.NoError(t, err) + + assert.Equal(t, jsonEvents, converted) + }) +} + +func TestConvertServiceEvent(t *testing.T) { + t.Parallel() + + serviceEvents := unittest.ServiceEventsFixture(1) + require.Len(t, serviceEvents, 1) + + msg, err := convert.ServiceEventToMessage(serviceEvents[0]) + require.NoError(t, err) + + converted, err := convert.MessageToServiceEvent(msg) + require.NoError(t, err) + + assert.Equal(t, serviceEvents[0], *converted) +} + +func TestConvertServiceEventList(t *testing.T) { + t.Parallel() + + serviceEvents := unittest.ServiceEventsFixture(5) + require.Len(t, serviceEvents, 5) + + msg, err := convert.ServiceEventListToMessages(serviceEvents) + require.NoError(t, err) + + converted, err := convert.MessagesToServiceEventList(msg) + require.NoError(t, err) + + assert.Equal(t, serviceEvents, converted) } diff --git a/engine/common/rpc/convert/execution_data_test.go b/engine/common/rpc/convert/execution_data_test.go index a0060d56c43..2ebfd915c28 100644 --- a/engine/common/rpc/convert/execution_data_test.go +++ b/engine/common/rpc/convert/execution_data_test.go @@ -1,99 +1,60 @@ package convert_test import ( - "bytes" - "math/rand" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/engine/common/rpc/convert" - "github.com/onflow/flow-go/ledger" - "github.com/onflow/flow-go/ledger/common/testutils" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/utils/unittest" ) -// TestConvertBlockExecutionData checks if conversions between BlockExecutionData and it's fields are consistent. -func TestConvertBlockExecutionData(t *testing.T) { - // Initialize the BlockExecutionData object - numChunks := 5 - ced := make([]*execution_data.ChunkExecutionData, numChunks) - bed := &execution_data.BlockExecutionData{ - BlockID: unittest.IdentifierFixture(), - ChunkExecutionDatas: ced, - } - - // Fill the chunk execution datas with trie updates, collections, and events - minSerializedSize := uint64(10 * execution_data.DefaultMaxBlobSize) - for i := 0; i < numChunks; i++ { - // the service chunk sometimes does not have any trie updates - if i == numChunks-1 { - tx1 := unittest.TransactionBodyFixture() - // proposal key and payer are empty addresses for service tx - tx1.ProposalKey.Address = flow.EmptyAddress - tx1.Payer = flow.EmptyAddress - bed.ChunkExecutionDatas[i] = &execution_data.ChunkExecutionData{ - Collection: &flow.Collection{Transactions: []*flow.TransactionBody{&tx1}}, - } - continue - } - - // Initialize collection - tx1 := unittest.TransactionBodyFixture() - tx2 := unittest.TransactionBodyFixture() - col := &flow.Collection{Transactions: []*flow.TransactionBody{&tx1, &tx2}} - - // Initialize events - header := unittest.BlockHeaderFixture() - events := unittest.BlockEventsFixture(header, 5).Events - - chunk := &execution_data.ChunkExecutionData{ - Collection: col, - Events: events, - TrieUpdate: testutils.TrieUpdateFixture(1, 1, 8), - } - size := 1 - - // Fill the TrieUpdate with data - inner: - for { - buf := &bytes.Buffer{} - require.NoError(t, execution_data.DefaultSerializer.Serialize(buf, chunk)) +func TestConvertBlockExecutionData1(t *testing.T) { + t.Parallel() - if buf.Len() >= int(minSerializedSize) { - break inner - } + chain := flow.Testnet.Chain() // this is used by the AddressFixture + events := unittest.EventsFixture(5) - v := make([]byte, size) - _, _ = rand.Read(v) - - k, err := chunk.TrieUpdate.Payloads[0].Key() - require.NoError(t, err) + chunks := 5 + chunkData := make([]*execution_data.ChunkExecutionData, 0, chunks) + for i := 0; i < chunks-1; i++ { + chunkData = append(chunkData, unittest.ChunkExecutionDataFixture(t, execution_data.DefaultMaxBlobSize/5, unittest.WithChunkEvents(events))) + } + makeServiceTx := func(ced *execution_data.ChunkExecutionData) { + // proposal key and payer are empty addresses for service tx + collection := unittest.CollectionFixture(1) + collection.Transactions[0].ProposalKey.Address = flow.EmptyAddress + collection.Transactions[0].Payer = flow.EmptyAddress + ced.Collection = &collection - chunk.TrieUpdate.Payloads[0] = ledger.NewPayload(k, v) - size *= 2 - } - bed.ChunkExecutionDatas[i] = chunk + // the service chunk sometimes does not have any trie updates + ced.TrieUpdate = nil } + chunk := unittest.ChunkExecutionDataFixture(t, execution_data.DefaultMaxBlobSize/5, unittest.WithChunkEvents(events), makeServiceTx) + chunkData = append(chunkData, chunk) + + blockData := unittest.BlockExecutionDataFixture(unittest.WithChunkExecutionDatas(chunkData...)) t.Run("chunk execution data conversions", func(t *testing.T) { - chunkMsg, err := convert.ChunkExecutionDataToMessage(bed.ChunkExecutionDatas[0]) - assert.Nil(t, err) + chunkMsg, err := convert.ChunkExecutionDataToMessage(chunkData[0]) + require.NoError(t, err) chunkReConverted, err := convert.MessageToChunkExecutionData(chunkMsg, flow.Testnet.Chain()) - assert.Nil(t, err) - assert.Equal(t, bed.ChunkExecutionDatas[0], chunkReConverted) + require.NoError(t, err) + + assert.Equal(t, chunkData[0], chunkReConverted) }) t.Run("block execution data conversions", func(t *testing.T) { - blockMsg, err := convert.BlockExecutionDataToMessage(bed) - assert.Nil(t, err) + msg, err := convert.BlockExecutionDataToMessage(blockData) + require.NoError(t, err) + + converted, err := convert.MessageToBlockExecutionData(msg, chain) + require.NoError(t, err) - bedReConverted, err := convert.MessageToBlockExecutionData(blockMsg, flow.Testnet.Chain()) - assert.Nil(t, err) - assert.Equal(t, bed, bedReConverted) + assert.Equal(t, blockData, converted) }) } diff --git a/engine/common/rpc/convert/execution_results_test.go b/engine/common/rpc/convert/execution_results_test.go new file mode 100644 index 00000000000..a985014e6dd --- /dev/null +++ b/engine/common/rpc/convert/execution_results_test.go @@ -0,0 +1,55 @@ +package convert_test + +import ( + "testing" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertExecutionResult(t *testing.T) { + t.Parallel() + + er := unittest.ExecutionResultFixture() + + msg, err := convert.ExecutionResultToMessage(er) + require.NoError(t, err) + + converted, err := convert.MessageToExecutionResult(msg) + require.NoError(t, err) + + assert.Equal(t, er, converted) +} + +func TestConvertExecutionResults(t *testing.T) { + t.Parallel() + + results := []*flow.ExecutionResult{ + unittest.ExecutionResultFixture(), + unittest.ExecutionResultFixture(), + unittest.ExecutionResultFixture(), + } + + msg, err := convert.ExecutionResultsToMessages(results) + require.NoError(t, err) + + converted, err := convert.MessagesToExecutionResults(msg) + require.NoError(t, err) + + assert.Equal(t, results, converted) +} + +func TestConvertExecutionResultMetaList(t *testing.T) { + t.Parallel() + + block := unittest.FullBlockFixture() + metaList := block.Payload.Receipts + + msg := convert.ExecutionResultMetaListToMessages(metaList) + converted := convert.MessagesToExecutionResultMetaList(msg) + + assert.Equal(t, metaList, converted) +} diff --git a/engine/common/rpc/convert/shapshots.go b/engine/common/rpc/convert/snapshots.go similarity index 100% rename from engine/common/rpc/convert/shapshots.go rename to engine/common/rpc/convert/snapshots.go diff --git a/engine/common/rpc/convert/snapshots_test.go b/engine/common/rpc/convert/snapshots_test.go new file mode 100644 index 00000000000..2e1d4ce91e1 --- /dev/null +++ b/engine/common/rpc/convert/snapshots_test.go @@ -0,0 +1,27 @@ +package convert_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestConvertSnapshot(t *testing.T) { + t.Parallel() + + identities := unittest.CompleteIdentitySet() + snapshot := unittest.RootSnapshotFixtureWithChainID(identities, flow.Testnet.Chain().ChainID()) + + msg, err := convert.SnapshotToBytes(snapshot) + require.NoError(t, err) + + converted, err := convert.BytesToInmemSnapshot(msg) + require.NoError(t, err) + + assert.Equal(t, snapshot, converted) +} diff --git a/engine/common/rpc/convert/transactions_test.go b/engine/common/rpc/convert/transactions_test.go index 39c8abf1b5c..939a214bc72 100644 --- a/engine/common/rpc/convert/transactions_test.go +++ b/engine/common/rpc/convert/transactions_test.go @@ -4,18 +4,29 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/onflow/cadence" + jsoncdc "github.com/onflow/cadence/encoding/json" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" ) func TestConvertTransaction(t *testing.T) { + t.Parallel() + tx := unittest.TransactionBodyFixture() + arg, err := jsoncdc.Encode(cadence.NewAddress(unittest.AddressFixture())) + require.NoError(t, err) + + // add fields not included in the fixture + tx.Arguments = append(tx.Arguments, arg) + tx.EnvelopeSignatures = append(tx.EnvelopeSignatures, unittest.TransactionSignatureFixture()) msg := convert.TransactionToMessage(tx) converted, err := convert.MessageToTransaction(msg, flow.Testnet.Chain()) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, tx, converted) assert.Equal(t, tx.ID(), converted.ID()) diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 489187fc86b..8d9edb9df4b 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -1609,6 +1609,18 @@ func BlockEventsFixture( n int, types ...flow.EventType, ) flow.BlockEvents { + return flow.BlockEvents{ + BlockID: header.ID(), + BlockHeight: header.Height, + BlockTimestamp: header.Timestamp, + Events: EventsFixture(n, types...), + } +} + +func EventsFixture( + n int, + types ...flow.EventType, +) []flow.Event { if len(types) == 0 { types = []flow.EventType{"A.0x1.Foo.Bar", "A.0x2.Zoo.Moo", "A.0x3.Goo.Hoo"} } @@ -1618,12 +1630,7 @@ func BlockEventsFixture( events[i] = EventFixture(types[i%len(types)], 0, uint32(i), IdentifierFixture(), 0) } - return flow.BlockEvents{ - BlockID: header.ID(), - BlockHeight: header.Height, - BlockTimestamp: header.Timestamp, - Events: events, - } + return events } // EventFixture returns an event @@ -2412,7 +2419,7 @@ func WithChunkEvents(events flow.EventsList) func(*execution_data.ChunkExecution } func ChunkExecutionDataFixture(t *testing.T, minSize int, opts ...func(*execution_data.ChunkExecutionData)) *execution_data.ChunkExecutionData { - collection := CollectionFixture(1) + collection := CollectionFixture(5) ced := &execution_data.ChunkExecutionData{ Collection: &collection, Events: flow.EventsList{}, @@ -2423,7 +2430,7 @@ func ChunkExecutionDataFixture(t *testing.T, minSize int, opts ...func(*executio opt(ced) } - if minSize <= 1 { + if minSize <= 1 || ced.TrieUpdate == nil { return ced } From 01ef9daf2f7ecb475b4c39939c599751c84bd8c3 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Tue, 27 Jun 2023 21:10:17 -0700 Subject: [PATCH 4/7] revert updates to main fixtures --- .../common/rpc/convert/execution_results_test.go | 9 +++++---- utils/unittest/fixtures.go | 14 +++++++++++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/engine/common/rpc/convert/execution_results_test.go b/engine/common/rpc/convert/execution_results_test.go index a985014e6dd..3f38a5b7e59 100644 --- a/engine/common/rpc/convert/execution_results_test.go +++ b/engine/common/rpc/convert/execution_results_test.go @@ -13,7 +13,7 @@ import ( func TestConvertExecutionResult(t *testing.T) { t.Parallel() - er := unittest.ExecutionResultFixture() + er := unittest.ExecutionResultFixture(unittest.WithServiceEvents(3)) msg, err := convert.ExecutionResultToMessage(er) require.NoError(t, err) @@ -28,9 +28,9 @@ func TestConvertExecutionResults(t *testing.T) { t.Parallel() results := []*flow.ExecutionResult{ - unittest.ExecutionResultFixture(), - unittest.ExecutionResultFixture(), - unittest.ExecutionResultFixture(), + unittest.ExecutionResultFixture(unittest.WithServiceEvents(3)), + unittest.ExecutionResultFixture(unittest.WithServiceEvents(3)), + unittest.ExecutionResultFixture(unittest.WithServiceEvents(3)), } msg, err := convert.ExecutionResultsToMessages(results) @@ -46,6 +46,7 @@ func TestConvertExecutionResultMetaList(t *testing.T) { t.Parallel() block := unittest.FullBlockFixture() + block.SetPayload(unittest.PayloadFixture(unittest.WithAllTheFixins)) metaList := block.Payload.Receipts msg := convert.ExecutionResultMetaListToMessages(metaList) diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 8d9edb9df4b..f5d454d01b0 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -264,7 +264,10 @@ func WithAllTheFixins(payload *flow.Payload) { payload.Seals = Seal.Fixtures(3) payload.Guarantees = CollectionGuaranteesFixture(4) for i := 0; i < 10; i++ { - receipt := ExecutionReceiptFixture() + receipt := ExecutionReceiptFixture( + WithResult(ExecutionResultFixture(WithServiceEvents(3))), + WithSpocks(SignaturesFixture(3)), + ) payload.Receipts = flow.ExecutionReceiptMetaList{receipt.Meta()} payload.Results = flow.ExecutionResultList{&receipt.ExecutionResult} } @@ -713,11 +716,17 @@ func WithResult(result *flow.ExecutionResult) func(*flow.ExecutionReceipt) { } } +func WithSpocks(spocks []crypto.Signature) func(*flow.ExecutionReceipt) { + return func(receipt *flow.ExecutionReceipt) { + receipt.Spocks = spocks + } +} + func ExecutionReceiptFixture(opts ...func(*flow.ExecutionReceipt)) *flow.ExecutionReceipt { receipt := &flow.ExecutionReceipt{ ExecutorID: IdentifierFixture(), ExecutionResult: *ExecutionResultFixture(), - Spocks: SignaturesFixture(5), + Spocks: nil, ExecutorSignature: SignatureFixture(), } @@ -849,7 +858,6 @@ func ExecutionResultFixture(opts ...func(*flow.ExecutionResult)) *flow.Execution PreviousResultID: IdentifierFixture(), BlockID: IdentifierFixture(), Chunks: ChunkListFixture(2, blockID), - ServiceEvents: ServiceEventsFixture(3), ExecutionDataID: IdentifierFixture(), } From dca40dde6f9f5931a0a2ed18704ff84985a619ec Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Tue, 27 Jun 2023 21:21:40 -0700 Subject: [PATCH 5/7] fix import lint errors --- engine/common/rpc/convert/collections_test.go | 5 +++-- engine/common/rpc/convert/events_test.go | 5 +++-- engine/common/rpc/convert/execution_results_test.go | 5 +++-- engine/common/rpc/convert/transactions_test.go | 1 + 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/engine/common/rpc/convert/collections_test.go b/engine/common/rpc/convert/collections_test.go index a95f712a7b8..75ab6f25adc 100644 --- a/engine/common/rpc/convert/collections_test.go +++ b/engine/common/rpc/convert/collections_test.go @@ -3,11 +3,12 @@ package convert_test import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // TestConvertCollection tests that converting a collection to a protobuf message results in the correct diff --git a/engine/common/rpc/convert/events_test.go b/engine/common/rpc/convert/events_test.go index f0d5a7f857d..f01942c2326 100644 --- a/engine/common/rpc/convert/events_test.go +++ b/engine/common/rpc/convert/events_test.go @@ -3,12 +3,13 @@ package convert_test import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/onflow/cadence" "github.com/onflow/cadence/encoding/ccf" jsoncdc "github.com/onflow/cadence/encoding/json" execproto "github.com/onflow/flow/protobuf/go/flow/execution" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" diff --git a/engine/common/rpc/convert/execution_results_test.go b/engine/common/rpc/convert/execution_results_test.go index 3f38a5b7e59..cce0bd175e6 100644 --- a/engine/common/rpc/convert/execution_results_test.go +++ b/engine/common/rpc/convert/execution_results_test.go @@ -3,11 +3,12 @@ package convert_test import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestConvertExecutionResult(t *testing.T) { diff --git a/engine/common/rpc/convert/transactions_test.go b/engine/common/rpc/convert/transactions_test.go index 939a214bc72..c9c5141f9a8 100644 --- a/engine/common/rpc/convert/transactions_test.go +++ b/engine/common/rpc/convert/transactions_test.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/cadence" jsoncdc "github.com/onflow/cadence/encoding/json" + "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" From 07a99194ac06bca032cd00f8119397d047f16428 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Wed, 28 Jun 2023 15:19:02 -0700 Subject: [PATCH 6/7] remove empty check for ccf encoded payloads --- engine/common/rpc/convert/events.go | 3 --- engine/common/rpc/convert/events_test.go | 10 ---------- 2 files changed, 13 deletions(-) diff --git a/engine/common/rpc/convert/events.go b/engine/common/rpc/convert/events.go index cdcce9d1e7a..d3bd469cd48 100644 --- a/engine/common/rpc/convert/events.go +++ b/engine/common/rpc/convert/events.go @@ -147,9 +147,6 @@ func MessagesToServiceEventList(m []*entities.ServiceEvent) ( // CcfPayloadToJsonPayload converts a CCF-encoded payload to a JSON-encoded payload func CcfPayloadToJsonPayload(p []byte) ([]byte, error) { - if len(p) == 0 { - return p, nil - } val, err := ccf.Decode(nil, p) if err != nil { return nil, fmt.Errorf("unable to decode from ccf format: %w", err) diff --git a/engine/common/rpc/convert/events_test.go b/engine/common/rpc/convert/events_test.go index f01942c2326..b340cbb6edb 100644 --- a/engine/common/rpc/convert/events_test.go +++ b/engine/common/rpc/convert/events_test.go @@ -78,16 +78,6 @@ func TestConvertEventWithPayloadConversion(t *testing.T) { jsonEvent.Payload, err = jsoncdc.Encode(cadenceValue) require.NoError(t, err) - t.Run("convert empty ccf payload", func(t *testing.T) { - event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) - - msg := convert.EventToMessage(event) - convertedEvent, err := convert.MessageToEventFromVersion(msg, execproto.EventEncodingVersion_CCF_V0) - assert.NoError(t, err) - - assert.Equal(t, event, *convertedEvent) - }) - t.Run("convert empty jsoncdc payload", func(t *testing.T) { event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) From 6191498d544b2c805c2408d1cd047bb489b6b308 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Wed, 28 Jun 2023 15:20:44 -0700 Subject: [PATCH 7/7] remove empty jsoncdc test. this is not a valid happy case --- engine/common/rpc/convert/events_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/engine/common/rpc/convert/events_test.go b/engine/common/rpc/convert/events_test.go index b340cbb6edb..2cf010fa011 100644 --- a/engine/common/rpc/convert/events_test.go +++ b/engine/common/rpc/convert/events_test.go @@ -78,16 +78,6 @@ func TestConvertEventWithPayloadConversion(t *testing.T) { jsonEvent.Payload, err = jsoncdc.Encode(cadenceValue) require.NoError(t, err) - t.Run("convert empty jsoncdc payload", func(t *testing.T) { - event := unittest.EventFixture(flow.EventAccountCreated, 2, 3, txID, 0) - - msg := convert.EventToMessage(event) - convertedEvent, err := convert.MessageToEventFromVersion(msg, execproto.EventEncodingVersion_JSON_CDC_V0) - assert.NoError(t, err) - - assert.Equal(t, event, *convertedEvent) - }) - t.Run("convert payload from ccf to jsoncdc", func(t *testing.T) { message := convert.EventToMessage(ccfEvent) convertedEvent, err := convert.MessageToEventFromVersion(message, execproto.EventEncodingVersion_CCF_V0)