Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create initial transaction crawler for the mempool #2646

Merged
merged 7 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ once_cell = "1.8"
regex = "1.4.6"
semver = "1.0.3"
tempdir = "0.3.7"
tokio = { version = "0.3.6", features = ["full", "test-util"] }

proptest = "0.10"
proptest-derive = "0.3"
Expand Down
13 changes: 10 additions & 3 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@

use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report};
use futures::{select, FutureExt};
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;

use crate::components::{tokio::RuntimeRun, Inbound};
use crate::config::ZebradConfig;
use crate::{
components::{tokio::TokioComponent, ChainSync},
components::{mempool, tokio::TokioComponent, ChainSync},
prelude::*,
};

Expand Down Expand Up @@ -79,9 +80,15 @@ impl StartCmd {

info!("initializing syncer");
// TODO: use sync_length_receiver to activate the mempool (#2592)
let (syncer, _sync_length_receiver) = ChainSync::new(&config, peer_set, state, verifier);
let (syncer, _sync_length_receiver) =
ChainSync::new(&config, peer_set.clone(), state, verifier);

syncer.sync().await
select! {
result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set).fuse() => {
unreachable!("The mempool crawler only stops if it panics");
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions zebrad/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! don't fit the async context well.

mod inbound;
pub mod mempool;
pub mod metrics;
mod sync;
pub mod tokio;
Expand Down
5 changes: 5 additions & 0 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Zebra mempool.

mod crawler;

pub use self::crawler::Crawler;
99 changes: 99 additions & 0 deletions zebrad/src/components/mempool/crawler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Zebra Mempool crawler.
//!
//! The crawler periodically requests transactions from peers in order to populate the mempool.

use std::time::Duration;

use futures::{stream, StreamExt, TryStreamExt};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

use zebra_network::{Request, Response};

#[cfg(test)]
mod tests;

/// The number of peers to request transactions from per crawl event.
const FANOUT: usize = 4;

/// The delay between crawl events.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);

/// The time to wait for a peer response.
///
/// # Correctness
///
/// If this timeout is removed or set too high, the crawler may hang waiting for a peer to respond.
///
/// If this timeout is set too low, the crawler may fail to populate the mempool.
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// The mempool transaction crawler.
pub struct Crawler<S> {
peer_set: Mutex<Timeout<S>>,
}

impl<S> Crawler<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S) -> JoinHandle<()> {
let crawler = Crawler {
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
};

tokio::spawn(crawler.run())
}

/// Periodically crawl peers for transactions to include in the mempool.
pub async fn run(self) {
loop {
self.wait_until_enabled().await;
self.crawl_transactions().await;
sleep(RATE_LIMIT_DELAY).await;
}
}

/// Wait until the mempool is enabled.
async fn wait_until_enabled(&self) {
// TODO: Check if synchronizing up to chain tip has finished (#2603).
}

/// Crawl peers for transactions.
///
/// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
async fn crawl_transactions(&self) {
let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT);
let peer_set = self.peer_set.lock().await.clone();

trace!("Crawling for mempool transactions");

peer_set
.call_all(requests)
.unordered()
.and_then(|response| self.handle_response(response))
// TODO: Reduce the log level of the errors (#2655).
.inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error))
.for_each(|_| async {})
.await;
}

/// Handle a peer's response to the crawler's request for transactions.
async fn handle_response(&self, response: Response) -> Result<(), BoxError> {
let transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
};

trace!(
"Mempool crawler received {} transaction IDs",
transaction_ids.len()
);

// TODO: Download transactions and send them to the mempool (#2650)

Ok(())
}
}
76 changes: 76 additions & 0 deletions zebrad/src/components/mempool/crawler/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::time::Duration;

use tokio::{
sync::mpsc::{self, UnboundedReceiver},
time::{self, timeout},
};
use tower::{buffer::Buffer, util::BoxService, BoxError};

use zebra_network::{Request, Response};

use super::{Crawler, FANOUT, RATE_LIMIT_DELAY};

/// The number of iterations to crawl while testing.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// See more information in [`MAX_REQUEST_DELAY`].
const CRAWL_ITERATIONS: usize = 4;

/// The maximum time to wait for a request to arrive before considering it won't arrive.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`.
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250);

/// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100);

#[tokio::test]
async fn crawler_requests_for_transaction_ids() {
let (peer_set, mut requests) = mock_peer_set();

Crawler::spawn(peer_set);

time::pause();

for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;

assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
}

let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;

assert!(extra_request.is_err());

time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}
}

/// Create a mock service to represent a [`PeerSet`][zebra_network::PeerSet] and intercept the
/// requests it receives.
///
/// The intercepted requests are sent through an unbounded channel to the receiver that's also
/// returned from this function.
fn mock_peer_set() -> (
Buffer<BoxService<Request, Response, BoxError>, Request>,
UnboundedReceiver<Request>,
) {
let (sender, receiver) = mpsc::unbounded_channel();

let proxy_service = tower::service_fn(move |request| {
let sender = sender.clone();

async move {
let _ = sender.send(request);

Ok(Response::TransactionIds(vec![]))
}
});

let service = Buffer::new(BoxService::new(proxy_service), 10);

(service, receiver)
}