Skip to content

Commit

Permalink
moves ScanTask to its own module in service module
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Feb 1, 2024
1 parent 80827f5 commit a9bcfae
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 111 deletions.
102 changes: 2 additions & 100 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,111 +1,13 @@
//! Initializing the scanner.
use std::sync::{mpsc, Arc};
//! Initializing the scanner and RPC server.
use color_eyre::Report;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::ServiceBuilder;

use zebra_chain::{parameters::Network, transaction::Transaction};
use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;

use crate::{scan, service::ScanService, Config};

#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es

/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,

/// Key hashes that are to be removed
keys: Vec<String>,
},

/// Start sending results for key hashes to `result_sender`
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,

/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
},
}

#[derive(Debug, Clone)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: Arc<JoinHandle<Result<(), Report>>>,

/// Task command channel sender
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
}

impl ScanTask {
/// Spawns a new [`ScanTask`] for tests.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn mock() -> (Self, mpsc::Receiver<ScanTaskCommand>) {
let (cmd_sender, cmd_receiver) = mpsc::channel();

(
Self {
handle: Arc::new(tokio::spawn(std::future::pending())),
cmd_sender,
},
cmd_receiver,
)
}

/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
cmd_sender,
}
}

/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}

/// Sends a message to the scan task to remove the provided viewing keys.
pub fn remove_keys(
&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();

self.send(ScanTaskCommand::RemoveKeys {
keys: keys.to_vec(),
done_tx,
})?;

Ok(done_rx)
}
}

/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
Expand Down
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ pub mod service;
pub mod tests;

pub use config::Config;
pub use init::{init, ScanTask};
pub use init::init;
4 changes: 2 additions & 2 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ use zebra_chain::{
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};

use crate::{
init::ScanTaskCommand,
service::{ScanTask, ScanTaskCommand},
storage::{SaplingScanningKey, Storage},
Config, ScanTask,
Config,
};

/// The generic state type used by the scanner.
Expand Down
11 changes: 6 additions & 5 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ use tower::Service;
use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};
use crate::{scan, storage::Storage, Config, Request, Response};

#[cfg(test)]
mod tests;

pub mod scan_task;

pub use scan_task::{ScanTask, ScanTaskCommand};

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
Expand Down Expand Up @@ -47,10 +51,7 @@ impl ScanService {
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_mock_scanner(
db: Storage,
) -> (
Self,
std::sync::mpsc::Receiver<crate::init::ScanTaskCommand>,
) {
) -> (Self, std::sync::mpsc::Receiver<ScanTaskCommand>) {
let (scan_task, cmd_receiver) = ScanTask::mock();
(Self { db, scan_task }, cmd_receiver)
}
Expand Down
129 changes: 129 additions & 0 deletions zebra-scan/src/service/scan_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//! Types and method implementations for [`ScanTask`]
use std::sync::{mpsc, Arc};

use color_eyre::Report;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::ServiceBuilder;

use zebra_chain::{parameters::Network, transaction::Transaction};
use zebra_state::ChainTipChange;

use crate::{scan, service::ScanService, Config};

#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es

/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,

/// Key hashes that are to be removed
keys: Vec<String>,
},

/// Start sending results for key hashes to `result_sender`
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,

/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
},
}

#[derive(Debug, Clone)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: Arc<JoinHandle<Result<(), Report>>>,

/// Task command channel sender
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
}

impl ScanTask {
/// Spawns a new [`ScanTask`] for tests.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn mock() -> (Self, mpsc::Receiver<ScanTaskCommand>) {
let (cmd_sender, cmd_receiver) = mpsc::channel();

(
Self {
handle: Arc::new(tokio::spawn(std::future::pending())),
cmd_sender,
},
cmd_receiver,
)
}

/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
cmd_sender,
}
}

/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}

/// Sends a message to the scan task to remove the provided viewing keys.
pub fn remove_keys(
&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();

self.send(ScanTaskCommand::RemoveKeys {
keys: keys.to_vec(),
done_tx,
})?;

Ok(done_rx)
}
}

/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
pub async fn init(
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
let scan_service = ServiceBuilder::new().buffer(10).service(ScanService::new(
&config,
network,
state,
chain_tip_change,
));

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;

Ok(())
}
3 changes: 1 addition & 2 deletions zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use zebra_node_services::scan_service::{request::Request, response::Response};
use zebra_state::TransactionIndex;

use crate::{
init::ScanTaskCommand,
service::ScanService,
service::{scan_task::ScanTaskCommand, ScanService},
storage::db::tests::{fake_sapling_results, new_test_storage},
tests::ZECPAGES_SAPLING_VIEWING_KEY,
};
Expand Down
2 changes: 1 addition & 1 deletion zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl StartCmd {
if !config.shielded_scan.sapling_keys_to_scan.is_empty() {
// TODO: log the number of keys and update the scan_task_starts() test
info!("spawning shielded scanner with configured viewing keys");
let scan_task = zebra_scan::ScanTask::spawn(
let scan_task = zebra_scan::service::scan_task::ScanTask::spawn(
&config.shielded_scan,
config.network.network,
state,
Expand Down

0 comments on commit a9bcfae

Please sign in to comment.