From da095d06e14c53b4bbfa24c0db0c1c7b61e17579 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 22 Mar 2024 17:08:50 +1100 Subject: [PATCH 1/8] Rescue CLI --- lcli/src/main.rs | 44 ++++++++++++++++++++++ lcli/src/rescue.rs | 94 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 lcli/src/rescue.rs diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 17fafe6ec1e..e337407fb4e 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -15,6 +15,7 @@ mod mock_el; mod new_testnet; mod parse_ssz; mod replace_state_pubkeys; +mod rescue; mod skip_slots; mod state_root; mod transition_blocks; @@ -947,6 +948,44 @@ fn main() { until Cancun is triggered on mainnet.") ) ) + .subcommand( + SubCommand::with_name("rescue") + .about("Manual sync") + .arg( + Arg::with_name("source-url") + .long("source-url") + .value_name("URL") + .takes_value(true) + .help("URL to a synced beacon-API provider") + .required(true) + ) + .arg( + Arg::with_name("target-url") + .long("target-url") + .value_name("URL") + .takes_value(true) + .help("URL to an unsynced beacon-API provider") + .required(true) + ) + .arg( + Arg::with_name("testnet-dir") + .short("d") + .long("testnet-dir") + .value_name("PATH") + .takes_value(true) + .global(true) + .help("The testnet dir."), + ) + .arg( + Arg::with_name("network") + .long("network") + .value_name("NAME") + .takes_value(true) + .global(true) + .help("The network to use. Defaults to mainnet.") + .conflicts_with("testnet-dir") + ) + ) .get_matches(); let result = matches @@ -1090,6 +1129,11 @@ fn run( } ("mock-el", Some(matches)) => mock_el::run::(env, matches) .map_err(|e| format!("Failed to run mock-el command: {}", e)), + ("rescue", Some(matches)) => { + let network_config = get_network_config()?; + rescue::run::(env, network_config, matches) + .map_err(|e| format!("Failed to run rescue command: {}", e)) + } (other, _) => Err(format!("Unknown subcommand {}. See --help.", other)), } } diff --git a/lcli/src/rescue.rs b/lcli/src/rescue.rs new file mode 100644 index 00000000000..56976a594e8 --- /dev/null +++ b/lcli/src/rescue.rs @@ -0,0 +1,94 @@ +use clap::ArgMatches; +use clap_utils::parse_required; +use environment::Environment; +use eth2::{ + types::{BlockId, PublishBlockRequest, SignedBlockContents}, + BeaconNodeHttpClient, SensitiveUrl, Timeouts, +}; +use eth2_network_config::Eth2NetworkConfig; +use std::sync::Arc; +use std::time::Duration; +use types::EthSpec; + +const HTTP_TIMEOUT: Duration = Duration::from_secs(180); + +pub fn run( + env: Environment, + network_config: Eth2NetworkConfig, + matches: &ArgMatches<'_>, +) -> Result<(), String> { + let executor = env.core_context().executor; + executor + .handle() + .ok_or("shutdown in progress")? + .block_on(async move { run_async::(network_config, matches).await }) + .unwrap(); + Ok(()) +} +pub async fn run_async( + network_config: Eth2NetworkConfig, + matches: &ArgMatches<'_>, +) -> Result<(), String> { + let spec = &network_config.chain_spec::()?; + let source_url: SensitiveUrl = parse_required(matches, "source-url")?; + let target_url: SensitiveUrl = parse_required(matches, "target-url")?; + + let source = BeaconNodeHttpClient::new(source_url, Timeouts::set_all(HTTP_TIMEOUT)); + let target = BeaconNodeHttpClient::new(target_url, Timeouts::set_all(HTTP_TIMEOUT)); + + // 1. Download blocks back from head, looking for common ancestor. + let mut blocks = vec![]; + let mut next_block_id = BlockId::Head; + loop { + println!("downloading {next_block_id:?}"); + let block_from_source = source + .get_beacon_blocks_ssz::(next_block_id, spec) + .await + .unwrap() + .unwrap(); + let blobs_from_source = source + .get_blobs::(next_block_id, None) + .await + .unwrap() + .unwrap() + .data; + + next_block_id = BlockId::Root(block_from_source.parent_root()); + + let (kzg_proofs, blobs): (Vec<_>, Vec<_>) = blobs_from_source + .iter() + .cloned() + .map(|sidecar| (sidecar.kzg_proof, sidecar.blob.clone())) + .unzip(); + + let slot = block_from_source.slot(); + let block_contents = SignedBlockContents { + signed_block: Arc::new(block_from_source), + kzg_proofs: kzg_proofs.into(), + blobs: blobs.into(), + }; + let publish_block_req = PublishBlockRequest::BlockContents(block_contents); + blocks.push((slot, publish_block_req)); + + let block_exists_in_target = target + .get_beacon_blocks_ssz::(next_block_id, spec) + .await + .unwrap() + .is_some(); + if block_exists_in_target { + println!("common ancestor found: {next_block_id:?}"); + break; + } + } + + // 2. Apply blocks to target. + for (slot, block) in blocks.iter().rev() { + println!("posting block at slot {slot}"); + target.post_beacon_blocks(block).await.unwrap(); + println!("success"); + } + + println!("SYNCED!!!!"); + + Ok(()) +} From e98c87533a01888c0b3b44e19e7d47dd8b84f06a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 22 Mar 2024 19:55:35 +1100 Subject: [PATCH 2/8] Allow tweaking start block --- lcli/src/main.rs | 9 +++++++++ lcli/src/rescue.rs | 5 +++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lcli/src/main.rs b/lcli/src/main.rs index e337407fb4e..87ebc0e0e66 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -951,6 +951,15 @@ fn main() { .subcommand( SubCommand::with_name("rescue") .about("Manual sync") + .arg( + Arg::with_name("start-block") + .long("start-block") + .value_name("BLOCK_ID") + .takes_value(true) + .help("Block ID of source's head") + .default_value("head") + .required(true) + ) .arg( Arg::with_name("source-url") .long("source-url") diff --git a/lcli/src/rescue.rs b/lcli/src/rescue.rs index 56976a594e8..974e8542889 100644 --- a/lcli/src/rescue.rs +++ b/lcli/src/rescue.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::time::Duration; use types::EthSpec; -const HTTP_TIMEOUT: Duration = Duration::from_secs(180); +const HTTP_TIMEOUT: Duration = Duration::from_secs(3600); pub fn run( env: Environment, @@ -32,13 +32,14 @@ pub async fn run_async( let spec = &network_config.chain_spec::()?; let source_url: SensitiveUrl = parse_required(matches, "source-url")?; let target_url: SensitiveUrl = parse_required(matches, "target-url")?; + let start_block: BlockId = parse_required(matches, "start-block")?; let source = BeaconNodeHttpClient::new(source_url, Timeouts::set_all(HTTP_TIMEOUT)); let target = BeaconNodeHttpClient::new(target_url, Timeouts::set_all(HTTP_TIMEOUT)); // 1. Download blocks back from head, looking for common ancestor. let mut blocks = vec![]; - let mut next_block_id = BlockId::Head; + let mut next_block_id = start_block; loop { println!("downloading {next_block_id:?}"); let block_from_source = source From 1b9c1c7962d0eeb25c78bdc54c181bb8c8beae2f Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 25 Mar 2024 10:42:03 +1100 Subject: [PATCH 3/8] More caching --- lcli/src/rescue.rs | 85 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/lcli/src/rescue.rs b/lcli/src/rescue.rs index 974e8542889..ae9d4755ee8 100644 --- a/lcli/src/rescue.rs +++ b/lcli/src/rescue.rs @@ -2,10 +2,14 @@ use clap::ArgMatches; use clap_utils::parse_required; use environment::Environment; use eth2::{ - types::{BlockId, PublishBlockRequest, SignedBlockContents}, + types::{BlockId, ChainSpec, ForkName, PublishBlockRequest, SignedBlockContents}, BeaconNodeHttpClient, SensitiveUrl, Timeouts, }; use eth2_network_config::Eth2NetworkConfig; +use ssz::Encode; +use std::fs::File; +use std::io::{Read, Write}; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use types::EthSpec; @@ -25,6 +29,7 @@ pub fn run( .unwrap(); Ok(()) } + pub async fn run_async( network_config: Eth2NetworkConfig, matches: &ArgMatches<'_>, @@ -42,54 +47,82 @@ pub async fn run_async( let mut next_block_id = start_block; loop { println!("downloading {next_block_id:?}"); - let block_from_source = source + + let publish_block_req = get_block_from_source::(&source, next_block_id, spec).await; + let block = publish_block_req.signed_block(); + + next_block_id = BlockId::Root(block.parent_root()); + blocks.push((block.slot(), publish_block_req)); + + let block_exists_in_target = target .get_beacon_blocks_ssz::(next_block_id, spec) .await .unwrap() + .is_some(); + if block_exists_in_target { + println!("common ancestor found: {next_block_id:?}"); + break; + } + } + + // 2. Apply blocks to target. + for (slot, block) in blocks.iter().rev() { + println!("posting block at slot {slot}"); + if let Err(e) = target.post_beacon_blocks(block).await { + println!("error posting {slot}: {e:?}"); + } else { + println!("success"); + } + } + + println!("SYNCED!!!!"); + + Ok(()) +} + +async fn get_block_from_source( + source: &BeaconNodeHttpClient, + block_id: BlockId, + spec: &ChainSpec, +) -> PublishBlockRequest { + let mut cache_path = PathBuf::from(format!("./cache/block_{block_id}")); + + if cache_path.exists() { + let mut f = File::open(&cache_path).unwrap(); + let mut bytes = vec![]; + f.read_to_end(&mut bytes).unwrap(); + PublishBlockRequest::from_ssz_bytes(&bytes, ForkName::Deneb).unwrap() + } else { + let block_from_source = source + .get_beacon_blocks_ssz::(block_id, spec) + .await + .unwrap() .unwrap(); let blobs_from_source = source - .get_blobs::(next_block_id, None) + .get_blobs::(block_id, None) .await .unwrap() .unwrap() .data; - next_block_id = BlockId::Root(block_from_source.parent_root()); - let (kzg_proofs, blobs): (Vec<_>, Vec<_>) = blobs_from_source .iter() .cloned() .map(|sidecar| (sidecar.kzg_proof, sidecar.blob.clone())) .unzip(); - let slot = block_from_source.slot(); + let block_root = block_from_source.canonical_root(); let block_contents = SignedBlockContents { signed_block: Arc::new(block_from_source), kzg_proofs: kzg_proofs.into(), blobs: blobs.into(), }; let publish_block_req = PublishBlockRequest::BlockContents(block_contents); - blocks.push((slot, publish_block_req)); - let block_exists_in_target = target - .get_beacon_blocks_ssz::(next_block_id, spec) - .await - .unwrap() - .is_some(); - if block_exists_in_target { - println!("common ancestor found: {next_block_id:?}"); - break; - } - } + cache_path = PathBuf::from(format!("./cache/block_{block_root:?}")); + let mut f = File::create(&cache_path).unwrap(); + f.write_all(&publish_block_req.as_ssz_bytes()).unwrap(); - // 2. Apply blocks to target. - for (slot, block) in blocks.iter().rev() { - println!("posting block at slot {slot}"); - target.post_beacon_blocks(block).await.unwrap(); - println!("success"); + publish_block_req } - - println!("SYNCED!!!!"); - - Ok(()) } From 9cc7116953627324a5beedca1e37138ca99b1e32 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 Jul 2024 13:30:01 +1000 Subject: [PATCH 4/8] =?UTF-8?q?Add=20`--known=E2=80=93common-ancestor`=20f?= =?UTF-8?q?lag=20to=20optimise=20for=20download=20speed.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lcli/src/main.rs | 8 ++++++++ lcli/src/rescue.rs | 22 +++++++++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 808a87df469..7ca3d2e91b2 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -604,6 +604,14 @@ fn main() { .conflicts_with("testnet-dir") .display_order(0) ) + .arg( + Arg::new("known-common-ancestor") + .long("known-common-ancestor") + .value_name("BLOCK_ID") + .action(ArgAction::Set) + .help("Block ID of common ancestor, if known.") + .display_order(0) + ) ) .get_matches(); diff --git a/lcli/src/rescue.rs b/lcli/src/rescue.rs index b9feac5300a..51aabfbfac7 100644 --- a/lcli/src/rescue.rs +++ b/lcli/src/rescue.rs @@ -1,9 +1,9 @@ use clap::ArgMatches; -use clap_utils::parse_required; +use clap_utils::{parse_optional, parse_required}; use environment::Environment; use eth2::{ types::{BlockId, ChainSpec, ForkName, PublishBlockRequest, SignedBlockContents}, - BeaconNodeHttpClient, SensitiveUrl, Timeouts, + BeaconNodeHttpClient, Error, SensitiveUrl, Timeouts, }; use eth2_network_config::Eth2NetworkConfig; use ssz::Encode; @@ -26,8 +26,6 @@ pub fn run( .handle() .ok_or("shutdown in progress")? .block_on(async move { run_async::(network_config, matches).await }) - .unwrap(); - Ok(()) } pub async fn run_async( @@ -38,6 +36,8 @@ pub async fn run_async( let source_url: SensitiveUrl = parse_required(matches, "source-url")?; let target_url: SensitiveUrl = parse_required(matches, "target-url")?; let start_block: BlockId = parse_required(matches, "start-block")?; + let maybe_common_ancestor_block: Option = + parse_optional(matches, "known–common-ancestor")?; let source = BeaconNodeHttpClient::new(source_url, Timeouts::set_all(HTTP_TIMEOUT)); let target = BeaconNodeHttpClient::new(target_url, Timeouts::set_all(HTTP_TIMEOUT)); @@ -54,6 +54,13 @@ pub async fn run_async( next_block_id = BlockId::Root(block.parent_root()); blocks.push((block.slot(), publish_block_req)); + if let Some(ref common_ancestor_block) = maybe_common_ancestor_block { + if common_ancestor_block == &next_block_id { + println!("reached known common ancestor: {next_block_id:?}"); + break; + } + } + let block_exists_in_target = target .get_beacon_blocks_ssz::(next_block_id, spec) .await @@ -69,7 +76,12 @@ pub async fn run_async( for (slot, block) in blocks.iter().rev() { println!("posting block at slot {slot}"); if let Err(e) = target.post_beacon_blocks(block).await { - println!("error posting {slot}: {e:?}"); + if let Error::ServerMessage(ref e) = e { + if e.code == 202 { + continue; + } + } + return Err(format!("error posting {slot}: {e:?}")); } else { println!("success"); } From ea13ab008a88f5be44cac538ab4516fe0388c9ca Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 Jul 2024 13:46:24 +1000 Subject: [PATCH 5/8] Rename rescue command to `http-sync` --- lcli/src/{rescue.rs => http_sync.rs} | 0 lcli/src/main.rs | 10 +++++----- 2 files changed, 5 insertions(+), 5 deletions(-) rename lcli/src/{rescue.rs => http_sync.rs} (100%) diff --git a/lcli/src/rescue.rs b/lcli/src/http_sync.rs similarity index 100% rename from lcli/src/rescue.rs rename to lcli/src/http_sync.rs diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 7ca3d2e91b2..e49eed83c62 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -1,11 +1,11 @@ mod block_root; mod check_deposit_data; mod generate_bootnode_enr; +mod http_sync; mod indexed_attestations; mod mnemonic_validators; mod mock_el; mod parse_ssz; -mod rescue; mod skip_slots; mod state_root; mod transition_blocks; @@ -554,7 +554,7 @@ fn main() { ) ) .subcommand( - Command::new("rescue") + Command::new("http-sync") .about("Manual sync") .arg( Arg::new("start-block") @@ -717,10 +717,10 @@ fn run(env_builder: EnvironmentBuilder, matches: &ArgMatches) -> } Some(("mock-el", matches)) => mock_el::run::(env, matches) .map_err(|e| format!("Failed to run mock-el command: {}", e)), - Some(("rescue", matches)) => { + Some(("http-sync", matches)) => { let network_config = get_network_config()?; - rescue::run::(env, network_config, matches) - .map_err(|e| format!("Failed to run rescue command: {}", e)) + http_sync::run::(env, network_config, matches) + .map_err(|e| format!("Failed to run http-sync command: {}", e)) } Some((other, _)) => Err(format!("Unknown subcommand {}. See --help.", other)), _ => Err("No subcommand provided. See --help.".to_string()), From 09fd650362185ba7fb260ee17a56603139266d7e Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 Jul 2024 14:09:09 +1000 Subject: [PATCH 6/8] Add logging --- lcli/src/http_sync.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lcli/src/http_sync.rs b/lcli/src/http_sync.rs index 51aabfbfac7..7b539a33e8a 100644 --- a/lcli/src/http_sync.rs +++ b/lcli/src/http_sync.rs @@ -78,6 +78,7 @@ pub async fn run_async( if let Err(e) = target.post_beacon_blocks(block).await { if let Error::ServerMessage(ref e) = e { if e.code == 202 { + println!("duplicate block detected while posting block at slot {slot}"); continue; } } From 5abd3c483ef047a07036fbeadcc0453266b12a94 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 Jul 2024 15:41:01 +1000 Subject: [PATCH 7/8] Add optional `--block-cache-dir` cli arg and create directory if it doesn't already exist. --- lcli/src/http_sync.rs | 17 ++++++++++++++--- lcli/src/main.rs | 8 ++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/lcli/src/http_sync.rs b/lcli/src/http_sync.rs index 7b539a33e8a..c42b10e8419 100644 --- a/lcli/src/http_sync.rs +++ b/lcli/src/http_sync.rs @@ -7,6 +7,7 @@ use eth2::{ }; use eth2_network_config::Eth2NetworkConfig; use ssz::Encode; +use std::fs; use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; @@ -15,6 +16,7 @@ use std::time::Duration; use types::EthSpec; const HTTP_TIMEOUT: Duration = Duration::from_secs(3600); +const DEFAULT_CACHE_DIR: &str = "./cache"; pub fn run( env: Environment, @@ -38,17 +40,25 @@ pub async fn run_async( let start_block: BlockId = parse_required(matches, "start-block")?; let maybe_common_ancestor_block: Option = parse_optional(matches, "known–common-ancestor")?; + let cache_dir_path: PathBuf = + parse_optional(matches, "block-cache-dir")?.unwrap_or(DEFAULT_CACHE_DIR.into()); let source = BeaconNodeHttpClient::new(source_url, Timeouts::set_all(HTTP_TIMEOUT)); let target = BeaconNodeHttpClient::new(target_url, Timeouts::set_all(HTTP_TIMEOUT)); + if !cache_dir_path.exists() { + fs::create_dir_all(&cache_dir_path) + .map_err(|e| format!("Unable to create block cache dir: {:?}", e))?; + } + // 1. Download blocks back from head, looking for common ancestor. let mut blocks = vec![]; let mut next_block_id = start_block; loop { println!("downloading {next_block_id:?}"); - let publish_block_req = get_block_from_source::(&source, next_block_id, spec).await; + let publish_block_req = + get_block_from_source::(&source, next_block_id, spec, &cache_dir_path).await; let block = publish_block_req.signed_block(); next_block_id = BlockId::Root(block.parent_root()); @@ -97,8 +107,9 @@ async fn get_block_from_source( source: &BeaconNodeHttpClient, block_id: BlockId, spec: &ChainSpec, + cache_dir_path: &PathBuf, ) -> PublishBlockRequest { - let mut cache_path = PathBuf::from(format!("./cache/block_{block_id}")); + let mut cache_path = cache_dir_path.join(format!("block_{block_id}")); if cache_path.exists() { let mut f = File::open(&cache_path).unwrap(); @@ -132,7 +143,7 @@ async fn get_block_from_source( }; let publish_block_req = PublishBlockRequest::BlockContents(block_contents); - cache_path = PathBuf::from(format!("./cache/block_{block_root:?}")); + cache_path = cache_dir_path.join(format!("block_{block_root:?}")); let mut f = File::create(&cache_path).unwrap(); f.write_all(&publish_block_req.as_ssz_bytes()).unwrap(); diff --git a/lcli/src/main.rs b/lcli/src/main.rs index e49eed83c62..380aeb6aceb 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -612,6 +612,14 @@ fn main() { .help("Block ID of common ancestor, if known.") .display_order(0) ) + .arg( + Arg::new("block-cache-dir") + .long("block-cache-dir") + .value_name("PATH") + .action(ArgAction::Set) + .help("Directory to keep a cache of the downloaded SSZ blocks.") + .display_order(0) + ) ) .get_matches(); From 7587d5afa9ea33421ee60668af942e9103582cb4 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 10 Jul 2024 20:03:05 +1000 Subject: [PATCH 8/8] Lint fix. --- lcli/src/http_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lcli/src/http_sync.rs b/lcli/src/http_sync.rs index c42b10e8419..1ef40e63978 100644 --- a/lcli/src/http_sync.rs +++ b/lcli/src/http_sync.rs @@ -10,7 +10,7 @@ use ssz::Encode; use std::fs; use std::fs::File; use std::io::{Read, Write}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use types::EthSpec; @@ -107,7 +107,7 @@ async fn get_block_from_source( source: &BeaconNodeHttpClient, block_id: BlockId, spec: &ChainSpec, - cache_dir_path: &PathBuf, + cache_dir_path: &Path, ) -> PublishBlockRequest { let mut cache_path = cache_dir_path.join(format!("block_{block_id}"));