Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
sync: import extra requests in background worker
Browse files Browse the repository at this point in the history
  • Loading branch information
andresilva committed Jan 8, 2020
1 parent 43589c1 commit 1473374
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 55 deletions.
8 changes: 4 additions & 4 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,16 +452,16 @@ impl<B: BlockT> ChainSync<B> {

/// Schedule a justification request for the given block.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let client = &self.client;
self.extra_justifications.schedule((*hash, number), |base, block| {
let client = self.client.clone();
self.extra_justifications.schedule((*hash, number), move |base, block| {
client.is_descendent_of(base, block)
})
}

/// Schedule a finality proof request for the given block.
pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let client = &self.client;
self.extra_finality_proofs.schedule((*hash, number), |base, block| {
let client = self.client.clone();
self.extra_finality_proofs.schedule((*hash, number), move |base, block| {
client.is_descendent_of(base, block)
})
}
Expand Down
105 changes: 68 additions & 37 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@ use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};

use std::sync::Arc;
use std::sync::mpsc::{self, Sender};
use parking_lot::Mutex;
use std::thread;

// Time to wait before trying to get the same extra data from the same peer.
const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);

/// Pending extra data request for the given block (hash and number).
pub(crate) type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);

type WorkPackage<B> = (
ExtraRequest<B>,
Box<dyn Fn(&<B as BlockT>::Hash, &<B as BlockT>::Hash) -> Result<bool, ClientError> + Send>,
);

/// Manages pending block extra data (e.g. justification) requests.
///
/// Multiple extras may be requested for competing forks, or for the same branch
Expand All @@ -37,11 +47,12 @@ pub(crate) type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);
/// competing fork).
#[derive(Debug)]
pub(crate) struct ExtraRequests<B: BlockT> {
tree: ForkTree<B::Hash, NumberFor<B>, ()>,
tree: Arc<Mutex<ForkTree<B::Hash, NumberFor<B>, ()>>>,
import_worker: Sender<WorkPackage<B>>,
/// best finalized block number that we have seen since restart
best_seen_finalized_number: NumberFor<B>,
/// requests which have been queued for later processing
pending_requests: VecDeque<ExtraRequest<B>>,
pending_requests: Arc<Mutex<VecDeque<ExtraRequest<B>>>>,
/// requests which are currently underway to some peer
active_requests: HashMap<PeerId, ExtraRequest<B>>,
/// previous requests without response
Expand All @@ -52,10 +63,40 @@ pub(crate) struct ExtraRequests<B: BlockT> {

impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn new() -> Self {
let (sender, receiver) = mpsc::channel::<WorkPackage<B>>();
let tree = Arc::new(Mutex::new(ForkTree::new()));
let pending_requests = Arc::new(Mutex::new(VecDeque::new()));

// KUSAMA HACK: import extra requests in the background to avoid slow
// IO-bound import to block the network future
let t = tree.clone();
let p = pending_requests.clone();
thread::spawn(move || {
loop {
let (request, is_descendent_of) = receiver.recv().unwrap();
match t.lock().import(request.0, request.1, (), &is_descendent_of) {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
p.lock().push_back((request.0, request.1));
}
Err(fork_tree::Error::Revert) => {
// we have finalized further than the given request, presumably
// by some other part of the system (not sync). we can safely
// ignore the `Revert` error.
},
Err(err) => {
debug!(target: "sync", "Failed to insert request {:?} into tree: {:?}", request, err);
}
_ => ()
}
}
});

ExtraRequests {
tree: ForkTree::new(),
tree,
import_worker: sender,
best_seen_finalized_number: Zero::zero(),
pending_requests: VecDeque::new(),
pending_requests,
active_requests: HashMap::new(),
failed_requests: HashMap::new(),
importing_requests: HashSet::new(),
Expand All @@ -64,8 +105,8 @@ impl<B: BlockT> ExtraRequests<B> {

/// Reset all state as if returned from `new`.
pub(crate) fn reset(&mut self) {
self.tree = ForkTree::new();
self.pending_requests.clear();
*self.tree.lock() = ForkTree::new();
self.pending_requests.lock().clear();
self.active_requests.clear();
self.failed_requests.clear();
}
Expand All @@ -78,31 +119,15 @@ impl<B: BlockT> ExtraRequests<B> {

/// Queue an extra data request to be considered by the `Matcher`.
pub(crate) fn schedule<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError> + Send + 'static
{
match self.tree.import(request.0, request.1, (), &is_descendent_of) {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
self.pending_requests.push_back((request.0, request.1));
}
Err(fork_tree::Error::Revert) => {
// we have finalized further than the given request, presumably
// by some other part of the system (not sync). we can safely
// ignore the `Revert` error.
return;
},
Err(err) => {
debug!(target: "sync", "Failed to insert request {:?} into tree: {:?}", request, err);
return;
}
_ => ()
}
self.import_worker.send((request, Box::new(is_descendent_of)));
}

/// Retry any pending request if a peer disconnected.
pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
if let Some(request) = self.active_requests.remove(who) {
self.pending_requests.push_front(request);
self.pending_requests.lock().push_front(request);
}
}

Expand All @@ -117,7 +142,7 @@ impl<B: BlockT> ExtraRequests<B> {
return Some((who, request.0, request.1, r))
}
self.failed_requests.entry(request).or_insert(Vec::new()).push((who, Instant::now()));
self.pending_requests.push_front(request);
self.pending_requests.lock().push_front(request);
}
None
}
Expand All @@ -137,11 +162,13 @@ impl<B: BlockT> ExtraRequests<B> {
return Ok(())
}

let mut tree = self.tree.lock();

if best_finalized_number > self.best_seen_finalized_number {
// normally we'll receive finality notifications for every block => finalize would be enough
// but if many blocks are finalized at once, some notifications may be omitted
// => let's use finalize_with_ancestors here
match self.tree.finalize_with_ancestors(
match tree.finalize_with_ancestors(
best_finalized_hash,
best_finalized_number,
&is_descendent_of,
Expand All @@ -157,9 +184,9 @@ impl<B: BlockT> ExtraRequests<B> {
self.best_seen_finalized_number = best_finalized_number;
}

let roots = self.tree.roots().collect::<HashSet<_>>();
let roots = tree.roots().collect::<HashSet<_>>();

self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
self.pending_requests.lock().retain(|(h, n)| roots.contains(&(h, n, &())));
self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));

Expand All @@ -184,25 +211,27 @@ impl<B: BlockT> ExtraRequests<B> {
Ok(req) => (req.0, req.1),
Err(_) => {
if reschedule_on_failure {
self.pending_requests.push_front(request);
self.pending_requests.lock().push_front(request);
}
return true
}
};

if self.tree.finalize_root(&finalized_hash).is_none() {
let mut tree = self.tree.lock();
if tree.finalize_root(&finalized_hash).is_none() {
warn!(target: "sync", "Imported {:?} {:?} which isn't a root in the tree: {:?}",
finalized_hash,
finalized_number,
self.tree.roots().collect::<Vec<_>>()
tree.roots().collect::<Vec<_>>()
);
return true
}

self.failed_requests.clear();
self.active_requests.clear();
self.pending_requests.clear();
self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n)));
let mut pending_requests = self.pending_requests.lock();
pending_requests.clear();
pending_requests.extend(tree.roots().map(|(&h, &n, _)| (h, n)));
self.best_seen_finalized_number = finalized_number;

