Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Reduce attestation subscription spam from VC #4806

Closed
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 170 additions & 14 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,19 @@ use eth2::types::{
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::ArithError;
use safe_arith::{ArithError, SafeArith};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sync::poll_sync_committee_duties;
use sync::SyncDutiesMap;
use tokio::{sync::mpsc::Sender, time::sleep};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};

/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than `SUBSCRIPTION_BUFFER_SLOTS` away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;

/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;

Expand All @@ -62,6 +55,36 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;

/// Offsets from the attestation duty slot at which a subscription should be sent.
const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32];

/// Check that `ATTESTATION_SUBSCRIPTION_OFFSETS` is sorted ascendingly.
const _: () = assert!({
let mut i = 0;
loop {
let prev = if i > 0 {
ATTESTATION_SUBSCRIPTION_OFFSETS[i - 1]
} else {
0
};
let curr = ATTESTATION_SUBSCRIPTION_OFFSETS[i];
if curr < prev {
break false;
}
i += 1;
if i == ATTESTATION_SUBSCRIPTION_OFFSETS.len() {
break true;
}
}
});
/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than 2 slots away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);

#[derive(Debug)]
pub enum Error {
UnableToReadSlotClock,
Expand All @@ -84,6 +107,15 @@ pub struct DutyAndProof {
pub duty: AttesterData,
/// This value is only set to `Some` if the proof indicates that the validator is an aggregator.
pub selection_proof: Option<SelectionProof>,
/// Track which slots we should send subscriptions at for this duty.
///
/// This value is updated after each subscription is successfully sent.
pub subscription_slots: Arc<SubscriptionSlots>,
}

/// Tracker containing the slots at which an attestation subscription should be sent.
pub struct SubscriptionSlots {
slots: Vec<(Slot, AtomicBool)>,
}

impl DutyAndProof {
Expand Down Expand Up @@ -111,17 +143,55 @@ impl DutyAndProof {
}
})?;

let subscription_slots = SubscriptionSlots::new(duty.slot);

Ok(Self {
duty,
selection_proof,
subscription_slots,
})
}

/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot);
Self {
duty,
selection_proof: None,
subscription_slots,
}
}
}

impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok())
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
}

/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(subscribe_slot, already_sent)| {
slot >= *subscribe_slot && !already_sent.load(Ordering::Relaxed)
})
}

/// Update our record of subscribed slots to account for successful subscription at `slot`.
fn record_successful_subscription_at(&self, slot: Slot) {
for (scheduled_slot, already_sent) in self.slots.iter().rev() {
if slot >= *scheduled_slot {
already_sent.store(true, Ordering::Relaxed);
} else {
break;
}
}
}
}
Expand Down Expand Up @@ -575,7 +645,17 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);

// This vector is likely to be a little oversized, but it won't reallocate.
let mut subscriptions = Vec::with_capacity(local_pubkeys.len() * 2);
// The calculation is based on the fact that every validator has
// `ATTESTATION_SUBSCRIPTION_OFFSETS.len()` subscriptions to send each epoch. We assume these
// subscriptions are distributed roughly evenly across the slots, but include an extra margin of
// 1/4 to hedge against imbalances.
let num_expected_subscriptions = std::cmp::max(
1,
5 * local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ (4 * E::slots_per_epoch() as usize),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really that important, but I didn't follow this logic. Why 5x keys? Also the division by 4?
Naively i would expect 50% of keys to do ATTESTATION_SUBSCRIPTION_OFFSET.len() -1 and they lie in one half of the epoch.

It doesn't really matter tho, we're just trying to save some allocation, the larger the better, probably

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 5/4 is so the vec is 1/4 larger than it should be.

Other than the 5/4 the bound is quite tight because we are looking at 2 epochs total (current and next), there are 2 * local_pubkeys.len() attestation duties, and each duty gets sent at ATTESTATION_SUBSCRIPTION_OFFSETS.len() slots. So there are 2 * local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len() subscription-slots to be distributed. They're approximately distributed over 2 epochs (current and next) but with a slight offset from the epoch start, e.g. some of the subscription-slots for the current epoch were in the previous epoch, but this is balanced out by some of the subscription-slots for the N + 2 epoch being in the next epoch (we just don't know it yet). Therefore the total number of subscription-slots per slot is: (2 * local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()) / (2 * E::slots_per_epoch()), and we can cancel the factor of 2.

Experimentally the number of subscriptions per slot is very close to this. For 10k validators, we'd expect 2.5k subscriptions without the 5/4 factor, and in practice we're very close to this:

DEBG Sent attestation subscriptions expected: 3124, count: 2512
DEBG Sent attestation subscriptions expected: 3124, count: 2560
DEBG Sent attestation subscriptions expected: 3124, count: 2535
DEBG Sent attestation subscriptions expected: 3124, count: 2483

);
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
let mut subscription_slots_to_confirm = Vec::with_capacity(num_expected_subscriptions);

// For this epoch and the next epoch, produce any beacon committee subscriptions.
//
Expand All @@ -588,10 +668,10 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.read()
.iter()
.filter_map(|(_, map)| map.get(epoch))
// The BN logs a warning if we try and subscribe to current or near-by slots. Give it a
// buffer.
.filter(|(_, duty_and_proof)| {
current_slot + SUBSCRIPTION_BUFFER_SLOTS < duty_and_proof.duty.slot
duty_and_proof
.subscription_slots
.should_send_subscription_at(current_slot)
})
.for_each(|(_, duty_and_proof)| {
let duty = &duty_and_proof.duty;
Expand All @@ -603,7 +683,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
committees_at_slot: duty.committees_at_slot,
slot: duty.slot,
is_aggregator,
})
});
subscription_slots_to_confirm.push(duty_and_proof.subscription_slots.clone());
});
}

Expand Down Expand Up @@ -632,6 +713,17 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
"Failed to subscribe validators";
"error" => %e
)
} else {
// Record that subscriptions were successfully sent.
debug!(
log,
"Sent attestation subscriptions";
"count" => subscriptions.len(),
"expected" => num_expected_subscriptions
);
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
}
}

Expand Down Expand Up @@ -1200,3 +1292,67 @@ async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
};
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn subscription_slots_exact() {
for duty_slot in [
Slot::new(32),
Slot::new(47),
Slot::new(99),
Slot::new(1002003),
] {
let subscription_slots = SubscriptionSlots::new(duty_slot);

// Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually).
for _ in 0..2 {
for offset in ATTESTATION_SUBSCRIPTION_OFFSETS {
assert!(subscription_slots.should_send_subscription_at(duty_slot - offset));
}
}

// Mark each slot as complete and check that all prior slots are still marked
// incomplete.
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.enumerate()
{
subscription_slots.record_successful_subscription_at(duty_slot - offset);
for lower_offset in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.skip(i + 1)
{
assert!(lower_offset < offset);
assert!(
subscription_slots.should_send_subscription_at(duty_slot - lower_offset)
);
}
}
}
}
#[test]
fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);

subscription_slots.record_successful_subscription_at(duty_slot - offset);

// All past offsets (earlier slots) should be marked as complete.
for (j, other_offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let past = j >= i;
assert_eq!(other_offset >= offset, past);
assert_eq!(
subscription_slots.should_send_subscription_at(duty_slot - other_offset),
!past
);
}
}
}
}