Skip to content

Commit

Permalink
Bor waypoint storage (#9793)
Browse files Browse the repository at this point in the history
Implementation of db and snapshot storage for additional synced hiemdall
waypoint types

* Checkpoint
* Milestones

This is targeted at the Astrid downloader which uses waypoints to verify
headers during syncing and fork choice selection.

Post milestones for heimdall these types are currently downloaded by
erigon but not persisted locally. This change adds persistence for these
types.

In addition to the pure persistence changes this PR also contains a
refactor step which is part of the process of extracting polygon related
types from erigon core into a seperate package which may eventually be
extracted to a separate module and possibly repo.

The aim is rather than the core `turbo\snapshotsync\freezeblocks` having
to know about types it manages and how to exaract and index their
contents this can concern it self with a set of macro shard management
actions.

This process is partially completed by this PR, a final step will be to
remove BorSnapshots and to simplify the places in the code which has to
remeber to deal with them. This requires further testing so has been
left out of this PR to avoid delays in delivering the base types.

# Status

* Waypont types and storage are complete and integrated in to the
BorHeimdall stage, The code has been tested to check that types are
inserted into mdbx, extracted and merged correctly
* I have verified that when produced from block 0 the new snapshot
correctly follow the merging strategy of existing snapshots
* The functionality is enables by a **--bor.waypoints=true** this is
false by default.

# Testing

This has been tested as follows:

* Run a Mumbai instance to the tip and check current processing for
milestones and checkpoints

# Post merge steps

* Produce and release snapshots for mumbai and bor mainnet
* Check existing node upgrades
* Remove --bor.waypoints flags
  • Loading branch information
mh0lt authored Apr 29, 2024
1 parent 287e4a2 commit 714c259
Show file tree
Hide file tree
Showing 73 changed files with 2,364 additions and 1,346 deletions.
14 changes: 12 additions & 2 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,12 @@ func (c *DumpSnapshots) Run(ctx *Context) error {
return
})

salt := freezeblocks.GetIndicesSalt(dirs.Snap)
salt, err := snaptype.GetIndexSalt(dirs.Snap)

if err != nil {
return err
}

return freezeblocks.DumpBeaconBlocks(ctx, db, 0, to, salt, dirs, estimate.CompressSnapshot.Workers(), log.LvlInfo, log.Root())
}

Expand Down Expand Up @@ -977,7 +982,12 @@ func (c *DumpBlobsSnapshots) Run(ctx *Context) error {
})
from := ((beaconConfig.DenebForkEpoch * beaconConfig.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit

salt := freezeblocks.GetIndicesSalt(dirs.Snap)
salt, err := snaptype.GetIndexSalt(dirs.Snap)

if err != nil {
return err
}

return freezeblocks.DumpBlobsSidecar(ctx, blobStorage, db, from, to, salt, dirs, estimate.CompressSnapshot.Workers(), log.LvlInfo, log.Root())
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/devnet/devnetutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"

"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/log/v3"
Expand All @@ -23,7 +24,7 @@ var ErrInvalidEnodeString = errors.New("invalid enode string")
func ClearDevDB(dataDir string, logger log.Logger) error {
logger.Info("Deleting nodes' data folders")

files, err := os.ReadDir(dataDir)
files, err := dir.ReadDir(dataDir)

if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion cmd/devnet/services/polygon/proofgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"

"github.com/ledgerwatch/erigon/cl/merkle_tree"
bortypes "github.com/ledgerwatch/erigon/polygon/bor/types"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain/networkname"
Expand Down Expand Up @@ -264,7 +265,7 @@ type receiptProof struct {
}

func getReceiptProof(ctx context.Context, node requests.RequestGenerator, receipt *types.Receipt, block *requests.Block, receipts []*types.Receipt) (*receiptProof, error) {
stateSyncTxHash := types.ComputeBorTxHash(block.Number.Uint64(), block.Hash)
stateSyncTxHash := bortypes.ComputeBorTxHash(block.Number.Uint64(), block.Hash)
receiptsTrie := trie.New(trie.EmptyRoot)

if len(receipts) == 0 {
Expand Down
1 change: 1 addition & 0 deletions cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
pruneHBefore, pruneRBefore uint64
pruneTBefore, pruneCBefore uint64
experiments []string
unwindTypes []string
chain string // Which chain to use (mainnet, goerli, sepolia, etc.)
outputCsvFile string

Expand Down
11 changes: 6 additions & 5 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ var cmdStageBorHeimdall = &cobra.Command{
}
defer db.Close()

if err := stageBorHeimdall(db, cmd.Context(), logger); err != nil {
if err := stageBorHeimdall(db, cmd.Context(), unwindTypes, logger); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
Expand Down Expand Up @@ -711,6 +711,7 @@ func init() {
cmdSetPrune.Flags().Uint64Var(&pruneTBefore, "prune.t.before", 0, "")
cmdSetPrune.Flags().Uint64Var(&pruneCBefore, "prune.c.before", 0, "")
cmdSetPrune.Flags().StringSliceVar(&experiments, "experiments", nil, "Storage mode to override database")
cmdSetPrune.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "Types to unwind for bor heimdall")
rootCmd.AddCommand(cmdSetPrune)
}

Expand Down Expand Up @@ -821,7 +822,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
})
}

func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, logger log.Logger) error {
engine, _, sync, _, miningState := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)

Expand Down Expand Up @@ -852,7 +853,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error
}

unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, nil, nil, nil, nil, nil, nil)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, nil, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
return err
}
Expand Down Expand Up @@ -881,7 +882,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error
recents = bor.Recents
signatures = bor.Signatures
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures)
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, nil, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {
Expand Down Expand Up @@ -1670,7 +1671,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
cfg.Sync,
stagedsync.MiningStages(ctx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, dirs.Tmp, blockReader),
stagedsync.StageBorHeimdallCfg(db, snapDb, miner, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures),
stagedsync.StageBorHeimdallCfg(db, snapDb, miner, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures, false, unwindTypes),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil, blockReader),
stagedsync.StageHashStateCfg(db, dirs, historyV3),
stagedsync.StageTrieCfg(db, false, true, false, dirs.Tmp, blockReader, nil, historyV3, agg),
Expand Down
4 changes: 4 additions & 0 deletions cmd/rpcdaemon/rpcservices/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
)

var _ services.FullBlockReader = &RemoteBackend{}

type RemoteBackend struct {
remoteEthBackend remote.ETHBACKENDClient
log log.Logger
Expand Down Expand Up @@ -92,6 +95,7 @@ func (back *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash commo
func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") }
func (back *RemoteBackend) Snapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) BorSnapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) AllTypes() []snaptype.Type { panic("not implemented") }
func (back *RemoteBackend) FrozenBlocks() uint64 { return back.blockReader.FrozenBlocks() }
func (back *RemoteBackend) FrozenBorBlocks() uint64 { return back.blockReader.FrozenBorBlocks() }
func (back *RemoteBackend) FrozenFiles() (list []string) { return back.blockReader.FrozenFiles() }
Expand Down
19 changes: 2 additions & 17 deletions cmd/silkworm_api/snapshot_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func buildIndex(cliCtx *cli.Context, dataDir string, snapshotPaths []string, min
g.SetLimit(workers)

dirs := datadir.New(dataDir)
salt := freezeblocks.GetIndicesSalt(dirs.Snap)

chainDB := mdbx.NewMDBX(logger).Path(dirs.Chaindata).MustOpen()
defer chainDB.Close()
Expand All @@ -92,26 +91,12 @@ func buildIndex(cliCtx *cli.Context, dataDir string, snapshotPaths []string, min
}

switch segment.Type.Enum() {
case snaptype.Enums.Headers:
case snaptype.Enums.Headers, snaptype.Enums.Bodies, snaptype.Enums.Transactions:
g.Go(func() error {
jobProgress := &background.Progress{}
ps.Add(jobProgress)
defer ps.Delete(jobProgress)
return freezeblocks.HeadersIdx(ctx, segment, salt, dirs.Tmp, jobProgress, logLevel, logger)
})
case snaptype.Enums.Bodies:
g.Go(func() error {
jobProgress := &background.Progress{}
ps.Add(jobProgress)
defer ps.Delete(jobProgress)
return freezeblocks.BodiesIdx(ctx, segment, salt, dirs.Tmp, jobProgress, logLevel, logger)
})
case snaptype.Enums.Transactions:
g.Go(func() error {
jobProgress := &background.Progress{}
ps.Add(jobProgress)
defer ps.Delete(jobProgress)
return freezeblocks.TransactionsIdx(ctx, chainConfig, segment, salt, dirs.Tmp, jobProgress, logLevel, logger)
return segment.Type.BuildIndexes(ctx, segment, chainConfig, dirs.Tmp, jobProgress, logLevel, logger)
})
}
}
Expand Down
14 changes: 6 additions & 8 deletions cmd/snapshots/cmp/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/snapshots/flags"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
"github.com/ledgerwatch/erigon/cmd/utils"
coresnaptype "github.com/ledgerwatch/erigon/core/snaptype"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params"
Expand Down Expand Up @@ -614,8 +615,8 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent1.Body.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.BodiesIdx(ctx, info, salt, c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)

