Skip to content

Commit

Permalink
Merge pull request #5386 from onflow/fxamacker/extract-payloads-from-…
Browse files Browse the repository at this point in the history
…state-extraction

Optimize migration by adding ability to read or extract payloads from state
  • Loading branch information
fxamacker authored Feb 22, 2024
2 parents 27793f4 + 3c2d313 commit cb9a2f3
Show file tree
Hide file tree
Showing 7 changed files with 1,654 additions and 50 deletions.
157 changes: 138 additions & 19 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package extract

import (
"encoding/hex"
"fmt"
"os"
"path"
"strings"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

runtimeCommon "github.com/onflow/cadence/runtime/common"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -26,6 +31,9 @@ var (
flagNoReport bool
flagValidateMigration bool
flagLogVerboseValidationError bool
flagInputPayloadFileName string
flagOutputPayloadFileName string
flagOutputPayloadByAddresses string
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -68,6 +76,35 @@ func init() {
Cmd.Flags().BoolVar(&flagLogVerboseValidationError, "log-verbose-validation-error", false,
"log entire Cadence values on validation error (atree migration)")

// If specified, the state will consist of payloads from the given input payload file.
// If not specified, the state will be extracted from the latest checkpoint file.
// This flag can be used to reduce the total duration of migrations when state extraction
// involves multiple migrations, because it avoids repeatedly reading the checkpoint file
// to rebuild the trie.
// The input payload file must be created by state extraction running with
// --output-payload-filename (optionally combined with --extract-payloads-by-address).
Cmd.Flags().StringVar(
&flagInputPayloadFileName,
"input-payload-filename",
"",
"input payload file",
)

Cmd.Flags().StringVar(
&flagOutputPayloadFileName,
"output-payload-filename",
"",
"output payload file",
)

Cmd.Flags().StringVar(
// Extract payloads of specified addresses (comma separated list of hex-encoded addresses)
// to file specified by --output-payload-filename.
// If no address is specified (empty string) then this flag is ignored.
&flagOutputPayloadByAddresses,
"extract-payloads-by-address",
"",
"extract payloads of addresses (comma separated hex-encoded addresses) to file specified by output-payload-filename",
)
}

func run(*cobra.Command, []string) {
Expand All @@ -78,6 +115,19 @@ func run(*cobra.Command, []string) {
return
}

if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 && len(flagInputPayloadFileName) == 0 {
log.Fatal().Msg("--block-hash or --state-commitment or --input-payload-filename must be specified")
}

if len(flagInputPayloadFileName) > 0 && (len(flagBlockHash) > 0 || len(flagStateCommitment) > 0) {
log.Fatal().Msg("--input-payload-filename cannot be used with --block-hash or --state-commitment")
}

// When flagOutputPayloadByAddresses is specified, flagOutputPayloadFileName is required.
if len(flagOutputPayloadFileName) == 0 && len(flagOutputPayloadByAddresses) > 0 {
log.Fatal().Msg("--extract-payloads-by-address requires --output-payload-filename to be specified")
}

if len(flagBlockHash) > 0 {
blockID, err := flow.HexStringToIdentifier(flagBlockHash)
if err != nil {
Expand Down Expand Up @@ -112,20 +162,38 @@ func run(*cobra.Command, []string) {
log.Info().Msgf("extracting state by state commitment: %x", stateCommitment)
}

if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 {
log.Fatal().Msg("no --block-hash or --state-commitment was specified")
if len(flagInputPayloadFileName) > 0 {
if _, err := os.Stat(flagInputPayloadFileName); os.IsNotExist(err) {
log.Fatal().Msgf("payload input file %s doesn't exist", flagInputPayloadFileName)
}
}

log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v",
flagExecutionStateDir,
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
6,
)
// Refuse to overwrite an existing payload output file.
// os.Stat returns a nil error when the path exists, so existence is detected
// with err == nil. os.IsExist only matches "already exists" errors, which
// os.Stat does not produce, so checking os.IsExist(err) here would never fire.
// Other Stat errors (e.g. file not found) fall through, as before.
if len(flagOutputPayloadFileName) > 0 {
	if _, err := os.Stat(flagOutputPayloadFileName); err == nil {
		log.Fatal().Msgf("payload output file %s exists", flagOutputPayloadFileName)
	}
}

var exportedAddresses []runtimeCommon.Address

if len(flagOutputPayloadByAddresses) > 0 {

addresses := strings.Split(flagOutputPayloadByAddresses, ",")

log.Info().Msgf("Block state commitment: %s from %v, output dir: %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
flagOutputDir)
for _, hexAddr := range addresses {
b, err := hex.DecodeString(strings.TrimSpace(hexAddr))
if err != nil {
log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr))
}

addr, err := runtimeCommon.BytesToAddress(b)
if err != nil {
log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b)
}

exportedAddresses = append(exportedAddresses, addr)
}
}

// err := ensureCheckpointFileExist(flagExecutionStateDir)
// if err != nil {
Expand All @@ -148,14 +216,65 @@ func run(*cobra.Command, []string) {
log.Warn().Msgf("atree migration has verbose validation error logging enabled which may increase size of log")
}

err := extractExecutionState(
log.Logger,
flagExecutionStateDir,
stateCommitment,
flagOutputDir,
flagNWorker,
!flagNoMigration,
)
var inputMsg string
if len(flagInputPayloadFileName) > 0 {
// Input is payloads
inputMsg = fmt.Sprintf("reading payloads from %s", flagInputPayloadFileName)
} else {
// Input is execution state
inputMsg = fmt.Sprintf("reading block state commitment %s from %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
)
}

var outputMsg string
if len(flagOutputPayloadFileName) > 0 {
// Output is payload file
if len(exportedAddresses) == 0 {
outputMsg = fmt.Sprintf("exporting all payloads to %s", flagOutputPayloadFileName)
} else {
outputMsg = fmt.Sprintf(
"exporting payloads by addresses %v to %s",
flagOutputPayloadByAddresses,
flagOutputPayloadFileName,
)
}
} else {
// Output is checkpoint files
outputMsg = fmt.Sprintf(
"exporting root checkpoint to %s, version: %d",
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
6,
)
}

log.Info().Msgf("state extraction plan: %s, %s", inputMsg, outputMsg)

var err error
if len(flagInputPayloadFileName) > 0 {
err = extractExecutionStateFromPayloads(
log.Logger,
flagExecutionStateDir,
flagOutputDir,
flagNWorker,
!flagNoMigration,
flagInputPayloadFileName,
flagOutputPayloadFileName,
exportedAddresses,
)
} else {
err = extractExecutionState(
log.Logger,
flagExecutionStateDir,
stateCommitment,
flagOutputDir,
flagNWorker,
!flagNoMigration,
flagOutputPayloadFileName,
exportedAddresses,
)
}

if err != nil {
log.Fatal().Err(err).Msgf("error extracting the execution state: %s", err.Error())
Expand Down
Loading

0 comments on commit cb9a2f3

Please sign in to comment.