Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wallet: add full scan functionality #174

Merged
merged 5 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions app/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, net::SocketAddr, path::Path, time::Duration};
use std::{future::Future, net::SocketAddr, path::Path, str::FromStr, time::Duration};

use bdk_wallet::bip39::{Language, Mnemonic};
use bip300301::MainClient;
Expand Down Expand Up @@ -547,8 +547,14 @@ async fn main() -> Result<()> {
(Some(mnemonic_path), _) => {
tracing::debug!("Reading mnemonic from file: {}", mnemonic_path.display());

let mnemonic_str = std::fs::read_to_string(mnemonic_path)
.map_err(|err| miette!("failed to read mnemonic file: {}", err))?;
let mnemonic_str =
std::fs::read_to_string(mnemonic_path.clone()).map_err(|err| {
miette!(
"failed to read mnemonic file `{}`: {}",
mnemonic_path.display(),
err
)
})?;

let mnemonic = Mnemonic::parse_in(Language::English, &mnemonic_str)
.map_err(|err| miette!("invalid mnemonic: {}", err))?;
Expand All @@ -564,6 +570,11 @@ async fn main() -> Result<()> {
wallet.create_wallet(mnemonic, None).await?;
}

if cli.wallet_opts.full_scan && wallet.is_initialized().await {
tracing::info!("full wallet scan enabled, starting...");
wallet.full_scan().await?;
}

Either::Right(wallet)
} else {
Either::Left(validator)
Expand Down
6 changes: 5 additions & 1 deletion lib/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ pub enum WalletSyncSource {

#[derive(Clone, Args)]
pub struct WalletConfig {
/// If true, the wallet will perform a full scan of the blockchain on startup, before
/// proceeding with the normal operations of the wallet.
#[arg(long = "wallet-full-scan", default_value_t = false)]
pub full_scan: bool,
/// If no existing wallet is found, automatically create and load
/// a new, unencrypted wallet from a randomly generated BIP39 mnemonic.
#[arg(
Expand All @@ -251,7 +255,7 @@ pub struct WalletConfig {
pub auto_create: bool,
/// URL of the Esplora server to use for the wallet.
///
/// Signet: https://mempool.drivechain.live/api
/// Signet: http://172.105.148.135:3000
/// Regtest: http://localhost:3003
#[arg(long = "wallet-esplora-url")]
pub esplora_url: Option<url::Url>,
Expand Down
2 changes: 1 addition & 1 deletion lib/validator/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ where
})?
{
if let Some(latest_missing_header_height) = latest_missing_header_height {
tracing::debug!("Syncing header #{latest_missing_header_height} `{latest_missing_header}` -> `{main_tip}`");
tracing::trace!("Syncing header #{latest_missing_header_height} `{latest_missing_header}` -> `{main_tip}`");
} else {
tracing::debug!("Syncing header `{latest_missing_header}` -> `{main_tip}`");
}
Expand Down
10 changes: 8 additions & 2 deletions lib/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct WalletInner {
impl WalletInner {
async fn init_esplora_client(config: &WalletConfig, network: Network) -> Result<EsploraClient> {
let default_url = match network {
Network::Signet => "https://mempool.drivechain.live/api",
Network::Signet => "http://172.105.148.135:3000",
Network::Regtest => "http://localhost:3003",
_ => return Err(miette!("esplora: unsupported network: {network}")),
};
Expand All @@ -98,7 +98,9 @@ impl WalletInner {

tracing::info!(esplora_url = %esplora_url, "creating esplora client");

let client = esplora_client::Builder::new(esplora_url.as_str())
// URLs with a port number at the end get a `/` when turned back into a string, for
// some reason. The Esplora library doesn't like that! Remove it.
let client = esplora_client::Builder::new(esplora_url.as_str().trim_end_matches("/"))
.build_async()
.into_diagnostic()?;

Expand Down Expand Up @@ -777,6 +779,10 @@ impl Wallet {
})
}

pub async fn full_scan(&self) -> miette::Result<()> {
self.inner.full_scan().await
}

pub async fn is_initialized(&self) -> bool {
self.inner.bitcoin_wallet.read().await.is_some()
}
Expand Down
152 changes: 144 additions & 8 deletions lib/wallet/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
use std::time::SystemTime;

use async_lock::{MutexGuard, RwLockWriteGuard};
use bdk_electrum::electrum_client::ElectrumApi;
use bdk_esplora::EsploraAsyncExt as _;
use bdk_wallet::{file_store::Store, ChangeSet, FileStoreError};
use either::Either;
use miette::{miette, IntoDiagnostic};
use tokio::time::Instant;

use crate::{
types::WithdrawalBundleEventKind,
Expand Down Expand Up @@ -34,6 +37,8 @@ impl SyncWriteGuard<'_> {
}
}

const ESPLORA_PARALLEL_REQUESTS: usize = 25;

impl WalletInner {
pub(in crate::wallet) async fn handle_connect_block(
&self,
Expand Down Expand Up @@ -104,17 +109,21 @@ impl WalletInner {
outpoints = request.progress().outpoints_remaining,
"Requesting sync via chain source"
);
const PARALLEL_REQUESTS: usize = 5;
const BATCH_SIZE: usize = 5;
const FETCH_PREV_TXOUTS: bool = false;
let (source, update) = match &self.chain_source {
Either::Left(electrum_client) => (
"electrum",
electrum_client.sync(request, BATCH_SIZE, FETCH_PREV_TXOUTS)?,
),
Either::Left(electrum_client) => {
const BATCH_SIZE: usize = 5;
const FETCH_PREV_TXOUTS: bool = false;
(
"electrum",
electrum_client.sync(request, BATCH_SIZE, FETCH_PREV_TXOUTS)?,
)
}

Either::Right(esplora_client) => (
"esplora",
esplora_client.sync(request, PARALLEL_REQUESTS).await?,
esplora_client
.sync(request, ESPLORA_PARALLEL_REQUESTS)
.await?,
),
};
tracing::trace!("Fetched update from {source}, applying update");
Expand All @@ -132,13 +141,140 @@ impl WalletInner {
}))
}

async fn address_has_txs(&self, address: &bitcoin::Address) -> miette::Result<bool> {
let res = match &self.chain_source {
Either::Left(electrum_client) => electrum_client
.inner
.script_get_history(&address.script_pubkey())
.map(|txs| !txs.is_empty())
.map_err(|err| miette!(err)),

Either::Right(esplora_client) => esplora_client
.get_address_txs(address, None)
.await
.map(|txs| !txs.is_empty())
.map_err(|err| miette!(err)),
};
res.map_err(|err| miette!("failed to get address txs for `{address}`: {err:#}`"))
}

// TODO: is this actually correct? Need help from the Rust grownups!
#[allow(clippy::significant_drop_tightening, reason = "false positive")]
pub(in crate::wallet) async fn full_scan(&self) -> miette::Result<()> {
tracing::info!("starting wallet full scan");

let mut start = SystemTime::now();

let wallet_read = self.read_wallet_upgradable().await?;
let mut reveal_map = std::collections::HashMap::new();

for (keychain, _) in wallet_read.spk_index().keychains() {
let mut last_used_index = 0;
let step = 1000;

// First find upper bound by incrementing by 1000 until we find unused
loop {
let address = wallet_read.peek_address(keychain, last_used_index);
let has_txs = self.address_has_txs(&address).await?;

if !has_txs {
break;
}
last_used_index += step;
}

// Now binary search between last_used_index - step and last_used_index
let mut high = last_used_index;
let mut low = last_used_index.saturating_sub(step);

while low < high {
let mid = low + (high - low) / 2;
let address = wallet_read.peek_address(keychain, mid);
let has_txs = self.address_has_txs(&address).await?;

if !has_txs {
high = mid;
} else {
low = mid + 1;
}
}

tracing::info!(
"Found last used address at index {} for keychain {:?}: {} (next: {})",
low.saturating_sub(1),
keychain,
wallet_read.peek_address(keychain, low.saturating_sub(1)),
wallet_read.peek_address(keychain, low)
);

reveal_map.insert(keychain, low);
}

// Now upgrade to write lock and reveal all addresses
let mut wallet_write = RwLockUpgradableReadGuardSome::upgrade(wallet_read).await;

for (keychain, index) in reveal_map {
// Reveal the addresses, so that when we persist later the wallet
// will know which index we're at.
let _addresses =
wallet_write.with_mut(|wallet| wallet.reveal_addresses_to(keychain, index));
}

// TODO: even a simple revealed SPK scan results in long-running anchor persistence jobs. Is it
// possible to pre-populate this?
let request = wallet_write.start_sync_with_revealed_spks();

let update = match &self.chain_source {
Either::Left(electrum_client) => {
const BATCH_SIZE: usize = 100;
const FETCH_PREV_TXOUTS: bool = true;
electrum_client
.sync(request, BATCH_SIZE, FETCH_PREV_TXOUTS)
.into_diagnostic()?
}

Either::Right(esplora_client) => esplora_client
.sync(request, ESPLORA_PARALLEL_REQUESTS)
.await
.into_diagnostic()?,
};

tracing::info!(
"wallet full scan complete in {:?}",
start.elapsed().unwrap_or_default(),
);

start = SystemTime::now();

let mut bdk_db = self.bitcoin_db.lock().await;

wallet_write
.with_mut(|wallet| {
wallet
.apply_update(update)
.map(|_| wallet.persist(&mut bdk_db))
})
.into_diagnostic()?
.map_err(|err| miette!("failed to persist wallet: {err:#}"))?;
drop(wallet_write);

tracing::info!(
"wallet full scan result persisted in {:?}",
start.elapsed().unwrap_or_default(),
);

Ok(())
}

/// Sync the wallet if the wallet is not locked, committing changes
#[allow(clippy::significant_drop_in_scrutinee, reason = "false positive")]
pub(in crate::wallet) async fn sync(&self) -> Result<(), error::WalletSync> {
match self.sync_lock().await? {
Some(sync_write) => {
let start = Instant::now();
tracing::trace!("obtained sync lock, committing changes");
let () = sync_write.commit()?;
tracing::trace!("sync lock commit complete in {:?}", start.elapsed());
Ok(())
}
None => {
Expand Down
4 changes: 2 additions & 2 deletions lib/wallet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ mod rwlock_write_guard_some {

impl<T> RwLockWriteGuardSome<'_, T> {
/// Use the mutable inner value
pub fn with_mut<F, Output>(&mut self, f: F) -> Output
pub fn with_mut<'a, F, Output>(&'a mut self, f: F) -> Output
where
F: FnOnce(&mut T) -> Output,
F: FnOnce(&'a mut T) -> Output,
{
self.0.with_inner_mut(|inner| f(*inner))
}
Expand Down