return coresnaptype.Bodies.BuildIndexes(ctx, info, c.chainConfig(), c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
})

g.Go(func() error {
Expand Down Expand Up @@ -653,8 +654,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent1.Transactions.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.TransactionsIdx(ctx, c.chainConfig(), info, salt, c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
return coresnaptype.Transactions.BuildIndexes(ctx, info, c.chainConfig(), c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
})

b2err := make(chan error, 1)
Expand Down Expand Up @@ -690,8 +690,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent2.Body.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.BodiesIdx(ctx, info, salt, c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
return coresnaptype.Bodies.BuildIndexes(ctx, info, c.chainConfig(), c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
})

g.Go(func() error {
Expand Down Expand Up @@ -732,8 +731,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent2.Transactions.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.TransactionsIdx(ctx, c.chainConfig(), info, salt, c.session2.LocalFsRoot(), nil, log.LvlDebug, logger)
return coresnaptype.Transactions.BuildIndexes(ctx, info, c.chainConfig(), c.session2.LocalFsRoot(), nil, log.LvlDebug, logger)
})

if err := g.Wait(); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ var (
Value: true,
}

WithHeimdallWaypoints = cli.BoolFlag{
Name: "bor.waypoints",
Usage: "Enabling bor waypont recording",
Value: false,
}

PolygonSyncFlag = cli.BoolFlag{
Name: "polygon.sync",
Usage: "Enabling syncing using the new polygon sync component",
Expand Down Expand Up @@ -1580,6 +1586,7 @@ func setBorConfig(ctx *cli.Context, cfg *ethconfig.Config) {
cfg.HeimdallURL = ctx.String(HeimdallURLFlag.Name)
cfg.WithoutHeimdall = ctx.Bool(WithoutHeimdallFlag.Name)
cfg.WithHeimdallMilestones = ctx.Bool(WithHeimdallMilestones.Name)
cfg.WithHeimdallWaypointRecording = ctx.Bool(WithHeimdallWaypoints.Name)
cfg.PolygonSync = ctx.Bool(PolygonSyncFlag.Name)
}

Expand Down
3 changes: 2 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/core/vm/evmtypes"
"github.com/ledgerwatch/erigon/eth/ethutils"
bortypes "github.com/ledgerwatch/erigon/polygon/bor/types"
"github.com/ledgerwatch/erigon/rlp"
)

Expand Down Expand Up @@ -192,7 +193,7 @@ func ExecuteBlockEphemerally(
stateSyncReceipt.Logs = blockLogs[len(logs):] // get state-sync logs from `state.Logs()`

// fill the state sync with the correct information
types.DeriveFieldsForBorReceipt(stateSyncReceipt, block.Hash(), block.NumberU64(), receipts)
bortypes.DeriveFieldsForBorReceipt(stateSyncReceipt, block.Hash(), block.NumberU64(), receipts)
stateSyncReceipt.Status = types.ReceiptStatusSuccessful
}
}
Expand Down
53 changes: 53 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
Expand All @@ -41,6 +42,7 @@ import (

"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/polygon/heimdall"
"github.com/ledgerwatch/erigon/rlp"
)

Expand Down Expand Up @@ -1150,6 +1152,57 @@ func PruneBorBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int, SpanIdAt
}
counter--
}

