diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index d6013c59c59..d8f09bde2bb 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -111,29 +111,31 @@ func doesSnapshotDirHaveBeaconBlocksFiles(snapshotDir string) bool { // Antiquate is the function that starts transactions seeding and shit, very cool but very shit too as a name. func (a *Antiquary) Loop() error { - if a.downloader == nil || !a.blocks { + if !a.blocks { return nil // Just skip if we don't have a downloader } // Skip if we don't support backfilling for the current network if !clparams.SupportBackfilling(a.cfg.DepositNetworkID) { return nil } - completedReply, err := a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{}) - if err != nil { - return err - } - reCheckTicker := time.NewTicker(3 * time.Second) - defer reCheckTicker.Stop() + if a.downloader != nil { + completedReply, err := a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{}) + if err != nil { + return err + } + reCheckTicker := time.NewTicker(3 * time.Second) + defer reCheckTicker.Stop() - // Fist part of the antiquate is to download caplin snapshots - for (!completedReply.Completed || !doesSnapshotDirHaveBeaconBlocksFiles(a.dirs.Snap)) && !a.backfilled.Load() { - select { - case <-reCheckTicker.C: - completedReply, err = a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{}) - if err != nil { - return err + // Fist part of the antiquate is to download caplin snapshots + for (!completedReply.Completed || !doesSnapshotDirHaveBeaconBlocksFiles(a.dirs.Snap)) && !a.backfilled.Load() { + select { + case <-reCheckTicker.C: + completedReply, err = a.downloader.Completed(a.ctx, &proto_downloader.CompletedRequest{}) + if err != nil { + return err + } + case <-a.ctx.Done(): } - case <-a.ctx.Done(): } } @@ -320,9 +322,11 @@ func (a *Antiquary) antiquate(from, to uint64) error { Path: path, } } - // Notify bittorent to seed the new snapshots - if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil { - a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err) + if a.downloader != nil { + // Notify bittorent to seed the new snapshots + if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil { + a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err) + } } return tx.Commit() @@ -398,9 +402,11 @@ func (a *Antiquary) antiquateBlobs() error { Path: path, } } - // Notify bittorent to seed the new snapshots - if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil { - a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err) + if a.downloader != nil { + // Notify bittorent to seed the new snapshots + if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil { + a.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err) + } } roTx, err = a.mainDB.BeginRo(a.ctx) diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index d24b86a5b8b..08d5f738d27 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -196,6 +196,9 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co if speed == 0 { continue } + if cfg.sn != nil && cfg.sn.SegmentsMax() == 0 { + cfg.sn.ReopenFolder() + } logArgs = append(logArgs, "slot", currProgress, "blockNumber", currEth1Progress.Load(), diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index 3496925a69d..f439afbf6dd 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -427,7 +427,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error { return err } - to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit + to = (to / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root()) if err := csn.ReopenFolder(); err != nil { @@ -512,7 +512,7 @@ func (c *LoopSnapshots) Run(ctx *Context) error { return err } - to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit + to = (to / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root()) if err := csn.ReopenFolder(); err != nil { @@ -976,7 +976,7 @@ func (c *DumpBlobsSnapshots) Run(ctx *Context) error { to = c.To return }) - from := ((beaconConfig.DenebForkEpoch * beaconConfig.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit + from := ((beaconConfig.DenebForkEpoch * beaconConfig.SlotsPerEpoch) / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit salt, err := snaptype.GetIndexSalt(dirs.Snap) diff --git a/erigon-lib/chain/snapcfg/util.go b/erigon-lib/chain/snapcfg/util.go index 7ad182c3b50..b011a2fde23 100644 --- a/erigon-lib/chain/snapcfg/util.go +++ b/erigon-lib/chain/snapcfg/util.go @@ -407,7 +407,10 @@ func (c Cfg) MergeLimit(t snaptype.Enum, fromBlock uint64) uint64 { // not the same as other snapshots which follow a block based sharding scheme // TODO: If we add any more sharding schemes (we currently have blocks, state & beacon block schemes) // - we may need to add some kind of sharding scheme identifier to snaptype.Type - if hasType || snaptype.IsCaplinType(t) { + if hasType { + if snaptype.IsCaplinType(t) { + return snaptype.CaplinMergeLimit + } return snaptype.Erigon2MergeLimit } diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go index 85f7cbd34c7..b835167e123 100644 --- a/erigon-lib/downloader/snaptype/files.go +++ b/erigon-lib/downloader/snaptype/files.go @@ -232,6 +232,7 @@ const Erigon3SeedableSteps = 64 // less files - means small files will be removed after merge (no peers for this files). const Erigon2OldMergeLimit = 500_000 const Erigon2MergeLimit = 100_000 +const CaplinMergeLimit = 10_000 const Erigon2MinSegmentSize = 1_000 var MergeSteps = []uint64{100_000, 10_000} diff --git a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go index 3391f07a170..28d7f744a34 100644 --- a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go @@ -542,8 +542,8 @@ func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, fromSlot uint64, toS return err } } - if sn.Count() != snaptype.Erigon2MergeLimit { - return fmt.Errorf("expected %d blocks, got %d", snaptype.Erigon2MergeLimit, sn.Count()) + if sn.Count() != snaptype.CaplinMergeLimit { + return fmt.Errorf("expected %d blocks, got %d", snaptype.CaplinMergeLimit, sn.Count()) } if err := sn.Compress(); err != nil { return fmt.Errorf("compress: %w", err) @@ -791,7 +791,7 @@ func (s *CaplinSnapshots) FrozenBlobs() uint64 { if s.beaconCfg.DenebForkEpoch == math.MaxUint64 { return 0 } - minSegFrom := ((s.beaconCfg.SlotsPerEpoch * s.beaconCfg.DenebForkEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit + minSegFrom := ((s.beaconCfg.SlotsPerEpoch * s.beaconCfg.DenebForkEpoch) / snaptype.CaplinMergeLimit) * snaptype.CaplinMergeLimit foundMinSeg := false ret := uint64(0) for _, seg := range s.BlobSidecars.VisibleSegments {