diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index ce54e01532d..83417c5b2d4 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -5,7 +5,7 @@ use std::time::Duration; use futures::{stream, StreamExt, TryStreamExt}; -use tokio::{sync::Mutex, time::sleep}; +use tokio::time::sleep; use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; use zebra_network::{Request, Response}; @@ -29,8 +29,11 @@ const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75); const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// The mempool transaction crawler. +#[derive(Debug)] pub struct Crawler { - peer_set: Mutex>, + peer_set: Timeout, + // TODO: replace `()` with mempool downloader request type (#2606) + download_sender: tokio::sync::mpsc::Sender<()>, } impl Crawler @@ -40,15 +43,19 @@ where { /// Spawn an asynchronous task to run the mempool crawler. pub fn spawn(peer_set: S) { + // TODO: replace with sender from the mempool downloader (#2606) + let (download_sender, _download_receiver) = tokio::sync::mpsc::channel(FANOUT); + let crawler = Crawler { - peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)), + peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT), + download_sender, }; tokio::spawn(crawler.run()); } /// Periodically crawl peers for transactions to include in the mempool. - pub async fn run(self) { + pub async fn run(mut self) { loop { self.wait_until_enabled().await; self.crawl_transactions().await; @@ -57,23 +64,24 @@ where } /// Wait until the mempool is enabled. - async fn wait_until_enabled(&self) { + async fn wait_until_enabled(&mut self) { // TODO: Check if synchronizing up to chain tip has finished (#2603). } /// Crawl peers for transactions. /// /// Concurrently request [`FANOUT`] peers for transactions to include in the mempool. - async fn crawl_transactions(&self) { + async fn crawl_transactions(&mut self) { let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT); - let peer_set = self.peer_set.lock().await.clone(); + let peer_set = self.peer_set.clone(); + let download_sender = self.download_sender.clone(); trace!("Crawling for mempool transactions"); peer_set .call_all(requests) .unordered() - .and_then(|response| self.handle_response(response)) + .and_then(|response| Crawler::::handle_response(response, download_sender.clone())) // TODO: Reduce the log level of the errors (#2655). .inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error)) .for_each(|_| async {}) @@ -81,7 +89,10 @@ where } /// Handle a peer's response to the crawler's request for transactions. - async fn handle_response(&self, response: Response) -> Result<(), BoxError> { + async fn handle_response( + response: Response, + _sender: tokio::sync::mpsc::Sender<()>, + ) -> Result<(), BoxError> { let transaction_ids = match response { Response::TransactionIds(ids) => ids, _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),