checkpointCursor, err := tx.RwCursor(kv.BorCheckpoints)
if err != nil {
return err
}

defer checkpointCursor.Close()
lastCheckpointToRemove, err := heimdall.CheckpointIdAt(tx, blockTo)

if err != nil {
return err
}

var checkpointIdBytes [8]byte
binary.BigEndian.PutUint64(checkpointIdBytes[:], uint64(lastCheckpointToRemove))
for k, _, err := checkpointCursor.Seek(checkpointIdBytes[:]); err == nil && k != nil; k, _, err = checkpointCursor.Prev() {
if err = checkpointCursor.DeleteCurrent(); err != nil {
return err
}
}

milestoneCursor, err := tx.RwCursor(kv.BorMilestones)

if err != nil {
return err
}

defer milestoneCursor.Close()

var lastMilestoneToRemove heimdall.MilestoneId

for blockCount := 1; err != nil && blockCount < blocksDeleteLimit; blockCount++ {
lastMilestoneToRemove, err = heimdall.MilestoneIdAt(tx, blockTo-uint64(blockCount))

if !errors.Is(err, heimdall.ErrMilestoneNotFound) {
return err
} else {
if blockCount == blocksDeleteLimit-1 {
return nil
}
}
}

var milestoneIdBytes [8]byte
binary.BigEndian.PutUint64(milestoneIdBytes[:], uint64(lastMilestoneToRemove))
for k, _, err := milestoneCursor.Seek(milestoneIdBytes[:]); err == nil && k != nil; k, _, err = milestoneCursor.Prev() {
if err = milestoneCursor.DeleteCurrent(); err != nil {
return err
}
}

return nil
}

Expand Down
7 changes: 4 additions & 3 deletions core/rawdb/bor_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
bortypes "github.com/ledgerwatch/erigon/polygon/bor/types"
"github.com/ledgerwatch/erigon/rlp"
)

var (
// bor receipt key
borReceiptKey = types.BorReceiptKey
borReceiptKey = bortypes.BorReceiptKey
)

// HasBorReceipts verifies the existence of all block receipt belonging to a block.
Expand Down Expand Up @@ -78,7 +79,7 @@ func ReadBorReceipt(db kv.Tx, blockHash libcommon.Hash, blockNumber uint64, rece
}
}

types.DeriveFieldsForBorReceipt(borReceipt, blockHash, blockNumber, receipts)
bortypes.DeriveFieldsForBorReceipt(borReceipt, blockHash, blockNumber, receipts)

return borReceipt, nil
}
Expand Down Expand Up @@ -126,7 +127,7 @@ func ReadBorTransactionForBlock(db kv.Tx, blockNum uint64) types.Transaction {
if !HasBorReceipts(db, blockNum) {
return nil
}
return types.NewBorTransaction()
return bortypes.NewBorTransaction()
}

// TruncateBorReceipts removes all bor receipt for given block number or newer
Expand Down
Loading

0 comments on commit 714c259

Please sign in to comment.