From bf4b9846155d90b2e88bf1ba200adfe0a14c0380 Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Fri, 14 Feb 2025 10:22:17 +0100 Subject: [PATCH 1/5] wallet: lifetime fix for with_mut --- lib/wallet/util.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/wallet/util.rs b/lib/wallet/util.rs index de89358..a1f9d16 100644 --- a/lib/wallet/util.rs +++ b/lib/wallet/util.rs @@ -36,9 +36,9 @@ mod rwlock_write_guard_some { impl RwLockWriteGuardSome<'_, T> { /// Use the mutable inner value - pub fn with_mut(&mut self, f: F) -> Output + pub fn with_mut<'a, F, Output>(&'a mut self, f: F) -> Output where - F: FnOnce(&mut T) -> Output, + F: FnOnce(&'a mut T) -> Output, { self.0.with_inner_mut(|inner| f(*inner)) } From c6554c80a2609233d703744c9106e5dd0f608a94 Mon Sep 17 00:00:00 2001 From: Torkel Rogstad Date: Fri, 14 Feb 2025 10:17:24 +0100 Subject: [PATCH 2/5] wallet: add full scan functionality This is intended to be used for importing a wallet from an existing seed. --- app/main.rs | 5 ++ lib/cli.rs | 4 ++ lib/wallet/mod.rs | 4 ++ lib/wallet/sync.rs | 137 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 149 insertions(+), 1 deletion(-) diff --git a/app/main.rs b/app/main.rs index 16fd6cf..2bcfa90 100644 --- a/app/main.rs +++ b/app/main.rs @@ -564,6 +564,11 @@ async fn main() -> Result<()> { wallet.create_wallet(mnemonic, None).await?; } + if cli.wallet_opts.full_scan && wallet.is_initialized().await { + tracing::info!("full wallet scan enabled, starting..."); + wallet.full_scan().await?; + } + Either::Right(wallet) } else { Either::Left(validator) diff --git a/lib/cli.rs b/lib/cli.rs index d9a759d..70da3e1 100644 --- a/lib/cli.rs +++ b/lib/cli.rs @@ -241,6 +241,10 @@ pub enum WalletSyncSource { #[derive(Clone, Args)] pub struct WalletConfig { + /// If true, the wallet will perform a full scan of the blockchain on startup, before + /// proceeding with the normal operations of the wallet. + #[arg(long = "wallet-full-scan", default_value_t = false)] + pub full_scan: bool, /// If no existing wallet is found, automatically create and load /// a new, unencrypted wallet from a randomly generated BIP39 mnemonic. #[arg( diff --git a/lib/wallet/mod.rs b/lib/wallet/mod.rs index 50b1668..59ad0ed 100644 --- a/lib/wallet/mod.rs +++ b/lib/wallet/mod.rs @@ -777,6 +777,10 @@ impl Wallet { }) } + pub async fn full_scan(&self) -> miette::Result<()> { + self.inner.full_scan().await + } + pub async fn is_initialized(&self) -> bool { self.inner.bitcoin_wallet.read().await.is_some() } diff --git a/lib/wallet/sync.rs b/lib/wallet/sync.rs index b1a5d66..e87e7c6 100644 --- a/lib/wallet/sync.rs +++ b/lib/wallet/sync.rs @@ -1,11 +1,14 @@ //! Wallet synchronization -use std::time::SystemTime; +use std::{collections::HashSet, time::SystemTime}; use async_lock::{MutexGuard, RwLockWriteGuard}; +use bdk_electrum::electrum_client::ElectrumApi; use bdk_esplora::EsploraAsyncExt as _; use bdk_wallet::{file_store::Store, ChangeSet, FileStoreError}; use either::Either; +use miette::{miette, IntoDiagnostic}; +use tokio::time::Instant; use crate::{ types::WithdrawalBundleEventKind, @@ -132,13 +135,145 @@ impl WalletInner { })) } + async fn address_has_txs(&self, address: &bitcoin::Address) -> miette::Result { + match &self.chain_source { + Either::Left(electrum_client) => electrum_client + .inner + .script_get_history(&address.script_pubkey()) + .map(|txs| !txs.is_empty()) + .map_err(|err| miette!("failed to get address txs: {err:#}")), + + Either::Right(esplora_client) => esplora_client + .get_address_txs(address, None) + .await + .map(|txs| !txs.is_empty()) + .map_err(|err| miette!("failed to get address txs: {err:#}")), + } + } + + // TODO: is this actually correct? Need help from the Rust grownups! + #[allow(clippy::significant_drop_tightening, reason = "false positive")] + pub(in crate::wallet) async fn full_scan(&self) -> miette::Result<()> { + tracing::info!("starting wallet full scan"); + + let mut start = SystemTime::now(); + + let wallet_read = self.read_wallet_upgradable().await?; + let mut reveal_map = std::collections::HashMap::new(); + + for (keychain, _) in wallet_read.spk_index().keychains() { + let mut last_used_index = 0; + let step = 1000; + + // First find upper bound by incrementing by 1000 until we find unused + loop { + let address = wallet_read.peek_address(keychain, last_used_index); + let has_txs = self.address_has_txs(&address).await?; + + if !has_txs { + break; + } + last_used_index += step; + } + + // Now binary search between last_used_index - step and last_used_index + let mut high = last_used_index; + let mut low = last_used_index.saturating_sub(step); + + while low < high { + let mid = low + (high - low) / 2; + let address = wallet_read.peek_address(keychain, mid); + let has_txs = self.address_has_txs(&address).await?; + + if !has_txs { + high = mid; + } else { + low = mid + 1; + } + } + + tracing::info!( + "Found last used address at index {} for keychain {:?}: {} (next: {})", + low.saturating_sub(1), + keychain, + wallet_read.peek_address(keychain, low.saturating_sub(1)), + wallet_read.peek_address(keychain, low) + ); + + reveal_map.insert(keychain, low); + } + + // Now upgrade to write lock and reveal all addresses + let mut wallet_write = RwLockUpgradableReadGuardSome::upgrade(wallet_read).await; + + for (keychain, index) in reveal_map { + // Reveal the addresses, so that when we persist later the wallet + // will know which index we're at. + let _addresses = + wallet_write.with_mut(|wallet| wallet.reveal_addresses_to(keychain, index)); + } + + let request = wallet_write.start_full_scan().inspect({ + let mut once = HashSet::::new(); + move |k, spk_i, _| { + if once.insert(k) { + tracing::info!("scanning keychain [{:?}]", k); + } + tracing::info!("scanning spk no. {:<3}", spk_i); + } + }); + + const STOP_GAP: usize = 20; + const BATCH_SIZE: usize = 10; + const FETCH_PREV_TXOUTS: bool = true; + + let update = match &self.chain_source { + Either::Left(electrum_client) => electrum_client + .full_scan(request, STOP_GAP, BATCH_SIZE, FETCH_PREV_TXOUTS) + .into_diagnostic()?, + + Either::Right(esplora_client) => esplora_client + .full_scan(request, STOP_GAP, BATCH_SIZE) + .await + .into_diagnostic()?, + }; + + tracing::info!( + "wallet full scan complete in {:?}", + start.elapsed().unwrap_or_default(), + ); + + start = SystemTime::now(); + + let mut bdk_db = self.bitcoin_db.lock().await; + + wallet_write + .with_mut(|wallet| { + wallet + .apply_update(update) + .map(|_| wallet.persist(&mut bdk_db)) + }) + .into_diagnostic()? + .map_err(|err| miette!("failed to persist wallet: {err:#}"))?; + drop(wallet_write); + + tracing::info!( + "wallet full scan result persisted in {:?}", + start.elapsed().unwrap_or_default(), + ); + + Ok(()) + } + /// Sync the wallet if the wallet is not locked, committing changes #[allow(clippy::significant_drop_in_scrutinee, reason = "false positive")] pub(in crate::wallet) async fn sync(&self) -> Result<(), error::WalletSync> { match self.sync_lock().await? { Some(sync_write) => { + let start = Instant::now(); tracing::trace!("obtained sync lock, committing changes"); let () = sync_write.commit()?; + tracing::trace!("sync lock commit complete in {:?}", start.elapsed()); Ok(()) } None => { From f32eed6c89d8817912e3987fee3b5a0200770dff Mon Sep 17 00:00:00 2001 From: Torkel Rogstad Date: Fri, 14 Feb 2025 19:03:37 +0100 Subject: [PATCH 3/5] multi: adjust logs, error messages --- app/main.rs | 12 +++++++++--- lib/validator/task/mod.rs | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/app/main.rs b/app/main.rs index 2bcfa90..b487bb0 100644 --- a/app/main.rs +++ b/app/main.rs @@ -1,4 +1,4 @@ -use std::{future::Future, net::SocketAddr, path::Path, time::Duration}; +use std::{future::Future, net::SocketAddr, path::Path, str::FromStr, time::Duration}; use bdk_wallet::bip39::{Language, Mnemonic}; use bip300301::MainClient; @@ -547,8 +547,14 @@ async fn main() -> Result<()> { (Some(mnemonic_path), _) => { tracing::debug!("Reading mnemonic from file: {}", mnemonic_path.display()); - let mnemonic_str = std::fs::read_to_string(mnemonic_path) - .map_err(|err| miette!("failed to read mnemonic file: {}", err))?; + let mnemonic_str = + std::fs::read_to_string(mnemonic_path.clone()).map_err(|err| { + miette!( + "failed to read mnemonic file `{}`: {}", + mnemonic_path.display(), + err + ) + })?; let mnemonic = Mnemonic::parse_in(Language::English, &mnemonic_str) .map_err(|err| miette!("invalid mnemonic: {}", err))?; diff --git a/lib/validator/task/mod.rs b/lib/validator/task/mod.rs index 40bd622..193f601 100644 --- a/lib/validator/task/mod.rs +++ b/lib/validator/task/mod.rs @@ -798,7 +798,7 @@ where })? { if let Some(latest_missing_header_height) = latest_missing_header_height { - tracing::debug!("Syncing header #{latest_missing_header_height} `{latest_missing_header}` -> `{main_tip}`"); + tracing::trace!("Syncing header #{latest_missing_header_height} `{latest_missing_header}` -> `{main_tip}`"); } else { tracing::debug!("Syncing header `{latest_missing_header}` -> `{main_tip}`"); } From 0b405fd4f9cd9528c3905c99ef19fafe264e1147 Mon Sep 17 00:00:00 2001 From: Torkel Rogstad Date: Fri, 14 Feb 2025 19:04:11 +0100 Subject: [PATCH 4/5] wallet: change default esplora signet URL --- lib/cli.rs | 2 +- lib/wallet/mod.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/cli.rs b/lib/cli.rs index 70da3e1..7409668 100644 --- a/lib/cli.rs +++ b/lib/cli.rs @@ -255,7 +255,7 @@ pub struct WalletConfig { pub auto_create: bool, /// URL of the Esplora server to use for the wallet. /// - /// Signet: https://mempool.drivechain.live/api + /// Signet: http://172.105.148.135:3000 /// Regtest: http://localhost:3003 #[arg(long = "wallet-esplora-url")] pub esplora_url: Option, diff --git a/lib/wallet/mod.rs b/lib/wallet/mod.rs index 59ad0ed..01392bc 100644 --- a/lib/wallet/mod.rs +++ b/lib/wallet/mod.rs @@ -88,7 +88,7 @@ struct WalletInner { impl WalletInner { async fn init_esplora_client(config: &WalletConfig, network: Network) -> Result { let default_url = match network { - Network::Signet => "https://mempool.drivechain.live/api", + Network::Signet => "http://172.105.148.135:3000", Network::Regtest => "http://localhost:3003", _ => return Err(miette!("esplora: unsupported network: {network}")), }; @@ -98,7 +98,9 @@ impl WalletInner { tracing::info!(esplora_url = %esplora_url, "creating esplora client"); - let client = esplora_client::Builder::new(esplora_url.as_str()) + // URLs with a port number at the end get a `/` when turned back into a string, for + // some reason. The Esplora library doesn't like that! Remove it. + let client = esplora_client::Builder::new(esplora_url.as_str().trim_end_matches("/")) .build_async() .into_diagnostic()?; From 882586fc0145ff9897069e7251cdfe916a008c38 Mon Sep 17 00:00:00 2001 From: Torkel Rogstad Date: Fri, 14 Feb 2025 19:04:31 +0100 Subject: [PATCH 5/5] wallet: change sync from full to revealed SPK --- lib/wallet/sync.rs | 61 +++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/lib/wallet/sync.rs b/lib/wallet/sync.rs index e87e7c6..385bc3e 100644 --- a/lib/wallet/sync.rs +++ b/lib/wallet/sync.rs @@ -1,6 +1,6 @@ //! Wallet synchronization -use std::{collections::HashSet, time::SystemTime}; +use std::time::SystemTime; use async_lock::{MutexGuard, RwLockWriteGuard}; use bdk_electrum::electrum_client::ElectrumApi; @@ -37,6 +37,8 @@ impl SyncWriteGuard<'_> { } } +const ESPLORA_PARALLEL_REQUESTS: usize = 25; + impl WalletInner { pub(in crate::wallet) async fn handle_connect_block( &self, @@ -107,17 +109,21 @@ impl WalletInner { outpoints = request.progress().outpoints_remaining, "Requesting sync via chain source" ); - const PARALLEL_REQUESTS: usize = 5; - const BATCH_SIZE: usize = 5; - const FETCH_PREV_TXOUTS: bool = false; let (source, update) = match &self.chain_source { - Either::Left(electrum_client) => ( - "electrum", - electrum_client.sync(request, BATCH_SIZE, FETCH_PREV_TXOUTS)?, - ), + Either::Left(electrum_client) => { + const BATCH_SIZE: usize = 5; + const FETCH_PREV_TXOUTS: bool = false; + ( + "electrum", + electrum_client.sync(request, BATCH_SIZE, FETCH_PREV_TXOUTS)?, + ) + } + Either::Right(esplora_client) => ( "esplora", - esplora_client.sync(request, PARALLEL_REQUESTS).await?, + esplora_client + .sync(request, ESPLORA_PARALLEL_REQUESTS) + .await?, ), }; tracing::trace!("Fetched update from {source}, applying update"); @@ -136,19 +142,20 @@ impl WalletInner { } async fn address_has_txs(&self, address: &bitcoin::Address) -> miette::Result { - match &self.chain_source { + let res = match &self.chain_source { Either::Left(electrum_client) => electrum_client .inner .script_get_history(&address.script_pubkey()) .map(|txs| !txs.is_empty()) - .map_err(|err| miette!("failed to get address txs: {err:#}")), + .map_err(|err| miette!(err)), Either::Right(esplora_client) => esplora_client .get_address_txs(address, None) .await .map(|txs| !txs.is_empty()) - .map_err(|err| miette!("failed to get address txs: {err:#}")), - } + .map_err(|err| miette!(err)), + }; + res.map_err(|err| miette!("failed to get address txs for `{address}`: {err:#}`")) } // TODO: is this actually correct? Need help from the Rust grownups! @@ -213,27 +220,21 @@ impl WalletInner { wallet_write.with_mut(|wallet| wallet.reveal_addresses_to(keychain, index)); } - let request = wallet_write.start_full_scan().inspect({ - let mut once = HashSet::::new(); - move |k, spk_i, _| { - if once.insert(k) { - tracing::info!("scanning keychain [{:?}]", k); - } - tracing::info!("scanning spk no. {:<3}", spk_i); - } - }); - - const STOP_GAP: usize = 20; - const BATCH_SIZE: usize = 10; - const FETCH_PREV_TXOUTS: bool = true; + // TODO: even a simple revealed SPK scan results in long-running anchor persistence jobs. Is it + // possible to pre-populate this? + let request = wallet_write.start_sync_with_revealed_spks(); let update = match &self.chain_source { - Either::Left(electrum_client) => electrum_client - .full_scan(request, STOP_GAP, BATCH_SIZE, FETCH_PREV_TXOUTS) - .into_diagnostic()?, + Either::Left(electrum_client) => { + const BATCH_SIZE: usize = 100; + const FETCH_PREV_TXOUTS: bool = true; + electrum_client + .sync(request, BATCH_SIZE, FETCH_PREV_TXOUTS) + .into_diagnostic()? + } Either::Right(esplora_client) => esplora_client - .full_scan(request, STOP_GAP, BATCH_SIZE) + .sync(request, ESPLORA_PARALLEL_REQUESTS) .await .into_diagnostic()?, };