Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent VC from sending expired subscriptions #5296

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,29 @@ impl SubscriptionSlots {

/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Prevent subscriptions from being sent for duties which are in the past.
// If there are no subscription slots (should be impossible currently) then considering them
// expired is a safe fallback.
let expired = self
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can log a warn (or have a metric) here for expired == true. If we are bundling this PR with #5305 , we should ideally be getting should_send_subscription_at calls for expired slots only after a restart.
So getting a barrage of warns that are not right after a restart might alert us to a potential issue in duties calculation logic?

.slots
.first()
.map(|(latest_subscription_slot, _)| slot > *latest_subscription_slot)
.unwrap_or(true);

// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
let unsent_subscription_exists =
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
});

!expired && unsent_subscription_exists
}

/// Update our record of subscribed slots to account for successful subscription at `slot`.
fn record_successful_subscription_at(&self, slot: Slot) {
fn record_subscription_sent_at(&self, slot: Slot) {
for (scheduled_slot, already_sent) in self.slots.iter().rev() {
if slot >= *scheduled_slot {
already_sent.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -736,12 +748,17 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
"Broadcast attestation subscriptions";
"count" => subscriptions.len(),
);
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
}
}

// Regardless of whether the subscription attempt succeeded or failed, mark the subscription
// slot as sent. We made an attempt, and if it failed we will retry at the next scheduled
// subscription slot. This is preferable to getting stuck on trying to subscribe at old slots
// which the BN will continually reject.
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_subscription_sent_at(current_slot);
}

drop(subscriptions_timer);

// Prune old duties.
Expand Down Expand Up @@ -1343,7 +1360,7 @@ mod test {
.rev()
.enumerate()
{
subscription_slots.record_successful_subscription_at(duty_slot - offset);
subscription_slots.record_subscription_sent_at(duty_slot - offset);
for lower_offset in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
Expand All @@ -1363,7 +1380,7 @@ mod test {
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);

subscription_slots.record_successful_subscription_at(duty_slot - offset);
subscription_slots.record_subscription_sent_at(duty_slot - offset);

// All past offsets (earlier slots) should be marked as complete.
for (j, other_offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
Expand Down
Loading