Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: fix goddamn forward sync #13831

Merged
merged 1 commit into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions cl/phase1/network/beacon_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type peerAndBlocks struct {
}

func (f *ForwardBeaconDownloader) RequestMore(ctx context.Context) {
count := uint64(32)
count := uint64(16)
var atomicResp atomic.Value
atomicResp.Store(peerAndBlocks{})
reqInterval := time.NewTicker(300 * time.Millisecond)
Expand All @@ -96,11 +96,14 @@ Loop:
}
// double the request count every 10 seconds. This is inspired by the mekong network, which has many consecutive missing blocks.
reqCount := count
if !f.highestSlotUpdateTime.IsZero() {
multiplier := int(time.Since(f.highestSlotUpdateTime).Seconds()) / 10
multiplier = min(multiplier, 6)
reqCount *= uint64(1 << uint(multiplier))
}
// NEED TO COMMENT THIS BC IT CAUSES ISSUES ON MAINNET

// if !f.highestSlotUpdateTime.IsZero() {
// multiplier := int(time.Since(f.highestSlotUpdateTime).Seconds()) / 10
// multiplier = min(multiplier, 6)
// reqCount *= uint64(1 << uint(multiplier))
// }

// leave a warning if we are stuck for more than 90 seconds
if time.Since(f.highestSlotUpdateTime) > 90*time.Second {
log.Trace("Forward beacon downloader gets stuck", "time", time.Since(f.highestSlotUpdateTime).Seconds(), "highestSlotProcessed", f.highestSlotProcessed)
Expand Down
51 changes: 41 additions & 10 deletions cl/phase1/stages/forward_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ func shouldProcessBlobs(blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) bool {
}
// Check if the requested blocks are too old to request blobs
// https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#the-reqresp-domain
highestEpoch := highestSlot / cfg.beaconCfg.SlotsPerEpoch
currentEpoch := cfg.ethClock.GetCurrentEpoch()
minEpochDist := uint64(0)
if currentEpoch > cfg.beaconCfg.MinEpochsForBlobSidecarsRequests {
minEpochDist = currentEpoch - cfg.beaconCfg.MinEpochsForBlobSidecarsRequests
}
finalizedEpoch := currentEpoch - 2
if highestEpoch < max(cfg.beaconCfg.DenebForkEpoch, minEpochDist, finalizedEpoch) {
return false
}

// this is bad
// highestEpoch := highestSlot / cfg.beaconCfg.SlotsPerEpoch
// currentEpoch := cfg.ethClock.GetCurrentEpoch()
// minEpochDist := uint64(0)
// if currentEpoch > cfg.beaconCfg.MinEpochsForBlobSidecarsRequests {
// minEpochDist = currentEpoch - cfg.beaconCfg.MinEpochsForBlobSidecarsRequests
// }
// finalizedEpoch := currentEpoch - 2
// if highestEpoch < max(cfg.beaconCfg.DenebForkEpoch, minEpochDist, finalizedEpoch) {
// return false
// }

return blobsExist
}
Expand All @@ -67,6 +69,7 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf
err = fmt.Errorf("failed to get blob identifiers: %w", err)
return
}

// If there are no blobs to retrieve, return the highest slot processed
if ids.Len() == 0 {
return highestSlotProcessed, nil
Expand Down Expand Up @@ -98,6 +101,30 @@ func downloadAndProcessEip4844DA(ctx context.Context, logger log.Logger, cfg *Cf
return highestProcessed - 1, err
}

func filterUnneededBlocks(ctx context.Context, blocks []*cltypes.SignedBeaconBlock, cfg *Cfg) []*cltypes.SignedBeaconBlock {
filtered := make([]*cltypes.SignedBeaconBlock, 0, len(blocks))
// Find the latest block in the list
for _, block := range blocks {
blockRoot, err := block.Block.HashSSZ()
if err != nil {
panic(err)
}
_, hasInFcu := cfg.forkChoice.GetHeader(blockRoot)

var hasSignedHeaderInDB bool
if err = cfg.indiciesDB.View(ctx, func(tx kv.Tx) error {
_, hasSignedHeaderInDB, err = beacon_indicies.ReadSignedHeaderByBlockRoot(ctx, tx, blockRoot)
return err
}); err != nil {
panic(err)
}
if !hasInFcu || !hasSignedHeaderInDB {
filtered = append(filtered, block)
}
}
return filtered
}

// processDownloadedBlockBatches processes a batch of downloaded blocks.
// It takes the highest block processed, a flag to determine if insertion is needed, and a list of signed beacon blocks as input.
// It returns the new highest block processed and an error if any.
Expand All @@ -107,6 +134,9 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg *
return blocks[i].Block.Slot < blocks[j].Block.Slot
})

// Filter out blocks that are already in the FCU or have a signed header in the DB
blocks = filterUnneededBlocks(ctx, blocks, cfg)

var (
blockRoot common.Hash
st *state.CachingBeaconState
Expand Down Expand Up @@ -141,6 +171,7 @@ func processDownloadedBlockBatches(ctx context.Context, logger log.Logger, cfg *

// Process the block
if err = processBlock(ctx, cfg, cfg.indiciesDB, block, false, true, true); err != nil {
fmt.Println("EIP-4844 data not available", err, block.Block.Slot)
if errors.Is(err, forkchoice.ErrEIP4844DataNotAvailable) {
// Return an error if EIP-4844 data is not available
logger.Trace("[Caplin] forward sync EIP-4844 data not available", "blockSlot", block.Block.Slot)
Expand Down
Loading