Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scan): Implement DeleteKeys ScanService request #8217

Merged
merged 18 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub enum Request {
/// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error
RegisterKeys(Vec<()>),

/// TODO: Accept `KeyHash`es and return Ok(`Vec<KeyHash>`) with hashes of deleted keys
DeleteKeys(Vec<()>),
/// Deletes viewing keys and their results from the database.
DeleteKeys(Vec<String>),

/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),
Expand Down
3 changes: 3 additions & 0 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum Response {
/// Response to Results request
Results(Vec<Transaction>),

/// Response to DeleteKeys request
DeletedKeys,

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
}
1 change: 1 addition & 0 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ categories = ["cryptography::cryptocurrencies"]
[[bin]] # Bin to run the Scanner gRPC server
name = "scanner-grpc-server"
path = "src/bin/rpc_server.rs"
required-features = ["proptest-impl"]

[features]

Expand Down
9 changes: 5 additions & 4 deletions zebra-scan/src/bin/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

use tower::ServiceBuilder;

use zebra_scan::service::ScanService;
use zebra_scan::{service::ScanService, storage::Storage};

#[tokio::main]
/// Runs an RPC server with a mock ScanTask
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (config, network) = Default::default();
let scan_service = ServiceBuilder::new()
.buffer(10)
.service(ScanService::new_with_mock_scanner(&config, network));

let (scan_service, _cmd_receiver) =
ScanService::new_with_mock_scanner(Storage::new(&config, network, false));
let scan_service = ServiceBuilder::new().buffer(10).service(scan_service);

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
Expand Down
62 changes: 37 additions & 25 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum ScanTaskCommand {
done_tx: oneshot::Sender<()>,

/// Key hashes that are to be removed
key_hashes: Vec<()>,
keys: Vec<String>,
},

/// Start sending results for key hashes to `result_sender`
Expand All @@ -36,25 +36,29 @@ pub enum ScanTaskCommand {
},
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: JoinHandle<Result<(), Report>>,
pub handle: Arc<JoinHandle<Result<(), Report>>>,

/// Task command channel sender
cmd_sender: mpsc::Sender<ScanTaskCommand>,
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
}

impl ScanTask {
/// Spawns a new [`ScanTask`] for tests.
pub fn mock() -> Self {
let (cmd_sender, _cmd_receiver) = mpsc::channel();

Self {
handle: tokio::spawn(std::future::pending()),
cmd_sender,
}
#[cfg(any(test, feature = "proptest-impl"))]
pub fn mock() -> (Self, mpsc::Receiver<ScanTaskCommand>) {
let (cmd_sender, cmd_receiver) = mpsc::channel();

(
Self {
handle: Arc::new(tokio::spawn(std::future::pending())),
cmd_sender,
},
cmd_receiver,
)
}

/// Spawns a new [`ScanTask`].
Expand All @@ -64,11 +68,16 @@ impl ScanTask {
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned
let (cmd_sender, _cmd_receiver) = mpsc::channel();
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
handle: scan::spawn_init(config, network, state, chain_tip_change),
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
cmd_sender,
}
}
Expand All @@ -80,18 +89,21 @@ impl ScanTask {
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
}

/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
scan::spawn_init(config, network, state, chain_tip_change)
/// Sends a message to the scan task to remove the provided viewing keys.
pub fn remove_keys(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional Suggestion

I think it would be good to place this function into a different file since the filename is init.rs, but let's do that in a different PR?

&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();

self.send(ScanTaskCommand::RemoveKeys {
keys: keys.to_vec(),
done_tx,
})?;

Ok(done_rx)
}
}

/// Initialize [`ScanService`] based on its config.
Expand Down
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ pub mod service;
pub mod tests;

pub use config::Config;
pub use init::{init, spawn_init};
pub use init::{init, ScanTask};
69 changes: 64 additions & 5 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
sync::{
mpsc::{Receiver, TryRecvError},
Arc,
},
time::Duration,
};

Expand Down Expand Up @@ -37,8 +40,9 @@ use zebra_chain::{
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};

use crate::{
init::ScanTaskCommand,
storage::{SaplingScanningKey, Storage},
Config,
Config, ScanTask,
};

/// The generic state type used by the scanner.
Expand Down Expand Up @@ -66,6 +70,7 @@ pub async fn start(
state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let network = storage.network();
let sapling_activation_height = storage.min_sapling_birthday_height();
Expand Down Expand Up @@ -102,12 +107,14 @@ pub async fn start(
Ok::<_, Report>((key.clone(), parsed_keys))
})
.try_collect()?;
let parsed_keys = Arc::new(parsed_keys);
let mut parsed_keys = Arc::new(parsed_keys);

// Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;

loop {
parsed_keys = ScanTask::process_msgs(&cmd_receiver, parsed_keys)?;

let scanned_height = scan_height_and_store_results(
height,
state.clone(),
Expand All @@ -130,6 +137,56 @@ pub async fn start(
}
}

impl ScanTask {
/// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver
///
/// Processes messages in the scan task channel, updating `parsed_keys` if required.
///
/// Returns the updated `parsed_keys`
fn process_msgs(
cmd_receiver: &Receiver<ScanTaskCommand>,
mut parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<
Arc<HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>>,
Report,
> {
loop {
let cmd = match cmd_receiver.try_recv() {
Ok(cmd) => cmd,

Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
// Return early if the sender has been dropped.
return Err(eyre!("command channel disconnected"));
}
};

match cmd {
ScanTaskCommand::RemoveKeys { done_tx, keys } => {
// TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
// https://github.com/rust-lang/rust/issues/93610
let mut updated_parsed_keys =
Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional Suggestion

Arc::get_mut might also have been an option here.


for key in keys {
updated_parsed_keys.remove(&key);
}

parsed_keys = Arc::new(updated_parsed_keys);

// Ignore send errors for the done notification
let _ = done_tx.send(());
}

_ => continue,
}
}

Ok(parsed_keys)
}
}
/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
///
Expand Down Expand Up @@ -445,11 +502,12 @@ pub fn spawn_init(
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();

// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span())
}

/// Initialize the scanner based on its config.
Expand All @@ -460,11 +518,12 @@ pub async fn init(
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;

// TODO: add more tasks here?
start(state, chain_tip_change, storage).await
start(state, chain_tip_change, storage, cmd_receiver).await
}
62 changes: 51 additions & 11 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [`tower::Service`] for zebra-scan.

use std::{future::Future, pin::Pin, task::Poll};
use std::{future::Future, pin::Pin, task::Poll, time::Duration};

use futures::future::FutureExt;
use tower::Service;
Expand All @@ -10,16 +10,25 @@ use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};

#[cfg(test)]
mod tests;

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
/// On-disk storage
db: Storage,
pub db: Storage,

/// Handle to scan task that's responsible for writing results
scan_task: ScanTask,
}

/// A timeout applied to `DeleteKeys` requests.
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);

/// The maximum number of keys that may be included in a request to the scan service
const MAX_REQUEST_KEYS: usize = 1000;

impl ScanService {
/// Create a new [`ScanService`].
pub fn new(
Expand All @@ -35,11 +44,15 @@ impl ScanService {
}

/// Create a new [`ScanService`] with a mock `ScanTask`
pub fn new_with_mock_scanner(config: &Config, network: Network) -> Self {
Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::mock(),
}
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_mock_scanner(
db: Storage,
) -> (
Self,
std::sync::mpsc::Receiver<crate::init::ScanTaskCommand>,
) {
let (scan_task, cmd_receiver) = ScanTask::mock();
(Self { db, scan_task }, cmd_receiver)
}
}

Expand Down Expand Up @@ -84,10 +97,37 @@ impl Service<Request> for ScanService {
// - send new keys to scan task
}

Request::DeleteKeys(_key_hashes) => {
// TODO:
// - delete these keys and their results from db
// - send deleted keys to scan task
Request::DeleteKeys(keys) => {
let mut db = self.db.clone();
let mut scan_task = self.scan_task.clone();

return async move {
if keys.len() > MAX_REQUEST_KEYS {
return Err(format!(
"maximum number of keys per request is {MAX_REQUEST_KEYS}"
)
.into());
}

// Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT`
let remove_keys_result =
tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?)
.await
.map_err(|_| "timeout waiting for delete keys done notification");

// Delete the key from the database after either confirmation that it's been removed from the scan task, or
// waiting `DELETE_KEY_TIMEOUT`.
let delete_key_task = tokio::task::spawn_blocking(move || {
db.delete_sapling_results(keys);
});

// Return timeout errors or `RecvError`s, or wait for the key to be deleted from the database.
remove_keys_result??;
delete_key_task.await?;

Ok(Response::DeletedKeys)
}
.boxed();
}

Request::Results(_key_hashes) => {
Expand Down
Loading
Loading