Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lowered caplin's granularity of snashots production #12121

Merged
merged 12 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package antiquary

import (
"context"
"fmt"
"io/ioutil"
"math"
"strings"
Expand Down Expand Up @@ -111,29 +112,32 @@ func doesSnapshotDirHaveBeaconBlocksFiles(snapshotDir string) bool {

// Antiquate is the function that starts transactions seeding and shit, very cool but very shit too as a name.
func (a *Antiquary) Loop() error {
if a.downloader == nil || !a.blocks {
if !a.blocks {
return nil // Just skip if we don't have a downloader
}
// Skip if we don't support backfilling for the current network
if !clparams.SupportBackfilling(a.cfg.DepositNetworkID) {
return nil
}
completedReply, err := a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{})
if err != nil {
return err
}
reCheckTicker := time.NewTicker(3 * time.Second)
defer reCheckTicker.Stop()
fmt.Println("A", a.downloader == nil)
if a.downloader != nil {
completedReply, err := a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{})
if err != nil {
return err
}
reCheckTicker := time.NewTicker(3 * time.Second)
defer reCheckTicker.Stop()

// Fist part of the antiquate is to download caplin snapshots
for (!completedReply.Completed || !doesSnapshotDirHaveBeaconBlocksFiles(a.dirs.Snap)) && !a.backfilled.Load() {
select {
case <-reCheckTicker.C:
completedReply, err = a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{})
if err != nil {
return err
// Fist part of the antiquate is to download caplin snapshots
for (!completedReply.Completed || !doesSnapshotDirHaveBeaconBlocksFiles(a.dirs.Snap)) && !a.backfilled.Load() {
select {
case <-reCheckTicker.C:
completedReply, err = a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{})
if err != nil {
return err
}
case <-a.ctx.Done():
}
case <-a.ctx.Done():
}
}

Expand Down Expand Up @@ -320,9 +324,11 @@ func (a *Antiquary) antiquate(from, to uint64) error {
Path: path,
}
}
// Notify bittorent to seed the new snapshots
if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err)
if a.downloader != nil {
// Notify bittorent to seed the new snapshots
if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err)
}
}

return tx.Commit()
Expand Down Expand Up @@ -398,9 +404,11 @@ func (a *Antiquary) antiquateBlobs() error {
Path: path,
}
}
// Notify bittorent to seed the new snapshots
if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err)
if a.downloader != nil {
// Notify bittorent to seed the new snapshots
if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err)
}
}

roTx, err = a.mainDB.BeginRo(a.ctx)
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
if speed == 0 {
continue
}
if cfg.sn != nil && cfg.sn.SegmentsMax() == 0 {
cfg.sn.ReopenFolder()
}
logArgs = append(logArgs,
"slot", currProgress,
"blockNumber", currEth1Progress.Load(),
Expand Down
6 changes: 3 additions & 3 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
return err
}

to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
to = (to / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root())
if err := csn.ReopenFolder(); err != nil {
Expand Down Expand Up @@ -512,7 +512,7 @@ func (c *LoopSnapshots) Run(ctx *Context) error {
return err
}

to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
to = (to / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root())
if err := csn.ReopenFolder(); err != nil {
Expand Down Expand Up @@ -976,7 +976,7 @@ func (c *DumpBlobsSnapshots) Run(ctx *Context) error {
to = c.To
return
})
from := ((beaconConfig.DenebForkEpoch * beaconConfig.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
from := ((beaconConfig.DenebForkEpoch * beaconConfig.SlotsPerEpoch) / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit

salt, err := snaptype.GetIndexSalt(dirs.Snap)

Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/chain/snapcfg/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (c Cfg) MergeLimit(t snaptype.Enum, fromBlock uint64) uint64 {
// TODO: If we add any more sharding schemes (we currently have blocks, state & beacon block schemes)
// - we may need to add some kind of sharding scheme identifier to snaptype.Type
if hasType || snaptype.IsCaplinType(t) {
return snaptype.Erigon2MergeLimit
return snaptype.CaplinMergeLimit
}

return c.MergeLimit(snaptype.MinCoreEnum, fromBlock)
Expand Down
1 change: 1 addition & 0 deletions erigon-lib/downloader/snaptype/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ const Erigon3SeedableSteps = 64
// less files - means small files will be removed after merge (no peers for this files).
const Erigon2OldMergeLimit = 500_000
const Erigon2MergeLimit = 100_000
const CaplinMergeLimit = 10_000
const Erigon2MinSegmentSize = 1_000

var MergeSteps = []uint64{100_000, 10_000}
Expand Down
7 changes: 4 additions & 3 deletions turbo/snapshotsync/freezeblocks/caplin_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func (s *CaplinSnapshots) ReopenFolder() error {
_, fName := filepath.Split(f.Path)
list = append(list, fName)
}
fmt.Println(list)
return s.ReopenList(list, false)
}

Expand Down Expand Up @@ -542,8 +543,8 @@ func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, fromSlot uint64, toS
return err
}
}
if sn.Count() != snaptype.Erigon2MergeLimit {
return fmt.Errorf("expected %d blocks, got %d", snaptype.Erigon2MergeLimit, sn.Count())
if sn.Count() != snaptype.CaplinMergeLimit {
return fmt.Errorf("expected %d blocks, got %d", snaptype.CaplinMergeLimit, sn.Count())
}
if err := sn.Compress(); err != nil {
return fmt.Errorf("compress: %w", err)
Expand Down Expand Up @@ -791,7 +792,7 @@ func (s *CaplinSnapshots) FrozenBlobs() uint64 {
if s.beaconCfg.DenebForkEpoch == math.MaxUint64 {
return 0
}
minSegFrom := ((s.beaconCfg.SlotsPerEpoch * s.beaconCfg.DenebForkEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
minSegFrom := ((s.beaconCfg.SlotsPerEpoch * s.beaconCfg.DenebForkEpoch) / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit
foundMinSeg := false
ret := uint64(0)
for _, seg := range s.BlobSidecars.VisibleSegments {
Expand Down
Loading