Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Reduce attestation subscription spam from VC #4806

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 177 additions & 15 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,19 @@ use eth2::types::{
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::ArithError;
use safe_arith::{ArithError, SafeArith};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sync::poll_sync_committee_duties;
use sync::SyncDutiesMap;
use tokio::{sync::mpsc::Sender, time::sleep};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};

/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than `SUBSCRIPTION_BUFFER_SLOTS` away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;

/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;

Expand All @@ -62,6 +55,36 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;

/// Offsets from the attestation duty slot at which a subscription should be sent.
const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32];

/// Check that `ATTESTATION_SUBSCRIPTION_OFFSETS` is sorted ascendingly.
const _: () = assert!({
let mut i = 0;
loop {
let prev = if i > 0 {
ATTESTATION_SUBSCRIPTION_OFFSETS[i - 1]
} else {
0
};
let curr = ATTESTATION_SUBSCRIPTION_OFFSETS[i];
if curr < prev {
break false;
}
i += 1;
if i == ATTESTATION_SUBSCRIPTION_OFFSETS.len() {
break true;
}
}
});
/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than 2 slots away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);

#[derive(Debug)]
pub enum Error {
UnableToReadSlotClock,
Expand All @@ -84,6 +107,16 @@ pub struct DutyAndProof {
pub duty: AttesterData,
/// This value is only set to `Some` if the proof indicates that the validator is an aggregator.
pub selection_proof: Option<SelectionProof>,
/// Track which slots we should send subscriptions at for this duty.
///
/// This value is updated after each subscription is successfully sent.
pub subscription_slots: Arc<SubscriptionSlots>,
}

/// Tracker containing the slots at which an attestation subscription should be sent.
pub struct SubscriptionSlots {
/// Pairs of `(slot, already_sent)` in slot-descending order.
slots: Vec<(Slot, AtomicBool)>,
}

impl DutyAndProof {
Expand Down Expand Up @@ -111,17 +144,55 @@ impl DutyAndProof {
}
})?;

let subscription_slots = SubscriptionSlots::new(duty.slot);

Ok(Self {
duty,
selection_proof,
subscription_slots,
})
}

/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot);
Self {
duty,
selection_proof: None,
subscription_slots,
}
}
}

impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok())
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
}

/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
}

/// Update our record of subscribed slots to account for successful subscription at `slot`.
fn record_successful_subscription_at(&self, slot: Slot) {
for (scheduled_slot, already_sent) in self.slots.iter().rev() {
if slot >= *scheduled_slot {
already_sent.store(true, Ordering::Relaxed);
} else {
break;
}
}
}
}
Expand Down Expand Up @@ -574,8 +645,24 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_timer =
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);

// This vector is likely to be a little oversized, but it won't reallocate.
let mut subscriptions = Vec::with_capacity(local_pubkeys.len() * 2);
// This vector is intentionally oversized by 10% so that it won't reallocate.
// Each validator has 2 attestation duties occuring in the current and next epoch, for which
// they must send `ATTESTATION_SUBSCRIPTION_OFFSETS.len()` subscriptions. These subscription
// slots are approximately evenly distributed over the two epochs, usually with a slight lag
// that balances out (some subscriptions for the current epoch were sent in the previous, and
// some subscriptions for the next next epoch will be sent in the next epoch but aren't included
// in our calculation). We cancel the factor of 2 from the formula for simplicity.
let overallocation_numerator = 110;
let overallocation_denominator = 100;
let num_expected_subscriptions = overallocation_numerator
* std::cmp::max(
1,
local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ E::slots_per_epoch() as usize,
)
/ overallocation_denominator;
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
let mut subscription_slots_to_confirm = Vec::with_capacity(num_expected_subscriptions);

// For this epoch and the next epoch, produce any beacon committee subscriptions.
//
Expand All @@ -588,10 +675,10 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.read()
.iter()
.filter_map(|(_, map)| map.get(epoch))
// The BN logs a warning if we try and subscribe to current or near-by slots. Give it a
// buffer.
.filter(|(_, duty_and_proof)| {
current_slot + SUBSCRIPTION_BUFFER_SLOTS < duty_and_proof.duty.slot
duty_and_proof
.subscription_slots
.should_send_subscription_at(current_slot)
})
.for_each(|(_, duty_and_proof)| {
let duty = &duty_and_proof.duty;
Expand All @@ -603,7 +690,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
committees_at_slot: duty.committees_at_slot,
slot: duty.slot,
is_aggregator,
})
});
subscription_slots_to_confirm.push(duty_and_proof.subscription_slots.clone());
});
}

Expand Down Expand Up @@ -632,6 +720,16 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
"Failed to subscribe validators";
"error" => %e
)
} else {
// Record that subscriptions were successfully sent.
debug!(
log,
"Broadcast attestation subscriptions";
"count" => subscriptions.len(),
);
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
}
}

Expand Down Expand Up @@ -1200,3 +1298,67 @@ async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
};
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn subscription_slots_exact() {
for duty_slot in [
Slot::new(32),
Slot::new(47),
Slot::new(99),
Slot::new(1002003),
] {
let subscription_slots = SubscriptionSlots::new(duty_slot);

// Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually).
for _ in 0..2 {
for offset in ATTESTATION_SUBSCRIPTION_OFFSETS {
assert!(subscription_slots.should_send_subscription_at(duty_slot - offset));
}
}

// Mark each slot as complete and check that all prior slots are still marked
// incomplete.
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.enumerate()
{
subscription_slots.record_successful_subscription_at(duty_slot - offset);
for lower_offset in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.skip(i + 1)
{
assert!(lower_offset < offset);
assert!(
subscription_slots.should_send_subscription_at(duty_slot - lower_offset)
);
}
}
}
}
#[test]
fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);

subscription_slots.record_successful_subscription_at(duty_slot - offset);

// All past offsets (earlier slots) should be marked as complete.
for (j, other_offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let past = j >= i;
assert_eq!(other_offset >= offset, past);
assert_eq!(
subscription_slots.should_send_subscription_at(duty_slot - other_offset),
!past
);
}
}
}
}