From 2da2e149e6537d5e248b2735033646897c104a9b Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 12 Aug 2024 12:31:18 +1000 Subject: [PATCH] Add lcli command for manual rescue sync (#5458) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Rescue CLI * Allow tweaking start block * More caching * Merge branch 'unstable' into rescue-cli # Conflicts: # lcli/src/main.rs * Add `--known–common-ancestor` flag to optimise for download speed. * Rename rescue command to `http-sync` * Add logging * Add optional `--block-cache-dir` cli arg and create directory if it doesn't already exist. * Lint fix. * Merge branch 'unstable' into rescue-cli --- lcli/src/http_sync.rs | 152 ++++++++++++++++++++++++++++++++++++++++++ lcli/src/main.rs | 74 ++++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 lcli/src/http_sync.rs diff --git a/lcli/src/http_sync.rs b/lcli/src/http_sync.rs new file mode 100644 index 00000000000..1ef40e63978 --- /dev/null +++ b/lcli/src/http_sync.rs @@ -0,0 +1,152 @@ +use clap::ArgMatches; +use clap_utils::{parse_optional, parse_required}; +use environment::Environment; +use eth2::{ + types::{BlockId, ChainSpec, ForkName, PublishBlockRequest, SignedBlockContents}, + BeaconNodeHttpClient, Error, SensitiveUrl, Timeouts, +}; +use eth2_network_config::Eth2NetworkConfig; +use ssz::Encode; +use std::fs; +use std::fs::File; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; +use types::EthSpec; + +const HTTP_TIMEOUT: Duration = Duration::from_secs(3600); +const DEFAULT_CACHE_DIR: &str = "./cache"; + +pub fn run( + env: Environment, + network_config: Eth2NetworkConfig, + matches: &ArgMatches, +) -> Result<(), String> { + let executor = env.core_context().executor; + executor + .handle() + .ok_or("shutdown in progress")? + .block_on(async move { run_async::(network_config, matches).await }) +} + +pub async fn run_async( + network_config: Eth2NetworkConfig, + matches: &ArgMatches, +) -> Result<(), String> { + let spec = &network_config.chain_spec::()?; + let source_url: SensitiveUrl = parse_required(matches, "source-url")?; + let target_url: SensitiveUrl = parse_required(matches, "target-url")?; + let start_block: BlockId = parse_required(matches, "start-block")?; + let maybe_common_ancestor_block: Option = + parse_optional(matches, "known–common-ancestor")?; + let cache_dir_path: PathBuf = + parse_optional(matches, "block-cache-dir")?.unwrap_or(DEFAULT_CACHE_DIR.into()); + + let source = BeaconNodeHttpClient::new(source_url, Timeouts::set_all(HTTP_TIMEOUT)); + let target = BeaconNodeHttpClient::new(target_url, Timeouts::set_all(HTTP_TIMEOUT)); + + if !cache_dir_path.exists() { + fs::create_dir_all(&cache_dir_path) + .map_err(|e| format!("Unable to create block cache dir: {:?}", e))?; + } + + // 1. Download blocks back from head, looking for common ancestor. + let mut blocks = vec![]; + let mut next_block_id = start_block; + loop { + println!("downloading {next_block_id:?}"); + + let publish_block_req = + get_block_from_source::(&source, next_block_id, spec, &cache_dir_path).await; + let block = publish_block_req.signed_block(); + + next_block_id = BlockId::Root(block.parent_root()); + blocks.push((block.slot(), publish_block_req)); + + if let Some(ref common_ancestor_block) = maybe_common_ancestor_block { + if common_ancestor_block == &next_block_id { + println!("reached known common ancestor: {next_block_id:?}"); + break; + } + } + + let block_exists_in_target = target + .get_beacon_blocks_ssz::(next_block_id, spec) + .await + .unwrap() + .is_some(); + if block_exists_in_target { + println!("common ancestor found: {next_block_id:?}"); + break; + } + } + + // 2. Apply blocks to target. + for (slot, block) in blocks.iter().rev() { + println!("posting block at slot {slot}"); + if let Err(e) = target.post_beacon_blocks(block).await { + if let Error::ServerMessage(ref e) = e { + if e.code == 202 { + println!("duplicate block detected while posting block at slot {slot}"); + continue; + } + } + return Err(format!("error posting {slot}: {e:?}")); + } else { + println!("success"); + } + } + + println!("SYNCED!!!!"); + + Ok(()) +} + +async fn get_block_from_source( + source: &BeaconNodeHttpClient, + block_id: BlockId, + spec: &ChainSpec, + cache_dir_path: &Path, +) -> PublishBlockRequest { + let mut cache_path = cache_dir_path.join(format!("block_{block_id}")); + + if cache_path.exists() { + let mut f = File::open(&cache_path).unwrap(); + let mut bytes = vec![]; + f.read_to_end(&mut bytes).unwrap(); + PublishBlockRequest::from_ssz_bytes(&bytes, ForkName::Deneb).unwrap() + } else { + let block_from_source = source + .get_beacon_blocks_ssz::(block_id, spec) + .await + .unwrap() + .unwrap(); + let blobs_from_source = source + .get_blobs::(block_id, None) + .await + .unwrap() + .unwrap() + .data; + + let (kzg_proofs, blobs): (Vec<_>, Vec<_>) = blobs_from_source + .iter() + .cloned() + .map(|sidecar| (sidecar.kzg_proof, sidecar.blob.clone())) + .unzip(); + + let block_root = block_from_source.canonical_root(); + let block_contents = SignedBlockContents { + signed_block: Arc::new(block_from_source), + kzg_proofs: kzg_proofs.into(), + blobs: blobs.into(), + }; + let publish_block_req = PublishBlockRequest::BlockContents(block_contents); + + cache_path = cache_dir_path.join(format!("block_{block_root:?}")); + let mut f = File::create(&cache_path).unwrap(); + f.write_all(&publish_block_req.as_ssz_bytes()).unwrap(); + + publish_block_req + } +} diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 85898b60ee4..380aeb6aceb 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -1,6 +1,7 @@ mod block_root; mod check_deposit_data; mod generate_bootnode_enr; +mod http_sync; mod indexed_attestations; mod mnemonic_validators; mod mock_el; @@ -552,6 +553,74 @@ fn main() { .display_order(0) ) ) + .subcommand( + Command::new("http-sync") + .about("Manual sync") + .arg( + Arg::new("start-block") + .long("start-block") + .value_name("BLOCK_ID") + .action(ArgAction::Set) + .help("Block ID of source's head") + .default_value("head") + .required(true) + .display_order(0) + ) + .arg( + Arg::new("source-url") + .long("source-url") + .value_name("URL") + .action(ArgAction::Set) + .help("URL to a synced beacon-API provider") + .required(true) + .display_order(0) + ) + .arg( + Arg::new("target-url") + .long("target-url") + .value_name("URL") + .action(ArgAction::Set) + .help("URL to an unsynced beacon-API provider") + .required(true) + .display_order(0) + ) + .arg( + Arg::new("testnet-dir") + .short('d') + .long("testnet-dir") + .value_name("PATH") + .action(ArgAction::Set) + .global(true) + .help("The testnet dir.") + .display_order(0) + ) + .arg( + Arg::new("network") + .long("network") + .value_name("NAME") + .action(ArgAction::Set) + .global(true) + .help("The network to use. Defaults to mainnet.") + .conflicts_with("testnet-dir") + .display_order(0) + ) + .arg( + Arg::new("known-common-ancestor") + .long("known-common-ancestor") + .value_name("BLOCK_ID") + .action(ArgAction::Set) + .help("Block ID of common ancestor, if known.") + .display_order(0) + ) + .arg( + Arg::new("block-cache-dir") + .long("block-cache-dir") + .value_name("PATH") + .action(ArgAction::Set) + .help("Directory to keep a cache of the downloaded SSZ blocks.") + .display_order(0) + ) + ) .get_matches(); let result = matches @@ -656,6 +725,11 @@ fn run(env_builder: EnvironmentBuilder, matches: &ArgMatches) -> } Some(("mock-el", matches)) => mock_el::run::(env, matches) .map_err(|e| format!("Failed to run mock-el command: {}", e)), + Some(("http-sync", matches)) => { + let network_config = get_network_config()?; + http_sync::run::(env, network_config, matches) + .map_err(|e| format!("Failed to run http-sync command: {}", e)) + } Some((other, _)) => Err(format!("Unknown subcommand {}. See --help.", other)), _ => Err("No subcommand provided. See --help.".to_string()), }