Skip to content

Commit

Permalink
moves process_messages() method to scan_task.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Feb 1, 2024
1 parent a9bcfae commit a6946a3
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 58 deletions.
57 changes: 2 additions & 55 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
use std::{
collections::{BTreeMap, HashMap},
sync::{
mpsc::{Receiver, TryRecvError},
Arc,
},
sync::{mpsc::Receiver, Arc},
time::Duration,
};

Expand Down Expand Up @@ -113,7 +110,7 @@ pub async fn start(
tokio::time::sleep(INITIAL_WAIT).await;

loop {
parsed_keys = ScanTask::process_msgs(&cmd_receiver, parsed_keys)?;
parsed_keys = ScanTask::process_messages(&cmd_receiver, parsed_keys)?;

let scanned_height = scan_height_and_store_results(
height,
Expand All @@ -137,56 +134,6 @@ pub async fn start(
}
}

impl ScanTask {
/// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver
///
/// Processes messages in the scan task channel, updating `parsed_keys` if required.
///
/// Returns the updated `parsed_keys`
fn process_msgs(
cmd_receiver: &Receiver<ScanTaskCommand>,
mut parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<
Arc<HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>>,
Report,
> {
loop {
let cmd = match cmd_receiver.try_recv() {
Ok(cmd) => cmd,

Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
// Return early if the sender has been dropped.
return Err(eyre!("command channel disconnected"));
}
};

match cmd {
ScanTaskCommand::RemoveKeys { done_tx, keys } => {
// TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
// https://github.com/rust-lang/rust/issues/93610
let mut updated_parsed_keys =
Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone());

for key in keys {
updated_parsed_keys.remove(&key);
}

parsed_keys = Arc::new(updated_parsed_keys);

// Ignore send errors for the done notification
let _ = done_tx.send(());
}

_ => continue,
}
}

Ok(parsed_keys)
}
}
/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
///
Expand Down
62 changes: 59 additions & 3 deletions zebra-scan/src/service/scan_task.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
//! Types and method implementations for [`ScanTask`]
use std::sync::{mpsc, Arc};
use std::{
collections::HashMap,
sync::{
mpsc::{self, Receiver, TryRecvError},
Arc,
},
};

use color_eyre::Report;
use color_eyre::{eyre::eyre, Report};
use tokio::{sync::oneshot, task::JoinHandle};
use tower::ServiceBuilder;

use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
use zebra_chain::{parameters::Network, transaction::Transaction};
use zebra_state::ChainTipChange;
use zebra_state::{ChainTipChange, SaplingScanningKey};

use crate::{scan, service::ScanService, Config};

Expand Down Expand Up @@ -82,6 +89,55 @@ impl ScanTask {
}
}

/// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver
///
/// Processes messages in the scan task channel, updating `parsed_keys` if required.
///
/// Returns the updated `parsed_keys`
pub fn process_messages(
cmd_receiver: &Receiver<ScanTaskCommand>,
mut parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<
Arc<HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>>,
Report,
> {
loop {
let cmd = match cmd_receiver.try_recv() {
Ok(cmd) => cmd,

Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
// Return early if the sender has been dropped.
return Err(eyre!("command channel disconnected"));
}
};

match cmd {
ScanTaskCommand::RemoveKeys { done_tx, keys } => {
// TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
// https://github.com/rust-lang/rust/issues/93610
let mut updated_parsed_keys =
Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone());

for key in keys {
updated_parsed_keys.remove(&key);
}

parsed_keys = Arc::new(updated_parsed_keys);

// Ignore send errors for the done notification
let _ = done_tx.send(());
}

_ => continue,
}
}

Ok(parsed_keys)
}

/// Sends a command to the scan task
pub fn send(
&mut self,
Expand Down

0 comments on commit a6946a3

Please sign in to comment.