
Remove the Mutex from mempool::Crawler #2657

Closed · wants to merge 3 commits
43 changes: 22 additions & 21 deletions zebrad/src/components/mempool/crawler.rs
@@ -5,7 +5,7 @@
 use std::time::Duration;
 
 use futures::{stream, StreamExt, TryStreamExt};
-use tokio::{sync::Mutex, time::sleep};
+use tokio::time::sleep;
 use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
 
 use zebra_network::{Request, Response};
@@ -29,8 +29,9 @@ const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
 const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
 
 /// The mempool transaction crawler.
+#[derive(Debug)]
 pub struct Crawler<S> {
-    peer_set: Mutex<Timeout<S>>,
+    peer_set: Timeout<S>,
 }
 
 impl<S> Crawler<S>
@@ -41,14 +42,14 @@ where
     /// Spawn an asynchronous task to run the mempool crawler.
     pub fn spawn(peer_set: S) {
         let crawler = Crawler {
-            peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
+            peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
         };
 
         tokio::spawn(crawler.run());
     }
 
     /// Periodically crawl peers for transactions to include in the mempool.
-    pub async fn run(self) {
+    pub async fn run(mut self) {
         loop {
             self.wait_until_enabled().await;
             self.crawl_transactions().await;
@@ -57,43 +58,43 @@ where
         }
     }
 
     /// Wait until the mempool is enabled.
-    async fn wait_until_enabled(&self) {
+    async fn wait_until_enabled(&mut self) {
         // TODO: Check if synchronizing up to chain tip has finished (#2603).
     }
 
     /// Crawl peers for transactions.
     ///
     /// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
-    async fn crawl_transactions(&self) {
+    async fn crawl_transactions(&mut self) {
         let requests = stream::repeat(Request::MempoolTransactionIds).take(FANOUT);
-        let peer_set = self.peer_set.lock().await.clone();
+        let peer_set = self.peer_set.clone();
 
         trace!("Crawling for mempool transactions");
 
         peer_set
             .call_all(requests)
             .unordered()
-            .and_then(|response| self.handle_response(response))
+            .and_then(handle_response)
             // TODO: Reduce the log level of the errors (#2655).
             .inspect_err(|error| info!("Failed to crawl peer for mempool transactions: {}", error))
             .for_each(|_| async {})
             .await;
     }
+}
 
-    /// Handle a peer's response to the crawler's request for transactions.
-    async fn handle_response(&self, response: Response) -> Result<(), BoxError> {
-        let transaction_ids = match response {
-            Response::TransactionIds(ids) => ids,
-            _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
-        };
+/// Handle a peer's response to the crawler's request for transactions.
+async fn handle_response(response: Response) -> Result<(), BoxError> {
Contributor Author:
We can't take &mut self here without cloning the crawler.
And generic type inference fails if it's a static method.

We'll probably just end up sending these responses to a channel anyway.

Contributor:

Hmm, I think I'm not a fan of having this method become a function. To me it seems easier to have it as a method because it will need to use the channel that's likely to be stored inside the Crawler type.

Wouldn't it be simpler to just wait until we have updated the Tokio version, and see if that fixes the issue in a more straightforward manner?

I'm okay if you decide this is a better way to structure things, I'm just trying to figure out what's the best way forward 🤔

Contributor Author:

> Hmm, I think I'm not a fan of having this method become a function. To me it seems easier to have it as a method because it will need to use the channel that's likely to be stored inside the Crawler type.

Ok, I've changed it to a static method in commit 509f1fc.
And added a stub channel, to make sure that will all work.

What do you think?

> Wouldn't it be simpler to just wait until we have updated the Tokio version, and see if that fixes the issue in a more straightforward manner?

I realise this question is a bit out of date - we're going to do the tokio upgrade after the mempool deadline.

I think having an instance method will work, but the ownership is a bit tricky to get right.
For an example, see PR #2661

We'll need to make sure all the fields are Clone and Sync.
But by the time we get around to that refactor, we'll have all the fields in place, so we'll know if that's reasonable.

Do you want to open a ticket for the post-tokio-upgrade refactor?

+    let transaction_ids = match response {
+        Response::TransactionIds(ids) => ids,
+        _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
+    };
 
-        trace!(
-            "Mempool crawler received {} transaction IDs",
-            transaction_ids.len()
-        );
+    trace!(
+        "Mempool crawler received {} transaction IDs",
+        transaction_ids.len()
+    );
 
-        // TODO: Download transactions and send them to the mempool (#2650)
+    // TODO: Download transactions and send them to the mempool (#2650)
 
-        Ok(())
-    }
-}
+    Ok(())
+}