true
Expand All @@ -220,8 +249,9 @@ pub(crate) struct Matcher<'a, B: BlockT> {

impl<'a, B: BlockT> Matcher<'a, B> {
fn new(extras: &'a mut ExtraRequests<B>) -> Self {
let remaining = extras.pending_requests.lock().len();
Matcher {
remaining: extras.pending_requests.len(),
remaining,
extras
}
}
Expand Down Expand Up @@ -250,7 +280,8 @@ impl<'a, B: BlockT> Matcher<'a, B> {
requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
}

while let Some(request) = self.extras.pending_requests.pop_front() {
let mut pending_requests = self.extras.pending_requests.lock();
while let Some(request) = pending_requests.pop_front() {
for (peer, sync) in peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available) {
// only ask peers that have synced at least up to the block number that we're asking the extra for
if sync.best_number < request.1 {
Expand All @@ -268,7 +299,7 @@ impl<'a, B: BlockT> Matcher<'a, B> {
return Some((peer.clone(), request))
}

self.extras.pending_requests.push_back(request);
pending_requests.push_back(request);
self.remaining -= 1;

if self.remaining == 0 {
Expand Down
18 changes: 4 additions & 14 deletions primitives/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ impl<B: BlockT> BlockImportWorker<B> {
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
) -> (impl Future<Output = ()> + Send, mpsc::UnboundedSender<ToWorkerMsg<B>>) {
use std::thread;

let (sender, mut port) = mpsc::unbounded();

let mut worker = BlockImportWorker {
Expand All @@ -159,19 +157,11 @@ impl<B: BlockT> BlockImportWorker<B> {
};

// Let's initialize `justification_import` and `finality_proof_import`.

// KUSAMA HACK: request justifications asynchronously to avoid startup hang
let justifications = worker.justification_import.as_mut().map(|i| i.on_start());
let mut result_sender = worker.result_sender.clone();
thread::spawn(move || {
if let Some(justifications) = justifications {
for (hash, number) in justifications {
result_sender.request_justification(&hash, number);
thread::sleep(Duration::from_millis(200));
}
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start() {
worker.result_sender.request_justification(&hash, number);
}
});

}
if let Some(finality_proof_import) = worker.finality_proof_import.as_mut() {
for (hash, number) in finality_proof_import.on_start() {
worker.result_sender.request_finality_proof(&hash, number);
Expand Down

0 comments on commit 1473374

Please sign in to comment.