V1.13: increases retransmit-stage deduper capacity and reset-cycle (backport of #30758) (#30857)

For duplicate block detection, for each (slot, shred-index, shred-type)
we need to allow 2 different shreds to be retransmitted.
The commit implements this using two bloom-filter dedupers:
* Shreds are deduplicated using the 1st deduper.
* If a shred is not a duplicate, then we check if:
      (slot, shred-index, shred-type, k)
  is not a duplicate for either k = 0 or k = 1 using the 2nd deduper,
  and if so, the shred is retransmitted.

This achieves a larger capacity than the LRU cache it replaces.
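A minimal sketch of the decision rule above, with plain HashSets standing in for the probabilistic bloom-filter dedupers (ToyDeduper, should_skip_retransmit, and the (slot, index) key are illustrative stand-ins, not the crate's API):

use std::collections::HashSet;
use std::hash::Hash;

// Stand-in for the bloom-filter Deduper: dedup() returns true if the key
// was already seen (a duplicate). The real filter is probabilistic; a
// HashSet is exact but shows the same decision flow.
#[derive(Default)]
struct ToyDeduper<K: Hash + Eq>(HashSet<K>);

impl<K: Hash + Eq> ToyDeduper<K> {
    fn dedup(&mut self, key: K) -> bool {
        // HashSet::insert returns false when the key was already present.
        !self.0.insert(key)
    }
}

const MAX_DUPLICATE_COUNT: usize = 2;

// Skip retransmit if this exact payload was seen before, or if we have
// already retransmitted MAX_DUPLICATE_COUNT distinct shreds for this id.
fn should_skip_retransmit(
    payload_filter: &mut ToyDeduper<Vec<u8>>,
    shred_id_filter: &mut ToyDeduper<((u64, u32), usize)>, // ((slot, index), k)
    shred_id: (u64, u32),
    payload: &[u8],
) -> bool {
    payload_filter.dedup(payload.to_vec())
        || (0..MAX_DUPLICATE_COUNT).all(|k| shred_id_filter.dedup((shred_id, k)))
}

fn main() {
    let (mut payloads, mut ids) = (ToyDeduper::default(), ToyDeduper::default());
    let id = (1u64, 5u32);
    assert!(!should_skip_retransmit(&mut payloads, &mut ids, id, b"shred-a")); // 1st unique: retransmit
    assert!(should_skip_retransmit(&mut payloads, &mut ids, id, b"shred-a")); // same payload: skip
    assert!(!should_skip_retransmit(&mut payloads, &mut ids, id, b"shred-b")); // 2nd unique: retransmit
    assert!(should_skip_retransmit(&mut payloads, &mut ids, id, b"shred-c")); // 3rd unique: skip
}

As a rough, back-of-the-envelope check on the capacity claim: if each Deduper behaves like a standard bloom filter with K = 2 probes over 637,534,199 bits (~76 MiB), the false-positive rate (1 - e^(-2n/m))^2 stays under the 0.001 threshold for on the order of 10 million insertions per reset cycle, versus the 10,000-entry LRU cache being replaced (an estimate, not a measurement).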

(cherry picked from commit 5d9aba5)
behzadnouri authored Mar 22, 2023
1 parent ef01acd commit 88aeaa8
Showing 5 changed files with 63 additions and 108 deletions.
1 change: 0 additions & 1 deletion core/src/lib.rs
@@ -37,7 +37,6 @@ pub mod ledger_metric_report_service;
pub mod multi_iterator_scanner;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_hasher;
pub mod packet_threshold;
pub mod progress_map;
pub mod qos_service;
40 changes: 0 additions & 40 deletions core/src/packet_hasher.rs

This file was deleted.

126 changes: 61 additions & 65 deletions core/src/retransmit_stage.rs
@@ -9,12 +9,12 @@ use {
cluster_slots::ClusterSlots,
cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
completed_data_sets_service::CompletedDataSetsSender,
packet_hasher::PacketHasher,
repair_service::{DuplicateSlotsResetSender, RepairInfo},
window_service::{should_retransmit_and_persist, WindowService},
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_client::rpc_response::SlotUpdate,
solana_gossip::{
@@ -27,7 +27,7 @@ use {
shred::{Shred, ShredId},
},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::{packet::PacketBatch, sigverify::Deduper},
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -36,7 +36,7 @@ use {
std::{
collections::{BTreeSet, HashMap, HashSet},
net::UdpSocket,
ops::{AddAssign, DerefMut},
ops::AddAssign,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
@@ -47,7 +47,9 @@ use {
};

const MAX_DUPLICATE_COUNT: usize = 2;
const DEFAULT_LRU_SIZE: usize = 10_000;
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
@@ -125,33 +127,38 @@ impl RetransmitStats {
}
}

// Map of shred (slot, index, type) => list of hash values seen for that key.
type ShredFilter = LruCache<ShredId, Vec<u64>>;

type ShredFilterAndHasher = (ShredFilter, PacketHasher);

// Returns true if shred is already received and should skip retransmit.
fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
let key = shred.id();
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
match cache.get_mut(&key) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
Some(sent) => {
let hash = hasher.hash_shred(shred);
if sent.contains(&hash) {
true
} else {
sent.push(hash);
false
}
}
None => {
let hash = hasher.hash_shred(shred);
cache.put(key, vec![hash]);
false
struct ShredDeduper<const K: usize> {
deduper: Deduper<K, /*shred:*/ [u8]>,
shred_id_filter: Deduper<K, (ShredId, /*0..MAX_DUPLICATE_COUNT:*/ usize)>,
}

impl<const K: usize> ShredDeduper<K> {
fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
Self {
deduper: Deduper::new(rng, num_bits),
shred_id_filter: Deduper::new(rng, num_bits),
}
}

fn maybe_reset<R: Rng>(
&mut self,
rng: &mut R,
false_positive_rate: f64,
reset_cycle: Duration,
) {
self.deduper
.maybe_reset(rng, false_positive_rate, reset_cycle);
self.shred_id_filter
.maybe_reset(rng, false_positive_rate, reset_cycle);
}

fn dedup(&self, shred: &Shred, max_duplicate_count: usize) -> bool {
// In order to detect duplicate blocks across cluster, we retransmit
// max_duplicate_count different shreds for each ShredId.
let key = shred.id();
self.deduper.dedup(&shred.payload)
|| (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
}
}

// Returns true if this is the first time receiving a shred for `shred_slot`.
@@ -177,20 +184,6 @@ fn check_if_first_shred_received(
}
}

fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilterAndHasher>,
hasher_reset_ts: &mut Instant,
) {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
*hasher_reset_ts = Instant::now();
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
cache.clear();
hasher.reset();
}
}

#[allow(clippy::too_many_arguments)]
fn retransmit(
thread_pool: &ThreadPool,
@@ -201,8 +194,7 @@
sockets: &[UdpSocket],
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &mut Instant,
shreds_received: &Mutex<ShredFilterAndHasher>,
shred_deduper: &mut ShredDeduper<2>,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: Option<&RpcSubscriptions>,
@@ -223,13 +215,17 @@
stats.epoch_fetch += epoch_fetch.as_us();

let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
shred_deduper.maybe_reset(
&mut rand::thread_rng(),
DEDUPER_FALSE_POSITIVE_RATE,
DEDUPER_RESET_CYCLE,
);
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();

let socket_addr_space = cluster_info.socket_addr_space();
let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
if should_skip_retransmit(shred, shreds_received) {
if shred_deduper.dedup(shred, MAX_DUPLICATE_COUNT) {
stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
return None;
}
@@ -346,9 +342,9 @@ pub fn retransmitter(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
);
let mut hasher_reset_ts = Instant::now();
let mut rng = rand::thread_rng();
let mut shred_deduper = ShredDeduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
let mut stats = RetransmitStats::new(Instant::now());
let shreds_received = Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default()));
let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
@@ -370,8 +366,7 @@
&sockets,
&mut stats,
&cluster_nodes_cache,
&mut hasher_reset_ts,
&shreds_received,
&mut shred_deduper,
&max_slots,
&first_shreds_received,
rpc_subscriptions.as_deref(),
@@ -615,46 +610,47 @@ impl RetransmitSlotStats {

#[cfg(test)]
mod tests {
use super::*;
use {super::*, rand::SeedableRng, rand_chacha::ChaChaRng};

#[test]
fn test_already_received() {
let slot = 1;
let index = 5;
let version = 0x40;
let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0);
let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
let mut rng = ChaChaRng::from_seed([0xa5; 32]);
let shred_deduper = ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
// unique shred for (1, 5) should pass
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
// duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));

let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
// first duplicate shred for (1, 5) passed
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
// then blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));

let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
// 2nd duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));

let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, 0, version);
// Coding at (1, 5) passes
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
// then blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));

let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, 0, version);
// 2nd unique coding at (1, 5) passes
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
// same again is blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));

let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, 0, version);
// Another unique coding at (1, 5) always blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
assert!(shred_deduper.dedup(&shred, MAX_DUPLICATE_COUNT));
}
}
2 changes: 1 addition & 1 deletion ledger/src/blockstore.rs
@@ -3479,7 +3479,7 @@ impl Blockstore {
// given slot and index as this implies the leader generated two different shreds with
// the same slot and index
pub fn is_shred_duplicate(&self, shred: ShredId, mut payload: Vec<u8>) -> Option<Vec<u8>> {
let (slot, index, shred_type) = shred.unwrap();
let (slot, index, shred_type) = shred.unpack();
let existing_shred = match shred_type {
ShredType::Data => self.get_data_shred(slot, index as u64),
ShredType::Code => self.get_coding_shred(slot, index as u64),
2 changes: 1 addition & 1 deletion ledger/src/shred.rs
@@ -205,7 +205,7 @@ impl ShredId {
ShredId(slot, index, shred_type)
}

pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
(self.0, self.1, self.2)
}
}
