Skip to content

Commit

Permalink
reduces allocations when ingesting gossip pull requests (#4881)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Feb 11, 2025
1 parent da466b3 commit 522b46d
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 82 deletions.
78 changes: 24 additions & 54 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use {
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{
get_max_bloom_filter_bytes, CrdsFilter, CrdsTimeouts, ProcessPullStats,
get_max_bloom_filter_bytes, CrdsFilter, CrdsTimeouts, ProcessPullStats, PullRequest,
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
},
crds_value::{CrdsValue, CrdsValueLabel},
Expand Down Expand Up @@ -158,12 +158,6 @@ pub struct ClusterInfo {
socket_addr_space: SocketAddrSpace,
}

// A pull request paired with the socket address it arrived from.
// Built in handle_batch_pull_requests from the (from address, crds filter,
// caller contact info) tuples that pass the caller check there.
struct PullData {
// Socket address the request was received from.
from_addr: SocketAddr,
// CRDS value identifying the caller; its pubkey is read when checking the
// request.
caller: CrdsValue,
// CRDS filter sent along with the request.
filter: CrdsFilter,
}

// Returns false if the CRDS value should be discarded.
#[inline]
#[must_use]
Expand Down Expand Up @@ -1621,41 +1615,13 @@ impl ClusterInfo {

fn handle_batch_pull_requests(
&self,
// from address, crds filter, caller contact info
requests: Vec<(SocketAddr, CrdsFilter, CrdsValue)>,
requests: Vec<PullRequest>,
thread_pool: &ThreadPool,
recycler: &PacketBatchRecycler,
stakes: &HashMap<Pubkey, u64>,
response_sender: &PacketBatchSender,
) {
let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time);
if requests.is_empty() {
return;
}
let self_pubkey = self.id();
let requests: Vec<_> = thread_pool.install(|| {
requests
.into_par_iter()
.with_min_len(1024)
.filter(|(_, _, caller)| match caller.data() {
CrdsData::LegacyContactInfo(_) | CrdsData::ContactInfo(_) => {
if caller.pubkey() == self_pubkey {
warn!("PullRequest ignored, I'm talking to myself");
self.stats.window_request_loopback.add_relaxed(1);
false
} else {
true
}
}
_ => false,
})
.map(|(from_addr, filter, caller)| PullData {
from_addr,
caller,
filter,
})
.collect()
});
if !requests.is_empty() {
self.stats
.pull_requests_count
Expand Down Expand Up @@ -1693,7 +1659,7 @@ impl ClusterInfo {
now: Instant,
rng: &'a mut R,
packet_batch: &'a mut PacketBatch,
) -> impl FnMut(&PullData) -> bool + 'a
) -> impl FnMut(&PullRequest) -> bool + 'a
where
R: Rng + CryptoRng,
{
Expand All @@ -1719,8 +1685,8 @@ impl ClusterInfo {
// incoming pull-requests, pings are also sent to request.from_addr (as
// opposed to caller.gossip address).
move |request| {
ContactInfo::is_valid_address(&request.from_addr, &self.socket_addr_space) && {
let node = (request.caller.pubkey(), request.from_addr);
ContactInfo::is_valid_address(&request.addr, &self.socket_addr_space) && {
let node = (request.pubkey, request.addr);
*cache.entry(node).or_insert_with(|| hard_check(node))
}
}
Expand All @@ -1732,7 +1698,7 @@ impl ClusterInfo {
&self,
thread_pool: &ThreadPool,
recycler: &PacketBatchRecycler,
requests: Vec<PullData>,
mut requests: Vec<PullRequest>,
stakes: &HashMap<Pubkey, u64>,
) -> PacketBatch {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
Expand All @@ -1741,22 +1707,17 @@ impl ClusterInfo {
let mut packet_batch =
PacketBatch::new_unpinned_with_recycler(recycler, 64, "handle_pull_requests");
let mut rng = rand::thread_rng();
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let check_pull_request =
self.check_pull_request(Instant::now(), &mut rng, &mut packet_batch);
requests
.into_iter()
.filter(check_pull_request)
.map(|r| ((r.caller, r.filter), r.from_addr))
.unzip()
};
requests.retain({
let now = Instant::now();
self.check_pull_request(now, &mut rng, &mut packet_batch)
});
let now = timestamp();
let self_id = self.id();
let pull_responses = {
let _st = ScopedTimer::from(&self.stats.generate_pull_responses);
self.gossip.generate_pull_responses(
thread_pool,
&caller_and_filters,
&requests,
output_size_limit,
now,
|value| {
Expand Down Expand Up @@ -1787,10 +1748,10 @@ impl ClusterInfo {
score
};
let mut num_crds_values = 0;
let (scores, mut pull_responses): (Vec<_>, Vec<_>) = addrs
let (scores, mut pull_responses): (Vec<_>, Vec<_>) = requests
.iter()
.zip(pull_responses)
.flat_map(|(addr, values)| {
.flat_map(|(PullRequest { addr, .. }, values)| {
num_crds_values += values.len();
split_gossip_messages(PULL_RESPONSE_MAX_PAYLOAD_SIZE, values).map(move |values| {
let score = values.iter().map(get_score).max().unwrap_or_default();
Expand Down Expand Up @@ -2085,6 +2046,7 @@ impl ClusterInfo {
should_check_duplicate_instance: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
let self_pubkey = self.id();
// Filter out values if the shred-versions are different.
let self_shred_version = self.my_shred_version();
let packets = if self_shred_version == 0 {
Expand Down Expand Up @@ -2150,8 +2112,16 @@ impl ClusterInfo {
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
if verify_gossip_addr(&caller) {
pull_requests.push((from_addr, filter, caller))
let request = PullRequest {
pubkey: caller.pubkey(),
addr: from_addr,
wallclock: caller.wallclock(),
filter,
};
if request.pubkey == self_pubkey {
self.stats.window_request_loopback.add_relaxed(1);
} else {
pull_requests.push(request);
}
}
Protocol::PullResponse(_, mut data) => {
Expand Down
8 changes: 5 additions & 3 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use {
crds::{Crds, GossipRoute},
crds_data::CrdsData,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, CrdsTimeouts, ProcessPullStats},
crds_gossip_pull::{
CrdsFilter, CrdsGossipPull, CrdsTimeouts, ProcessPullStats, PullRequest,
},
crds_gossip_push::CrdsGossipPush,
crds_value::CrdsValue,
duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
Expand Down Expand Up @@ -231,7 +233,7 @@ impl CrdsGossip {
pub fn generate_pull_responses(
&self,
thread_pool: &ThreadPool,
filters: &[(CrdsValue, CrdsFilter)],
requests: &[PullRequest],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
Expand All @@ -240,7 +242,7 @@ impl CrdsGossip {
CrdsGossipPull::generate_pull_responses(
thread_pool,
&self.crds,
filters,
requests,
output_size_limit,
now,
should_retain_crds_value,
Expand Down
63 changes: 43 additions & 20 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ pub struct CrdsFilter {
mask_bits: u32,
}

/// Incoming gossip pull request from a remote node.
///
/// Holds the caller's pubkey and wallclock (copied out of the caller's CRDS
/// value when the packet is processed) together with the address the packet
/// arrived from and the filter it carried, rather than the caller's whole
/// CRDS value.
pub struct PullRequest {
/// Remote node's pubkey.
pub pubkey: Pubkey, // remote node's pubkey
/// Socket address the request was received from. Address validation and
/// the ping check are done against this address, and it is the address
/// paired with the generated pull responses.
pub addr: SocketAddr, // socket-addr the request was received from
/// Remote node's wallclock. Requests whose wallclock falls outside the
/// accepted caller wallclock window are dropped when CRDS values are
/// filtered for the response.
pub wallclock: u64, // remote node's wallclock
/// CRDS filter sent with the request; applied when selecting the CRDS
/// values to return to the caller.
pub filter: CrdsFilter,
}

impl Default for CrdsFilter {
fn default() -> Self {
CrdsFilter {
Expand Down Expand Up @@ -298,7 +306,7 @@ impl CrdsGossipPull {
pub(crate) fn generate_pull_responses(
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
requests: &[(CrdsValue, CrdsFilter)],
requests: &[PullRequest],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
Expand Down Expand Up @@ -454,7 +462,7 @@ impl CrdsGossipPull {
fn filter_crds_values(
thread_pool: &ThreadPool,
crds: &RwLock<Crds>,
filters: &[(CrdsValue, CrdsFilter)],
requests: &[PullRequest],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
// Predicate returning false if the CRDS value should be discarded.
Expand All @@ -471,11 +479,12 @@ impl CrdsGossipPull {
let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
let output_size_limit = AtomicI64::new(output_size_limit);
let crds = crds.read().unwrap();
let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| {
let apply_filter = |request: &PullRequest| {
if output_size_limit.load(Ordering::Relaxed) <= 0 {
return Vec::default();
}
let caller_wallclock = caller.wallclock();
let filter = &request.filter;
let caller_wallclock = request.wallclock;
if !caller_wallclock_window.contains(&caller_wallclock) {
dropped_requests.fetch_add(1, Ordering::Relaxed);
return Vec::default();
Expand All @@ -501,12 +510,7 @@ impl CrdsGossipPull {
output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
out
};
let ret: Vec<_> = thread_pool.install(|| {
filters
.par_iter()
.map(|(caller, filter)| apply_filter(caller, filter))
.collect()
});
let ret: Vec<_> = thread_pool.install(|| requests.par_iter().map(apply_filter).collect());
stats
.filter_crds_values_dropped_requests
.add_relaxed(dropped_requests.into_inner() as u64);
Expand Down Expand Up @@ -1059,11 +1063,18 @@ pub(crate) mod tests {

let dest_crds = RwLock::<Crds>::default();
let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters);
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut requests: Vec<_> = filters
.map(|filter| PullRequest {
pubkey: caller.pubkey(),
addr: SocketAddr::from(([0; 4], 0)),
wallclock: caller.wallclock(),
filter,
})
.collect();
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&filters,
&requests,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
Expand All @@ -1087,28 +1098,33 @@ pub(crate) mod tests {
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&filters,
&requests,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
&GossipStats::default(),
);
assert_eq!(rsp[0].len(), 0);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS);
filters.extend({
assert_eq!(requests.len(), MIN_NUM_BLOOM_FILTERS);
requests.extend({
// Should return new value since caller is new.
let now = now + 1;
let caller = ContactInfo::new_localhost(&Pubkey::new_unique(), now);
let caller = CrdsValue::new_unsigned(CrdsData::from(caller));
filters
requests
.iter()
.map(|(_, filter)| (caller.clone(), filter.clone()))
.map(|PullRequest { filter, .. }| PullRequest {
pubkey: caller.pubkey(),
addr: SocketAddr::from(([0; 4], 0)),
wallclock: caller.wallclock(),
filter: filter.clone(),
})
.collect::<Vec<_>>()
});
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&filters,
&requests,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
Expand Down Expand Up @@ -1190,11 +1206,18 @@ pub(crate) mod tests {
&SocketAddrSpace::Unspecified,
);
let filters = req.unwrap().into_iter().flat_map(|(_, filters)| filters);
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let requests: Vec<_> = filters
.map(|filter| PullRequest {
pubkey: caller.pubkey(),
addr: SocketAddr::from(([0; 4], 0)),
wallclock: caller.wallclock(),
filter,
})
.collect();
let rsp = CrdsGossipPull::generate_pull_responses(
&thread_pool,
&dest_crds,
&filters,
&requests,
usize::MAX, // output_size_limit
0, // now
|_| true, // should_retain_crds_value
Expand Down
17 changes: 12 additions & 5 deletions gossip/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use {
crds_data::CrdsData,
crds_gossip::*,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_gossip_pull::{
CrdsTimeouts, ProcessPullStats, PullRequest, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
},
crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
crds_value::{CrdsValue, CrdsValueLabel},
},
Expand All @@ -25,7 +27,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashMap, HashSet},
net::Ipv4Addr,
net::{Ipv4Addr, SocketAddr},
ops::Deref,
sync::{Arc, Mutex},
time::{Duration, Instant},
Expand Down Expand Up @@ -574,17 +576,22 @@ fn network_run_pull(
.map(|f| f.filter.bits.len() as usize / 8)
.sum::<usize>();
bytes += caller_info.bincode_serialized_size();
let filters: Vec<_> = filters
let requests: Vec<_> = filters
.into_iter()
.map(|f| (caller_info.clone(), f))
.map(|filter| PullRequest {
pubkey: from,
addr: SocketAddr::from(([0; 4], 0)),
wallclock: now,
filter,
})
.collect();
let rsp: Vec<_> = network
.get(&to)
.map(|node| {
node.gossip
.generate_pull_responses(
thread_pool,
&filters,
&requests,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
Expand Down

0 comments on commit 522b46d

Please sign in to comment.