This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Shred Repair Request #34771

Merged 2 commits on Jan 17, 2024
5 changes: 5 additions & 0 deletions core/src/admin_rpc_post_init.rs
@@ -1,9 +1,11 @@
use {
crate::repair::{outstanding_requests, serve_repair},
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
std::{
collections::HashSet,
net::UdpSocket,
sync::{Arc, RwLock},
},
};
@@ -15,4 +17,7 @@ pub struct AdminRpcRequestMetadataPostInit {
pub vote_account: Pubkey,
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
pub repair_socket: Arc<UdpSocket>,
pub outstanding_repair_requests:
Arc<RwLock<outstanding_requests::OutstandingRequests<serve_repair::ShredRepairType>>>,
}
124 changes: 123 additions & 1 deletion core/src/repair/repair_service.rs
@@ -15,11 +15,15 @@ use {
outstanding_requests::OutstandingRequests,
quic_endpoint::LocalRequest,
repair_weight::RepairWeight,
serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
serve_repair::{
self, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType,
REPAIR_PEERS_CACHE_CAPACITY,
},
},
},
crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
lru::LruCache,
solana_client::connection_cache::Protocol,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, SlotMeta},
@@ -678,6 +682,70 @@ impl RepairService {
}
}

pub fn request_repair_for_shred_from_peer(
cluster_info: Arc<ClusterInfo>,
pubkey: Pubkey,
slot: u64,
shred_index: u64,
repair_socket: &UdpSocket,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) {
let peer_repair_addr = cluster_info
.lookup_contact_info(&pubkey, |node| node.serve_repair(Protocol::UDP))
.unwrap()
.unwrap();
Self::request_repair_for_shred_from_address(
cluster_info,
pubkey,
peer_repair_addr,
slot,
shred_index,
repair_socket,
outstanding_repair_requests,
);
}

fn request_repair_for_shred_from_address(
cluster_info: Arc<ClusterInfo>,
pubkey: Pubkey,
address: SocketAddr,
slot: u64,
shred_index: u64,
repair_socket: &UdpSocket,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) {
// Setup repair request
let identity_keypair = cluster_info.keypair();
let repair_request = ShredRepairType::Shred(slot, shred_index);
let nonce = outstanding_repair_requests
.write()
.unwrap()
.add_request(repair_request, timestamp());

// Create repair request
let header = RepairRequestHeader::new(cluster_info.id(), pubkey, timestamp(), nonce);
let request_proto = RepairProtocol::WindowIndex {
header,
slot,
shred_index,
};
let packet_buf =
ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();

// Prepare packet batch to send
let reqs = vec![(packet_buf, address)];

// Send packet batch
match batch_send(repair_socket, &reqs[..]) {
Ok(()) => {
trace!("successfully sent repair request!");
}
Err(SendPktsError::IoError(err, _num_failed)) => {
error!("batch_send failed to send packet - error = {:?}", err);
}
}
}

/// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
#[cfg(test)]
pub fn generate_repairs_in_range(
@@ -859,6 +927,7 @@ pub(crate) fn sleep_shred_deferment_period() {
mod test {
use {
super::*,
crate::repair::quic_endpoint::RemoteRequest,
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::{
blockstore::{
@@ -883,6 +952,59 @@ mod test {
ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
}

#[test]
pub fn test_request_repair_for_shred_from_address() {
// Setup cluster and repair info
let cluster_info = Arc::new(new_test_cluster_info());
let pubkey = cluster_info.id();
let slot = 100;
let shred_index = 50;
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let address = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));

// Send a repair request
RepairService::request_repair_for_shred_from_address(
cluster_info.clone(),
pubkey,
address,
slot,
shred_index,
&sender,
outstanding_repair_requests,
);

// Receive and translate repair packet
let mut packets = vec![solana_sdk::packet::Packet::default(); 1];
let _recv_count = solana_streamer::recvmmsg::recv_mmsg(&reader, &mut packets[..]).unwrap();
let packet = &packets[0];
let Some(bytes) = packet.data(..).map(Vec::from) else {
panic!("packet data not found");
};
let remote_request = RemoteRequest {
remote_pubkey: None,
remote_address: packet.meta().socket_addr(),
bytes,
response_sender: None,
};

// Deserialize and check the request
let deserialized =
serve_repair::deserialize_request::<RepairProtocol>(&remote_request).unwrap();
match deserialized {
RepairProtocol::WindowIndex {
slot: deserialized_slot,
shred_index: deserialized_shred_index,
..
} => {
assert_eq!(deserialized_slot, slot);
assert_eq!(deserialized_shred_index, shred_index);
}
_ => panic!("unexpected repair protocol"),
}
}

#[test]
pub fn test_repair_orphan() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
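Outside of the admin RPC path shown later in this diff, the new public entry point can also be exercised directly. A minimal sketch, assuming a `ClusterInfo` handle from a running validator; the throwaway socket and empty outstanding-request table below are illustrative stand-ins for the validator's real ones:

```rust
use {
    solana_core::repair::repair_service::{OutstandingShredRepairs, RepairService},
    solana_gossip::cluster_info::ClusterInfo,
    solana_sdk::pubkey::Pubkey,
    std::{
        net::UdpSocket,
        sync::{Arc, RwLock},
    },
};

/// Ask `peer` to resend shred `shred_index` of `slot`.
///
/// The peer's serve-repair address is looked up through gossip, so
/// `cluster_info` must already know the peer; the implementation above
/// unwraps that lookup and will panic for an unknown peer.
fn repair_one_shred(
    cluster_info: Arc<ClusterInfo>,
    peer: Pubkey,
    slot: u64,
    shred_index: u64,
) {
    // Illustrative stand-ins; the validator uses its own repair socket and
    // shared outstanding-request table (see validator.rs further down).
    let repair_socket = UdpSocket::bind("0.0.0.0:0").expect("bind repair socket");
    let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();

    // Assigns a nonce, builds a signed `RepairProtocol::WindowIndex` request,
    // and sends it to the peer's serve-repair address over UDP.
    RepairService::request_repair_for_shred_from_peer(
        cluster_info,
        peer,
        slot,
        shred_index,
        &repair_socket,
        outstanding_repair_requests,
    );
}
```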
4 changes: 3 additions & 1 deletion core/src/repair/serve_repair.rs
@@ -1384,7 +1384,9 @@ pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
Protocol::UDP
}

fn deserialize_request<T>(request: &RemoteRequest) -> std::result::Result<T, bincode::Error>
pub(crate) fn deserialize_request<T>(
request: &RemoteRequest,
) -> std::result::Result<T, bincode::Error>
where
T: serde::de::DeserializeOwned,
{
9 changes: 8 additions & 1 deletion core/src/tvu.rs
@@ -14,7 +14,10 @@ use {
consensus::{tower_storage::TowerStorage, Tower},
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
repair::{
quic_endpoint::LocalRequest,
repair_service::{OutstandingShredRepairs, RepairInfo},
},
replay_stage::{ReplayStage, ReplayStageConfig},
rewards_recorder_service::RewardsRecorderSender,
shred_fetch_stage::ShredFetchStage,
@@ -138,6 +141,7 @@ impl Tvu {
turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@@ -228,6 +232,7 @@ impl Tvu {
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
outstanding_repair_requests,
)
};

@@ -442,6 +447,7 @@ pub mod tests {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -496,6 +502,7 @@
turbine_quic_endpoint_sender,
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
outstanding_repair_requests,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);
8 changes: 7 additions & 1 deletion core/src/validator.rs
@@ -1255,13 +1255,16 @@ impl Validator {
};
let last_vote = tower.last_vote();

let outstanding_repair_requests =
Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();

let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
&bank_forks,
&cluster_info,
TvuSockets {
repair: node.sockets.repair,
repair: node.sockets.repair.try_clone().unwrap(),
retransmit: node.sockets.retransmit_sockets,
fetch: node.sockets.tvu,
ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
@@ -1307,6 +1310,7 @@ impl Validator {
turbine_quic_endpoint_sender.clone(),
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
outstanding_repair_requests.clone(),
)?;

if in_wen_restart {
@@ -1383,6 +1387,8 @@ impl Validator {
vote_account: *vote_account,
repair_whitelist: config.repair_whitelist.clone(),
notifies: key_notifies,
repair_socket: Arc::new(node.sockets.repair),
outstanding_repair_requests,
});

Ok(Self {
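Note on the socket handling above: `node.sockets.repair.try_clone()` duplicates the bound UDP socket, so the TVU's repair service and the admin RPC can both send from the validator's regular repair port. The original handle is wrapped in an `Arc` and stored in the post-init metadata alongside the shared `outstanding_repair_requests` table, so nonces assigned to admin-triggered requests are visible to the same table the window service uses to verify incoming repair responses.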
7 changes: 3 additions & 4 deletions core/src/window_service.rs
@@ -386,9 +386,8 @@ impl WindowService {
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();

let cluster_info = repair_info.cluster_info.clone();
let bank_forks = repair_info.bank_forks.clone();

@@ -401,7 +400,7 @@
repair_quic_endpoint_response_sender,
repair_info,
verified_vote_receiver,
outstanding_requests.clone(),
outstanding_repair_requests.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
@@ -426,7 +425,7 @@
duplicate_sender,
completed_data_sets_sender,
retransmit_sender,
outstanding_requests,
outstanding_repair_requests,
);

WindowService {
36 changes: 36 additions & 0 deletions validator/src/admin_rpc_service.rs
@@ -13,6 +13,7 @@ use {
solana_core::{
admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
consensus::{tower_storage::TowerStorage, Tower},
repair::repair_service,
validator::ValidatorStartProgress,
},
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
@@ -207,6 +208,15 @@ pub trait AdminRpc {
#[rpc(meta, name = "contactInfo")]
fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;

#[rpc(meta, name = "repairShredFromPeer")]
fn repair_shred_from_peer(
&self,
meta: Self::Metadata,
pubkey: Pubkey,
slot: u64,
shred_index: u64,
) -> Result<()>;

#[rpc(meta, name = "repairWhitelist")]
fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;

@@ -487,6 +497,28 @@ impl AdminRpc for AdminRpcImpl {
meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
}

fn repair_shred_from_peer(
&self,
meta: Self::Metadata,
pubkey: Pubkey,
slot: u64,
shred_index: u64,
) -> Result<()> {
debug!("repair_shred_from_peer request received");

meta.with_post_init(|post_init| {
repair_service::RepairService::request_repair_for_shred_from_peer(
post_init.cluster_info.clone(),
pubkey,
slot,
shred_index,
&post_init.repair_socket.clone(),
post_init.outstanding_repair_requests.clone(),
);
Ok(())
})
}

fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
debug!("repair_whitelist request received");

@@ -895,6 +927,10 @@ mod tests {
vote_account,
repair_whitelist,
notifies: Vec::new(),
repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()),
outstanding_repair_requests: Arc::<
RwLock<repair_service::OutstandingShredRepairs>,
>::default(),
}))),
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
rpc_to_plugin_manager_sender: None,
27 changes: 27 additions & 0 deletions validator/src/cli.rs
@@ -1497,6 +1497,33 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.help("Output display mode")
)
)
.subcommand(SubCommand::with_name("repair-shred-from-peer")
.about("Request a repair from the specified validator")
.arg(
Arg::with_name("pubkey")
.long("pubkey")
.value_name("PUBKEY")
.takes_value(true)
.validator(is_pubkey)
.help("Identity pubkey of the validator to repair from")
)
.arg(
Arg::with_name("slot")
.long("slot")
.value_name("SLOT")
.takes_value(true)
.validator(is_parsable::<u64>)
.help("Slot to repair")
)
.arg(
Arg::with_name("shred")
.long("shred")
.value_name("SHRED")
.takes_value(true)
.validator(is_parsable::<u64>)
.help("Shred to repair")
)
)
.subcommand(
SubCommand::with_name("repair-whitelist")
.about("Manage the validator's repair protocol whitelist")
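The subcommand above only defines the CLI surface; its dispatch in `validator/src/main.rs` is not part of the hunks shown here. A hedged sketch of how it would plausibly be handled, following the pattern of the other admin subcommands (the `admin_rpc_service::connect` and `admin_rpc_service::runtime` helpers and the derived `repair_shred_from_peer` client method are assumptions, not code from this section):

```rust
use {
    clap::{value_t_or_exit, ArgMatches},
    solana_sdk::pubkey::Pubkey,
    solana_validator::admin_rpc_service,
    std::{path::Path, process::exit},
};

// Hypothetical handler for the new subcommand. The admin-RPC client helpers
// (`connect`, `runtime`) and the derived `repair_shred_from_peer` method are
// assumed to follow the existing admin subcommand pattern; they are not part
// of the hunks shown in this section.
fn handle_repair_shred_from_peer(subcommand_matches: &ArgMatches, ledger_path: &Path) {
    let pubkey = value_t_or_exit!(subcommand_matches, "pubkey", Pubkey);
    let slot = value_t_or_exit!(subcommand_matches, "slot", u64);
    let shred_index = value_t_or_exit!(subcommand_matches, "shred", u64);

    let admin_client = admin_rpc_service::connect(ledger_path);
    admin_rpc_service::runtime()
        .block_on(async move {
            admin_client
                .await?
                .repair_shred_from_peer(pubkey, slot, shred_index)
                .await
        })
        .unwrap_or_else(|err| {
            println!("repair-shred-from-peer request failed: {err}");
            exit(1);
        });
}
```

Assuming that wiring, the operator-facing invocation would look something like `solana-validator repair-shred-from-peer --pubkey <PUBKEY> --slot <SLOT> --shred <SHRED>` against a running validator.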