diff --git a/Cargo.lock b/Cargo.lock index 3bcb5680..6c6794d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -647,6 +647,32 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "types", +] + +[[package]] +name = "anchor_validator_store" +version = "0.1.0" +dependencies = [ + "beacon_node_fallback", + "dashmap", + "eth2", + "futures", + "parking_lot", + "qbft", + "qbft_manager", + "safe_arith", + "signature_collector", + "slashing_protection", + "slot_clock", + "ssv_types", + "task_executor", + "tokio", + "tracing", + "types", + "validator_metrics", + "validator_services", + "validator_store", ] [[package]] @@ -1163,6 +1189,24 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "beacon_node_fallback" +version = "0.1.0" +source = "git+https://github.com/sigp/lighthouse?branch=anchor#997991f5513f22bb240816c2e2400cf6a1819a0c" +dependencies = [ + "eth2", + "futures", + "itertools 0.10.5", + "serde", + "slot_clock", + "strum 0.24.1", + "task_executor", + "tokio", + "tracing", + "types", + "validator_metrics", +] + [[package]] name = "bimap" version = "0.6.3" @@ -1496,8 +1540,12 @@ dependencies = [ name = "client" version = "0.1.0" dependencies = [ + "anchor_validator_store", + "beacon_node_fallback", "clap", "dirs 5.0.1", + "eth2", + "eth2_config", "ethereum_hashing", "fdlimit", "http_api", @@ -1506,13 +1554,21 @@ dependencies = [ "network", "parking_lot", "processor", + "qbft_manager", "sensitive_url", "serde", + "signature_collector", + "slashing_protection", + "slot_clock", + "ssv_types", "strum 0.24.1", "task_executor", "tokio", "tracing", + "types", "unused_port", + "validator_metrics", + "validator_services", "version", ] @@ -2946,6 +3002,17 @@ dependencies = [ "web-time", ] +[[package]] +name = "graffiti_file" +version = "0.1.0" +source = "git+https://github.com/sigp/lighthouse?branch=anchor#997991f5513f22bb240816c2e2400cf6a1819a0c" +dependencies = [ + "bls", + "serde", + "slog", + "types", +] + [[package]] name = "group" version = "0.13.0" @@ -7778,6 +7845,38 @@ dependencies = [ "metrics", ] +[[package]] +name = "validator_services" +version = "0.1.0" +source = "git+https://github.com/sigp/lighthouse?branch=anchor#997991f5513f22bb240816c2e2400cf6a1819a0c" +dependencies = [ + "beacon_node_fallback", + "bls", + "eth2", + "futures", + "graffiti_file", + "logging", + "parking_lot", + "safe_arith", + "slot_clock", + "task_executor", + "tokio", + "tracing", + "tree_hash", + "types", + "validator_metrics", + "validator_store", +] + +[[package]] +name = "validator_store" +version = "0.1.0" +source = "git+https://github.com/sigp/lighthouse?branch=anchor#997991f5513f22bb240816c2e2400cf6a1819a0c" +dependencies = [ + "slashing_protection", + "types", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index ba41b737..89e8c656 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "anchor/processor", "anchor/qbft_manager", "anchor/signature_collector", + "anchor/validator_store", ] resolver = "2" @@ -22,6 +23,7 @@ edition = "2021" # This table has three subsections: first the internal dependencies, then the lighthouse dependencies, then all other. 
[workspace.dependencies] +anchor_validator_store = { path = "anchor/validator_store" } client = { path = "anchor/client" } database = { path = "anchor/database" } eth = { path = "anchor/eth" } @@ -30,14 +32,20 @@ http_metrics = { path = "anchor/http_metrics" } network = { path = "anchor/network" } processor = { path = "anchor/processor" } qbft = { path = "anchor/common/qbft" } -qbft_manager = { path = "anchor/common/qbft" } +qbft_manager = { path = "anchor/qbft_manager" } +signature_collector = { path = "anchor/signature_collector" } ssv_types = { path = "anchor/common/ssv_types" } version = { path = "anchor/common/version" } +beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +eth2 = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +eth2_config = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } health_metrics = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } lighthouse_network = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } metrics = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +safe_arith = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } sensitive_url = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +slashing_protection = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } slot_clock = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } task_executor = { git = "https://github.com/sigp/lighthouse", branch = "anchor", default-features = false, features = [ "tracing", @@ -45,6 +53,8 @@ task_executor = { git = "https://github.com/sigp/lighthouse", branch = "anchor", types = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } unused_port = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } validator_metrics = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +validator_services = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +validator_store = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } alloy = { version = "0.6.4", features = [ "sol-types", diff --git a/anchor/Cargo.toml b/anchor/Cargo.toml index 7cbea889..4189887e 100644 --- a/anchor/Cargo.toml +++ b/anchor/Cargo.toml @@ -17,6 +17,7 @@ task_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +types = { workspace = true } [dev-dependencies] regex = "1.10.6" diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml index dfb3c40d..84562eba 100644 --- a/anchor/client/Cargo.toml +++ b/anchor/client/Cargo.toml @@ -9,8 +9,12 @@ name = "client" path = "src/lib.rs" [dependencies] +anchor_validator_store = { workspace = true } +beacon_node_fallback = { workspace = true } clap = { workspace = true } dirs = { workspace = true } +eth2 = { workspace = true } +eth2_config = { workspace = true } ethereum_hashing = "0.7.0" fdlimit = "0.3" http_api = { workspace = true } @@ -19,11 +23,19 @@ hyper = { workspace = true } network = { workspace = true } parking_lot = { workspace = true } processor = { workspace = true } +qbft_manager = { workspace = true } sensitive_url = { workspace = true } serde = { workspace = true } +signature_collector = { workspace = true } +slashing_protection = { workspace = true } +slot_clock = { workspace = true } +ssv_types = { workspace = true } strum 
= { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +types = { workspace = true } unused_port = { workspace = true } +validator_metrics = { workspace = true } +validator_services = { workspace = true } version = { workspace = true } diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 8f8a12a1..235a5cce 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -30,10 +30,14 @@ pub struct Config { /// /// Should be similar to `["http://localhost:8080"]` pub beacon_nodes: Vec, + /// An optional beacon node used for block proposals only. + pub proposer_nodes: Vec, /// The http endpoints of the execution node APIs. pub execution_nodes: Vec, /// beacon node is not synced at startup. pub allow_unsynced_beacon_node: bool, + /// If true, use longer timeouts for requests made to the beacon node. + pub use_long_timeouts: bool, /// Configuration for the HTTP REST API. pub http_api: http_api::Config, /// Configuration for the network stack. @@ -69,8 +73,10 @@ impl Default for Config { data_dir, secrets_dir, beacon_nodes, + proposer_nodes: vec![], execution_nodes, allow_unsynced_beacon_node: false, + use_long_timeouts: false, http_api: <_>::default(), http_metrics: <_>::default(), network: <_>::default(), diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index c4b67402..2be725f4 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -3,21 +3,69 @@ mod cli; pub mod config; +use anchor_validator_store::sync_committee_service::SyncCommitteeService; +use anchor_validator_store::AnchorValidatorStore; +use beacon_node_fallback::{ + start_fallback_updater_service, ApiTopic, BeaconNodeFallback, CandidateBeaconNode, +}; pub use cli::Anchor; use config::Config; +use eth2::reqwest::{Certificate, ClientBuilder}; +use eth2::{BeaconNodeHttpClient, Timeouts}; +use eth2_config::Eth2Config; use network::Network; use parking_lot::RwLock; +use qbft_manager::QbftManager; +use sensitive_url::SensitiveUrl; +use signature_collector::SignatureCollectorManager; +use slashing_protection::SlashingDatabase; +use slot_clock::{SlotClock, SystemTimeSlotClock}; +use ssv_types::OperatorId; +use std::fs::File; +use std::io::Read; use std::net::SocketAddr; +use std::path::Path; use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::net::TcpListener; -use tracing::{debug, error, info}; +use tokio::sync::mpsc; +use tokio::time::sleep; +use tracing::{debug, error, info, warn}; +use types::{EthSpec, Hash256}; +use validator_metrics::set_gauge; +use validator_services::attestation_service::AttestationServiceBuilder; +use validator_services::block_service::BlockServiceBuilder; +use validator_services::duties_service; +use validator_services::duties_service::DutiesServiceBuilder; +use validator_services::preparation_service::PreparationServiceBuilder; + +/// The filename within the `validators` directory that contains the slashing protection DB. +const SLASHING_PROTECTION_FILENAME: &str = "slashing_protection.sqlite"; + +/// The time between polls when waiting for genesis. +const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12); + +/// Specific timeout constants for HTTP requests involved in different validator duties. +/// This can help ensure that proper endpoint fallback occurs. 
+const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
+const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
+const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
+const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
+const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;

 pub struct Client {}

 impl Client {
     /// Runs the Anchor Client
-    pub async fn run(executor: TaskExecutor, config: Config) -> Result<(), String> {
+    pub async fn run<E: EthSpec>(executor: TaskExecutor, config: Config) -> Result<(), String> {
         // Attempt to raise soft fd limit. The behavior is OS specific:
         // `linux` - raise soft fd limit to hard
         // `macos` - raise soft fd limit to `min(kernel limit, hard fd limit)`
@@ -42,11 +90,11 @@ impl Client {
             "Starting the Anchor client"
         );

-        // Start the processor
-        let _processor_senders = processor::spawn(config.processor, executor.clone());
+        // TODO make configurable
+        let spec = Eth2Config::mainnet().spec;

         // Optionally start the metrics server.
-        let _http_metrics_shared_state = if config.http_metrics.enabled {
+        let http_metrics_shared_state = if config.http_metrics.enabled {
             let shared_state = Arc::new(RwLock::new(http_metrics::Shared { genesis_time: None }));
             let exit = executor.exit();
@@ -77,6 +125,447 @@ impl Client {
         // Spawn the network listening task
         executor.spawn(network.run(), "network");

+        // Initialize slashing protection.
+        let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME);
+        let slashing_protection =
+            SlashingDatabase::open_or_create(&slashing_db_path).map_err(|e| {
+                format!(
+                    "Failed to open or create slashing protection database: {:?}",
+                    e
+                )
+            })?;
+
+        let last_beacon_node_index = config
+            .beacon_nodes
+            .len()
+            .checked_sub(1)
+            .ok_or_else(|| "No beacon nodes defined.".to_string())?;
+
+        let beacon_node_setup = |x: (usize, &SensitiveUrl)| {
+            let i = x.0;
+            let url = x.1;
+            let slot_duration = Duration::from_secs(spec.seconds_per_slot);
+
+            let mut beacon_node_http_client_builder = ClientBuilder::new();
+
+            // Add new custom root certificates if specified.
+            if let Some(certificates) = &config.beacon_nodes_tls_certs {
+                for cert in certificates {
+                    beacon_node_http_client_builder = beacon_node_http_client_builder
+                        .add_root_certificate(load_pem_certificate(cert)?);
+                }
+            }
+
+            let beacon_node_http_client = beacon_node_http_client_builder
+                // Set default timeout to be the full slot duration.
+                .timeout(slot_duration)
+                .build()
+                .map_err(|e| format!("Unable to build HTTP client: {:?}", e))?;
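The quotient constants above divide the slot duration into per-duty HTTP timeouts, so a slow endpoint fails fast enough for the next candidate in the fallback list to be tried. A minimal sketch of the resulting budgets, assuming mainnet's 12-second slots:

```rust
use std::time::Duration;

fn main() {
    let slot_duration = Duration::from_secs(12); // mainnet `seconds_per_slot`
    // Mirrors `slot_duration / HTTP_..._TIMEOUT_QUOTIENT` in the code below.
    assert_eq!(slot_duration / 4, Duration::from_secs(3)); // attestations, duties, ...
    assert_eq!(slot_duration / 2, Duration::from_secs(6)); // block proposals
    assert_eq!(slot_duration / 24, Duration::from_millis(500)); // subnet subscriptions
}
```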
+            // Use quicker timeouts if a fallback beacon node exists.
+            let timeouts = if i < last_beacon_node_index && !config.use_long_timeouts {
+                info!("Fallback endpoints are available, using optimized timeouts.");
+                Timeouts {
+                    attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
+                    attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
+                    attestation_subscriptions: slot_duration
+                        / HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
+                    liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
+                    proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
+                    proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
+                    sync_committee_contribution: slot_duration
+                        / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
+                    sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
+                    get_beacon_blocks_ssz: slot_duration
+                        / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
+                    get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
+                    get_deposit_snapshot: slot_duration / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
+                    get_validator_block: slot_duration / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
+                }
+            } else {
+                Timeouts::set_all(slot_duration)
+            };
+
+            Ok(BeaconNodeHttpClient::from_components(
+                url.clone(),
+                beacon_node_http_client,
+                timeouts,
+            ))
+        };
+
+        let beacon_nodes: Vec<BeaconNodeHttpClient> = config
+            .beacon_nodes
+            .iter()
+            .enumerate()
+            .map(beacon_node_setup)
+            .collect::<Result<Vec<_>, String>>()?;
+
+        let proposer_nodes: Vec<BeaconNodeHttpClient> = config
+            .proposer_nodes
+            .iter()
+            .enumerate()
+            .map(beacon_node_setup)
+            .collect::<Result<Vec<_>, String>>()?;
+
+        let num_nodes = beacon_nodes.len();
+        // User order of `beacon_nodes` is preserved, so `index` corresponds to the position of
+        // the node in `--beacon_nodes`.
+        let candidates = beacon_nodes
+            .into_iter()
+            .enumerate()
+            .map(|(index, node)| CandidateBeaconNode::new(node, index))
+            .collect();
+
+        // User order of `proposer_nodes` is preserved, so `index` corresponds to the position of
+        // the node in `--proposer_nodes`.
+        let proposer_candidates = proposer_nodes
+            .into_iter()
+            .enumerate()
+            .map(|(index, node)| CandidateBeaconNode::new(node, index))
+            .collect();
+
+        // Set the count for beacon node fallbacks excluding the primary beacon node.
+        set_gauge(
+            &validator_metrics::ETH2_FALLBACK_CONFIGURED,
+            num_nodes.saturating_sub(1) as i64,
+        );
+        // Set the total beacon node count.
+        set_gauge(
+            &validator_metrics::TOTAL_BEACON_NODES_COUNT,
+            num_nodes as i64,
+        );
+
+        // Initialize the number of connected, synced beacon nodes to 0.
+        set_gauge(&validator_metrics::ETH2_FALLBACK_CONNECTED, 0);
+        set_gauge(&validator_metrics::SYNCED_BEACON_NODES_COUNT, 0);
+        // Initialize the number of connected, available beacon nodes to 0.
+        set_gauge(&validator_metrics::AVAILABLE_BEACON_NODES_COUNT, 0);
+
+        let mut beacon_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
+            candidates,
+            beacon_node_fallback::Config::default(), // TODO make configurable
+            vec![ApiTopic::Subscriptions],           // TODO make configurable
+            spec.clone(),
+        );
+
+        let mut proposer_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
+            proposer_candidates,
+            beacon_node_fallback::Config::default(), // TODO make configurable
+            vec![ApiTopic::Subscriptions],           // TODO make configurable
+            spec.clone(),
+        );
+
+        // Perform some potentially long-running initialization tasks.
+        let (genesis_time, genesis_validators_root) = tokio::select!
{ + tuple = init_from_beacon_node::(&beacon_nodes, &proposer_nodes) => tuple?, + () = executor.exit() => return Err("Shutting down".to_string()) + }; + + let slot_clock = SystemTimeSlotClock::new( + spec.genesis_slot, + Duration::from_secs(genesis_time), + Duration::from_secs(spec.seconds_per_slot), + ); + + beacon_nodes.set_slot_clock(slot_clock.clone()); + proposer_nodes.set_slot_clock(slot_clock.clone()); + + let beacon_nodes = Arc::new(beacon_nodes); + start_fallback_updater_service::<_, E>(executor.clone(), beacon_nodes.clone())?; + + let proposer_nodes = Arc::new(proposer_nodes); + start_fallback_updater_service::<_, E>(executor.clone(), proposer_nodes.clone())?; + + // Start the processor + let processor_senders = processor::spawn(config.processor, executor.clone()); + + // Create the processor-adjacent managers + let signature_collector = + Arc::new(SignatureCollectorManager::new(processor_senders.clone())); + let Ok(qbft_manager) = + QbftManager::new(processor_senders.clone(), OperatorId(1), slot_clock.clone()) + else { + return Err("Unable to initialize qbft manager".into()); + }; + + let validator_store = Arc::new(AnchorValidatorStore::<_, E>::new( + signature_collector, + qbft_manager, + slashing_protection, + spec.clone(), + genesis_validators_root, + OperatorId(123), + )); + + let duties_service = Arc::new( + DutiesServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .beacon_nodes(beacon_nodes.clone()) + .validator_store(validator_store.clone()) + .spec(spec.clone()) + .executor(executor.clone()) + //.enable_high_validator_count_metrics(config.enable_high_validator_count_metrics) + .distributed(true) + .build()?, + ); + + // Update the metrics server. + if let Some(ctx) = &http_metrics_shared_state { + ctx.write().genesis_time = Some(genesis_time); + //ctx.write().validator_store = Some(validator_store.clone()); + //ctx.write().duties_service = Some(duties_service.clone()); + } + + let mut block_service_builder = BlockServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_nodes(beacon_nodes.clone()) + .executor(executor.clone()) + .chain_spec(spec.clone()); + //.graffiti(config.graffiti) + //.graffiti_file(config.graffiti_file.clone()); + + // If we have proposer nodes, add them to the block service builder. + if proposer_nodes.num_total().await > 0 { + block_service_builder = block_service_builder.proposer_nodes(proposer_nodes.clone()); + } + + let block_service = block_service_builder.build()?; + + let attestation_service = AttestationServiceBuilder::new() + .duties_service(duties_service.clone()) + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_nodes(beacon_nodes.clone()) + .executor(executor.clone()) + .chain_spec(spec.clone()) + .build()?; + + let preparation_service = PreparationServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_nodes(beacon_nodes.clone()) + .executor(executor.clone()) + //.builder_registration_timestamp_override(config.builder_registration_timestamp_override) + .validator_registration_batch_size(500) + .build()?; + + let sync_committee_service = SyncCommitteeService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + executor.clone(), + ); + + // We use `SLOTS_PER_EPOCH` as the capacity of the block notification channel, because + // we don't expect notifications to be delayed by more than a single slot, let alone a + // whole epoch! 
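A bounded channel sized to one epoch of slots gives the duties service ample headroom before `send` would ever apply backpressure. A standalone sketch of the same pattern, assuming mainnet's 32 slots per epoch:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity of one epoch's worth of slot notifications, as in the code below.
    let (tx, mut rx) = mpsc::channel::<u64>(32);
    tx.send(0).await.expect("receiver alive, buffer not full");
    assert_eq!(rx.recv().await, Some(0));
}
```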
+ let channel_capacity = E::slots_per_epoch() as usize; + let (block_service_tx, block_service_rx) = mpsc::channel(channel_capacity); + + // Wait until genesis has occurred. + wait_for_genesis(&beacon_nodes, genesis_time).await?; + + duties_service::start_update_service(duties_service.clone(), block_service_tx); + + block_service + .start_update_service(block_service_rx) + .map_err(|e| format!("Unable to start block service: {}", e))?; + + attestation_service + .start_update_service(&spec) + .map_err(|e| format!("Unable to start attestation service: {}", e))?; + + sync_committee_service + .start_update_service(&spec) + .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + + preparation_service + .start_update_service(&spec) + .map_err(|e| format!("Unable to start preparation service: {}", e))?; + Ok(()) } } + +async fn init_from_beacon_node( + beacon_nodes: &BeaconNodeFallback, + proposer_nodes: &BeaconNodeFallback, +) -> Result<(u64, Hash256), String> { + const RETRY_DELAY: Duration = Duration::from_secs(2); + + loop { + beacon_nodes.update_all_candidates::().await; + proposer_nodes.update_all_candidates::().await; + + let num_available = beacon_nodes.num_available().await; + let num_total = beacon_nodes.num_total().await; + + let proposer_available = proposer_nodes.num_available().await; + let proposer_total = proposer_nodes.num_total().await; + + if proposer_total > 0 && proposer_available == 0 { + warn!( + retry_in = format!("{} seconds", RETRY_DELAY.as_secs()), + total_proposers = proposer_total, + available_proposers = proposer_available, + total_beacon_nodes = num_total, + available_beacon_nodes = num_available, + "Unable to connect to a proposer node" + ); + } + + if num_available > 0 && proposer_available == 0 { + info!( + total = num_total, + available = num_available, + "Initialized beacon node connections" + ); + break; + } else if num_available > 0 { + info!( + total = num_total, + available = num_available, + proposers_available = proposer_available, + proposers_total = proposer_total, + "Initialized beacon node connections" + ); + break; + } else { + warn!( + retry_in = format!("{} seconds", RETRY_DELAY.as_secs()), + total = num_total, + available = num_available, + "Unable to connect to a beacon node" + ); + sleep(RETRY_DELAY).await; + } + } + + let genesis = loop { + match beacon_nodes + .first_success(|node| async move { node.get_beacon_genesis().await }) + .await + { + Ok(genesis) => break genesis.data, + Err(errors) => { + // Search for a 404 error which indicates that genesis has not yet + // occurred. + if errors + .0 + .iter() + .filter_map(|(_, e)| e.request_failure()) + .any(|e| e.status() == Some(eth2::StatusCode::NOT_FOUND)) + { + info!("Waiting for genesis",); + } else { + error!( + error = ?errors.0, + "Errors polling beacon node", + ); + } + } + } + + sleep(RETRY_DELAY).await; + }; + + Ok((genesis.genesis_time, genesis.genesis_validators_root)) +} + +async fn wait_for_genesis( + beacon_nodes: &BeaconNodeFallback, + genesis_time: u64, +) -> Result<(), String> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| format!("Unable to read system time: {:?}", e))?; + let genesis_time = Duration::from_secs(genesis_time); + + // If the time now is less than (prior to) genesis, then delay until the + // genesis instant. + // + // If the validator client starts before genesis, it will get errors from + // the slot clock. 
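Because `SystemTime` arithmetic works on unsigned `Duration`s, the code checks which of `now` and `genesis_time` is larger before subtracting. A self-contained sketch of that comparison (the helper name is hypothetical):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Returns how long to wait for genesis, or `None` if genesis has already
/// passed (or the system clock is unreadable).
fn seconds_until_genesis(genesis_time: u64) -> Option<u64> {
    let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
    let genesis = Duration::from_secs(genesis_time);
    (now < genesis).then(|| (genesis - now).as_secs())
}

fn main() {
    // A genesis timestamp in the past yields `None`.
    assert_eq!(seconds_until_genesis(0), None);
}
```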
+    if now < genesis_time {
+        info!(
+            seconds_to_wait = (genesis_time - now).as_secs(),
+            "Starting node prior to genesis",
+        );
+
+        // Start polling the node for pre-genesis information, cancelling the polling as soon as the
+        // timer runs out.
+        tokio::select! {
+            result = poll_whilst_waiting_for_genesis(beacon_nodes, genesis_time) => result?,
+            () = sleep(genesis_time - now) => ()
+        };
+
+        info!(
+            ms_since_genesis = (genesis_time - now).as_millis(),
+            "Genesis has occurred",
+        );
+    } else {
+        info!(
+            seconds_ago = (now - genesis_time).as_secs(),
+            "Genesis has already occurred",
+        );
+    }
+
+    Ok(())
+}
+
+/// Request the staking status from the node, looping back and trying again on failure. Exit once
+/// the node has been contacted.
+async fn poll_whilst_waiting_for_genesis<T: SlotClock>(
+    beacon_nodes: &BeaconNodeFallback<T>,
+    genesis_time: Duration,
+) -> Result<(), String> {
+    loop {
+        match beacon_nodes
+            .first_success(|beacon_node| async move { beacon_node.get_lighthouse_staking().await })
+            .await
+        {
+            Ok(is_staking) => {
+                let now = SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .map_err(|e| format!("Unable to read system time: {:?}", e))?;
+
+                if !is_staking {
+                    error!(
+                        msg = "this will cause missed duties",
+                        info = "see the --staking CLI flag on the beacon node",
+                        "Staking is disabled for beacon node"
+                    );
+                }
+
+                if now < genesis_time {
+                    info!(
+                        bn_staking_enabled = is_staking,
+                        seconds_to_wait = (genesis_time - now).as_secs(),
+                        "Waiting for genesis"
+                    );
+                } else {
+                    break Ok(());
+                }
+            }
+            Err(e) => {
+                error!(
+                    error = ?e.0,
+                    "Error polling beacon node",
+                );
+            }
+        }
+
+        sleep(WAITING_FOR_GENESIS_POLL_TIME).await;
+    }
+}
+
+pub fn load_pem_certificate<P: AsRef<Path>>(pem_path: P) -> Result<Certificate, String> {
+    let mut buf = Vec::new();
+    File::open(&pem_path)
+        .map_err(|e| format!("Unable to open certificate path: {}", e))?
+        .read_to_end(&mut buf)
+        .map_err(|e| format!("Unable to read certificate file: {}", e))?;
+    Certificate::from_pem(&buf).map_err(|e| format!("Unable to parse certificate: {}", e))
+}
diff --git a/anchor/common/qbft/src/error.rs b/anchor/common/qbft/src/error.rs
index 72928bf9..5290e1bd 100644
--- a/anchor/common/qbft/src/error.rs
+++ b/anchor/common/qbft/src/error.rs
@@ -1,5 +1,5 @@
 /// Error associated with Config building.
-#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ConfigBuilderError { /// No participants were specified NoParticipants, diff --git a/anchor/common/ssv_types/src/message.rs b/anchor/common/ssv_types/src/message.rs index c47052b7..a60d9e6f 100644 --- a/anchor/common/ssv_types/src/message.rs +++ b/anchor/common/ssv_types/src/message.rs @@ -174,7 +174,7 @@ pub struct Contribution { pub contribution: SyncCommitteeContribution, } -#[derive(Clone, Debug, TreeHash)] +#[derive(Clone, Debug, TreeHash, PartialEq, Eq)] pub struct BeaconVote { pub block_root: Hash256, pub source: Checkpoint, diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index d95a1c8d..6f960a0a 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -35,12 +35,12 @@ pub struct CommitteeInstanceId { #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ValidatorInstanceId { pub validator: PublicKeyBytes, - pub duty: ValidatorDuty, + pub duty: ValidatorDutyKind, pub instance_height: InstanceHeight, } #[derive(Debug, Clone, Hash, PartialEq, Eq)] -pub enum ValidatorDuty { +pub enum ValidatorDutyKind { Proposal, Aggregator, SyncCommitteeAggregator, @@ -345,7 +345,7 @@ async fn qbft_instance(mut rx: UnboundedReceiver>) } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QbftError { QueueClosedError, QueueFullError, diff --git a/anchor/signature_collector/src/lib.rs b/anchor/signature_collector/src/lib.rs index 41e1685f..c5968d77 100644 --- a/anchor/signature_collector/src/lib.rs +++ b/anchor/signature_collector/src/lib.rs @@ -131,7 +131,7 @@ pub enum CollectorMessageKind { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum CollectionError { QueueClosedError, QueueFullError, diff --git a/anchor/src/main.rs b/anchor/src/main.rs index 0f4bc07f..6b632ecf 100644 --- a/anchor/src/main.rs +++ b/anchor/src/main.rs @@ -5,6 +5,7 @@ mod environment; use client::{config, Anchor, Client}; use environment::Environment; use task_executor::ShutdownReason; +use types::MainnetEthSpec; fn main() { // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided. @@ -39,7 +40,7 @@ fn main() { // Run the main task core_executor.spawn( async move { - if let Err(e) = Client::run(anchor_executor, config).await { + if let Err(e) = Client::run::(anchor_executor, config).await { error!(reason = e, "Failed to start Anchor"); // Ignore the error since it always occurs during normal operation when // shutting down. 
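The `MainnetEthSpec` change above threads the chain preset through `Client::run` as a compile-time type parameter rather than a runtime value. A toy illustration of the pattern, with hypothetical names standing in for Lighthouse's `EthSpec` trait:

```rust
// Hypothetical stand-ins for `EthSpec`/`MainnetEthSpec`; only the shape matches.
trait Spec {
    fn slots_per_epoch() -> u64;
}

struct Mainnet;
impl Spec for Mainnet {
    fn slots_per_epoch() -> u64 {
        32
    }
}

// Compare `Client::run::<MainnetEthSpec>(...)` in `main.rs` above.
fn run<E: Spec>() -> u64 {
    E::slots_per_epoch()
}

fn main() {
    assert_eq!(run::<Mainnet>(), 32);
}
```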
diff --git a/anchor/validator_store/Cargo.toml b/anchor/validator_store/Cargo.toml new file mode 100644 index 00000000..e0ed9732 --- /dev/null +++ b/anchor/validator_store/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "anchor_validator_store" +version = "0.1.0" +edition = { workspace = true } +authors = ["Sigma Prime "] +rust-version = "1.81.0" + +[dependencies] +beacon_node_fallback = { workspace = true } +dashmap = { workspace = true } +eth2 = { workspace = true } +futures = "0.3.31" +parking_lot = { workspace = true } +qbft = { workspace = true } +qbft_manager = { workspace = true } +safe_arith = { workspace = true } +signature_collector = { workspace = true } +slashing_protection = { workspace = true } +slot_clock = { workspace = true } +ssv_types = { workspace = true } +task_executor = { workspace = true } +tokio = { workspace = true, features = ["sync", "time"] } +tracing = { workspace = true } +types = { workspace = true } +validator_metrics = { workspace = true } +validator_services = { workspace = true } +validator_store = { workspace = true } diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs new file mode 100644 index 00000000..4c12b566 --- /dev/null +++ b/anchor/validator_store/src/lib.rs @@ -0,0 +1,847 @@ +pub mod sync_committee_service; + +use dashmap::DashMap; +use futures::future::join_all; +use parking_lot::Mutex; +use qbft::Completed; +use qbft_manager::{ + CommitteeInstanceId, QbftError, QbftManager, ValidatorDutyKind, ValidatorInstanceId, +}; +use safe_arith::{ArithError, SafeArith}; +use signature_collector::{CollectionError, SignatureCollectorManager, SignatureRequest}; +use slashing_protection::{NotSafe, Safe, SlashingDatabase}; +use slot_clock::SlotClock; +use ssv_types::message::{ + BeaconVote, Contribution, DataSsz, ValidatorConsensusData, ValidatorDuty, + BEACON_ROLE_AGGREGATOR, BEACON_ROLE_PROPOSER, BEACON_ROLE_SYNC_COMMITTEE_CONTRIBUTION, + DATA_VERSION_ALTAIR, DATA_VERSION_BELLATRIX, DATA_VERSION_CAPELLA, DATA_VERSION_DENEB, + DATA_VERSION_PHASE0, DATA_VERSION_UNKNOWN, +}; +use ssv_types::{Cluster, OperatorId, ValidatorMetadata}; +use std::fmt::Debug; +use std::sync::Arc; +use tracing::{error, info, warn}; +use types::attestation::Attestation; +use types::beacon_block::BeaconBlock; +use types::graffiti::Graffiti; +use types::selection_proof::SelectionProof; +use types::signed_aggregate_and_proof::SignedAggregateAndProof; +use types::signed_beacon_block::SignedBeaconBlock; +use types::signed_contribution_and_proof::SignedContributionAndProof; +use types::signed_voluntary_exit::SignedVoluntaryExit; +use types::slot_epoch::{Epoch, Slot}; +use types::sync_committee_contribution::SyncCommitteeContribution; +use types::sync_committee_message::SyncCommitteeMessage; +use types::sync_selection_proof::SyncSelectionProof; +use types::sync_subnet_id::SyncSubnetId; +use types::validator_registration_data::{ + SignedValidatorRegistrationData, ValidatorRegistrationData, +}; +use types::voluntary_exit::VoluntaryExit; +use types::{ + AbstractExecPayload, Address, AggregateAndProof, ChainSpec, ContributionAndProof, Domain, + EthSpec, Hash256, PublicKeyBytes, SecretKey, Signature, SignedRoot, + SyncAggregatorSelectionData, VariableList, +}; +use validator_store::{ + DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, SignedBlock, UnsignedBlock, + ValidatorStore, +}; + +/// Number of epochs of slashing protection history to keep. +/// +/// This acts as a maximum safe-guard against clock drift. 
+const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512; + +#[derive(Clone)] +struct InitializedCluster { + cluster: Cluster, + validator_metadata: ValidatorMetadata, + decrypted_key_share: SecretKey, +} + +pub struct AnchorValidatorStore { + clusters: DashMap, + signature_collector: Arc, + qbft_manager: Arc>, + slashing_protection: SlashingDatabase, + slashing_protection_last_prune: Mutex, + spec: Arc, + genesis_validators_root: Hash256, + operator_id: OperatorId, +} + +impl AnchorValidatorStore { + pub fn new( + signature_collector: Arc, + qbft_manager: Arc>, + slashing_protection: SlashingDatabase, + spec: Arc, + genesis_validators_root: Hash256, + operator_id: OperatorId, + ) -> AnchorValidatorStore { + Self { + clusters: DashMap::new(), + signature_collector, + qbft_manager, + slashing_protection, + slashing_protection_last_prune: Mutex::new(Epoch::new(0)), + spec, + genesis_validators_root, + operator_id, + } + } + + pub fn add_cluster( + &self, + cluster: Cluster, + validator_metadata: ValidatorMetadata, + decrypted_key_share: SecretKey, + ) -> Result<(), Error> { + let pubkey_bytes = validator_metadata.public_key.compress(); + self.clusters.insert( + pubkey_bytes, + InitializedCluster { + cluster, + validator_metadata, + decrypted_key_share, + }, + ); + self.slashing_protection + .register_validator(pubkey_bytes) + .map_err(Error::Slashable) + } + + fn cluster(&self, validator_pubkey: PublicKeyBytes) -> Result { + self.clusters + .get(&validator_pubkey) + .map(|c| c.value().clone()) + .ok_or(Error::UnknownPubkey(validator_pubkey)) + } + + fn get_domain(&self, epoch: Epoch, domain: Domain) -> Hash256 { + self.spec.get_domain( + epoch, + domain, + &self.spec.fork_at_epoch(epoch), + self.genesis_validators_root, + ) + } + + async fn collect_signature( + &self, + cluster: InitializedCluster, + signing_root: Hash256, + ) -> Result { + let collector = self.signature_collector.sign_and_collect( + SignatureRequest { + cluster_id: cluster.cluster.cluster_id, + signing_root, + threshold: cluster + .cluster + .faulty + .safe_mul(2) + .and_then(|x| x.safe_add(1)) + .map_err(SpecificError::from)?, + }, + self.operator_id, + cluster.decrypted_key_share, + ); + Ok((*collector.await.map_err(SpecificError::from)?).clone()) + } + + async fn decide_abstract_block< + P: AbstractExecPayload, + F: FnOnce(BeaconBlock) -> DataSsz, + >( + &self, + validator_pubkey: PublicKeyBytes, + block: BeaconBlock, + current_slot: Slot, + wrapper: F, + ) -> Result, Error> { + // Make sure the block slot is not higher than the current slot to avoid potential attacks. 
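Returning for a moment to the `2 * faulty + 1` threshold in `collect_signature` above: it is the usual BFT quorum, where with `f` tolerated faulty operators, `2f + 1` signatures guarantee a majority of honest signers. A checked-arithmetic sketch mirroring the `safe_arith` calls:

```rust
/// Quorum size for a committee tolerating `faulty` Byzantine members.
fn quorum_threshold(faulty: u64) -> Option<u64> {
    faulty.checked_mul(2)?.checked_add(1)
}

fn main() {
    // A 4-operator SSV cluster tolerates 1 fault and needs 3 signatures.
    assert_eq!(quorum_threshold(1), Some(3));
    // A 7-operator cluster tolerates 2 faults and needs 5.
    assert_eq!(quorum_threshold(2), Some(5));
}
```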
+ if block.slot() > current_slot { + warn!( + block_slot = block.slot().as_u64(), + current_slot = current_slot.as_u64(), + "Not signing block with slot greater than current slot", + ); + return Err(Error::GreaterThanCurrentSlot { + slot: block.slot(), + current_slot, + }); + } + + let cluster = self.cluster(validator_pubkey)?; + + // first, we have to get to consensus + let completed = self + .qbft_manager + .decide_instance( + ValidatorInstanceId { + validator: validator_pubkey, + duty: ValidatorDutyKind::Proposal, + instance_height: block.slot().as_usize().into(), + }, + ValidatorConsensusData { + duty: ValidatorDuty { + r#type: BEACON_ROLE_PROPOSER, + pub_key: validator_pubkey, + slot: block.slot().as_usize().into(), + validator_index: cluster.validator_metadata.index, + committee_index: 0, + committee_length: 0, + committees_at_slot: 0, + validator_committee_index: 0, + validator_sync_committee_indices: Default::default(), + }, + version: match &block { + BeaconBlock::Base(_) => DATA_VERSION_PHASE0, + BeaconBlock::Altair(_) => DATA_VERSION_ALTAIR, + BeaconBlock::Bellatrix(_) => DATA_VERSION_BELLATRIX, + BeaconBlock::Capella(_) => DATA_VERSION_CAPELLA, + BeaconBlock::Deneb(_) => DATA_VERSION_DENEB, + BeaconBlock::Electra(_) => DATA_VERSION_UNKNOWN, + }, + data_ssz: Box::new(wrapper(block)), + }, + &cluster.cluster, + ) + .await + .map_err(SpecificError::from)?; + let data = match completed { + Completed::TimedOut => return Err(Error::SpecificError(SpecificError::Timeout)), + Completed::Success(data) => data, + }; + Ok(*data.data_ssz) + } + + async fn sign_abstract_block>( + &self, + validator_pubkey: PublicKeyBytes, + block: BeaconBlock, + ) -> Result, Error> { + let domain_hash = self.get_domain(block.epoch(), Domain::BeaconProposer); + + let header = block.block_header(); + handle_slashing_check_result( + self.slashing_protection.check_and_insert_block_proposal( + &validator_pubkey, + &header, + domain_hash, + ), + &header, + "block", + )?; + + let signing_root = block.signing_root(domain_hash); + let signature = self + .collect_signature(self.cluster(validator_pubkey)?, signing_root) + .await?; + Ok(SignedBeaconBlock::from_block(block, signature)) + } + + pub async fn produce_sync_committee_signature_with_full_vote( + &self, + slot: Slot, + vote: BeaconVote, + validator_index: u64, + validator_pubkey: &PublicKeyBytes, + ) -> Result { + let epoch = slot.epoch(E::slots_per_epoch()); + let cluster = self.cluster(*validator_pubkey)?; + let beacon_block_root = vote.block_root; + + let completed = self + .qbft_manager + .decide_instance( + CommitteeInstanceId { + committee: cluster.cluster.cluster_id, + instance_height: slot.as_usize().into(), + }, + vote, + &cluster.cluster, + ) + .await + .map_err(SpecificError::from)?; + let data = match completed { + Completed::TimedOut => return Err(Error::SpecificError(SpecificError::Timeout)), + Completed::Success(data) => data, + }; + + let domain = self.get_domain(epoch, Domain::SyncCommittee); + let signing_root = data.block_root.signing_root(domain); + let signature = self.collect_signature(cluster, signing_root).await?; + + Ok(SyncCommitteeMessage { + slot, + beacon_block_root, + validator_index, + signature, + }) + } + + pub async fn produce_signed_contribution_and_proofs( + &self, + aggregator_index: u64, + aggregator_pubkey: PublicKeyBytes, + signing_data: Vec>, + ) -> Vec, Error>> { + let error = |err: Error| signing_data.iter().map(move |_| Err(err.clone())).collect(); + + let Some(slot) = signing_data.first().map(|data| 
data.contribution.slot) else { + return vec![]; + }; + let epoch = slot.epoch(E::slots_per_epoch()); + let cluster = match self.cluster(aggregator_pubkey) { + Ok(cluster) => cluster, + Err(err) => return error(err), + }; + + let data = match VariableList::new( + signing_data + .iter() + .map(|signing_data| Contribution { + selection_proof_sig: signing_data.selection_proof.clone().into(), + contribution: signing_data.contribution.clone(), + }) + .collect(), + ) { + Ok(data) => data, + Err(_) => return error(SpecificError::TooManySyncSubnetsToSign.into()), + }; + + let completed = self + .qbft_manager + .decide_instance( + ValidatorInstanceId { + validator: aggregator_pubkey, + duty: ValidatorDutyKind::SyncCommitteeAggregator, + instance_height: slot.as_usize().into(), + }, + ValidatorConsensusData { + duty: ValidatorDuty { + r#type: BEACON_ROLE_SYNC_COMMITTEE_CONTRIBUTION, + pub_key: aggregator_pubkey, + slot, + validator_index: cluster.validator_metadata.index, + committee_index: 0, + committee_length: 0, + committees_at_slot: 0, + validator_committee_index: aggregator_index, + validator_sync_committee_indices: Default::default(), + }, + version: DATA_VERSION_PHASE0, + data_ssz: Box::new(DataSsz::Contributions(data)), + }, + &cluster.cluster, + ) + .await; + let data = match completed { + Ok(Completed::Success(data)) => data, + Ok(Completed::TimedOut) => return error(SpecificError::Timeout.into()), + Err(err) => return error(SpecificError::QbftError(err).into()), + }; + let data = match *data.data_ssz { + DataSsz::Contributions(data) => data, + _ => return error(SpecificError::InvalidQbftData.into()), + }; + + let domain_hash = self.get_domain(epoch, Domain::ContributionAndProof); + let signing_futures = data + .into_iter() + .map(|contribution| { + let cluster = cluster.clone(); + async move { + let message = ContributionAndProof { + aggregator_index, + contribution: contribution.contribution, + selection_proof: contribution.selection_proof_sig, + }; + let signing_root = message.signing_root(domain_hash); + self.collect_signature(cluster, signing_root) + .await + .map(|signature| SignedContributionAndProof { message, signature }) + } + }) + .collect::>(); + + join_all(signing_futures).await + } +} + +fn handle_slashing_check_result( + slashing_status: Result, + object: impl Debug, + kind: &'static str, +) -> Result<(), Error> { + match slashing_status { + // We can safely sign this attestation. 
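Every duty in this store follows the same two-phase shape: reach QBFT consensus on the data first, then threshold-sign whatever value was decided, which may differ from the local proposal. A condensed, hypothetical sketch of that control flow (the real code uses `qbft::Completed` and async managers):

```rust
enum Completed<D> {
    Success(D),
    TimedOut,
}

fn decide_then_sign<D>(
    decide: impl FnOnce() -> Completed<D>,
    sign: impl FnOnce(&D) -> Vec<u8>,
) -> Result<(D, Vec<u8>), &'static str> {
    match decide() {
        // Sign the *decided* data, not the data we proposed.
        Completed::Success(data) => {
            let signature = sign(&data);
            Ok((data, signature))
        }
        Completed::TimedOut => Err("qbft timed out"),
    }
}

fn main() {
    let decided = decide_then_sign(|| Completed::Success(42u64), |_| vec![0xAA_u8]);
    assert!(decided.is_ok());

    let timed_out = decide_then_sign(|| Completed::<u64>::TimedOut, |_| vec![0u8]);
    assert!(timed_out.is_err());
}
```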
+ Ok(Safe::Valid) => Ok(()), + Ok(Safe::SameData) => { + warn!("Skipping signing of previously signed {kind}",); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SAME_DATA], + ); + Err(Error::SameData) + } + Err(NotSafe::UnregisteredValidator(pk)) => { + error!( + "public_key" = format!("{:?}", pk), + "Internal error: validator was not properly registered for slashing protection", + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::UNREGISTERED], + ); + Err(Error::Slashable(NotSafe::UnregisteredValidator(pk))) + } + Err(e) => { + error!( + "object" = format!("{:?}", object), + "error" = format!("{:?}", e), + "Not signing slashable {kind}", + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SLASHABLE], + ); + Err(Error::Slashable(e)) + } + } +} + +pub struct ContributionAndProofSigningData { + contribution: SyncCommitteeContribution, + selection_proof: SyncSelectionProof, +} + +#[derive(Debug, Clone)] +pub enum SpecificError { + Unsupported, + SignatureCollectionFailed(CollectionError), + ArithError(ArithError), + QbftError(QbftError), + Timeout, + InvalidQbftData, + TooManySyncSubnetsToSign, +} + +impl From for SpecificError { + fn from(err: CollectionError) -> SpecificError { + SpecificError::SignatureCollectionFailed(err) + } +} + +impl From for SpecificError { + fn from(err: ArithError) -> SpecificError { + SpecificError::ArithError(err) + } +} + +impl From for SpecificError { + fn from(err: QbftError) -> SpecificError { + SpecificError::QbftError(err) + } +} + +pub type Error = ValidatorStoreError; + +impl ValidatorStore for AnchorValidatorStore { + type Error = SpecificError; + type E = E; + + fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option { + self.clusters + .get(pubkey) + .map(|v| v.validator_metadata.index.0 as u64) + } + + fn voting_pubkeys(&self, _filter_func: F) -> I + where + I: FromIterator, + F: Fn(DoppelgangerStatus) -> Option, + { + // we don't care about doppelgangers + self.clusters.iter().map(|v| *v.key()).collect() + } + + fn doppelganger_protection_allows_signing(&self, _validator_pubkey: PublicKeyBytes) -> bool { + // we don't care about doppelgangers + true + } + + fn num_voting_validators(&self) -> usize { + self.clusters.len() + } + + fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option { + self.clusters + .get(validator_pubkey) + .map(|v| v.validator_metadata.graffiti) + } + + fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option
{ + self.clusters + .get(validator_pubkey) + .map(|v| v.cluster.fee_recipient) + } + + fn determine_builder_boost_factor(&self, _validator_pubkey: &PublicKeyBytes) -> Option { + Some(1) + } + + async fn randao_reveal( + &self, + validator_pubkey: PublicKeyBytes, + signing_epoch: Epoch, + ) -> Result { + let domain_hash = self.get_domain(signing_epoch, Domain::Randao); + let signing_root = signing_epoch.signing_root(domain_hash); + self.collect_signature(self.cluster(validator_pubkey)?, signing_root) + .await + } + + fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) { + // we actually have the index already. we use the opportunity to do a sanity check + match self.clusters.get(validator_pubkey) { + None => warn!( + validator = validator_pubkey.as_hex_string(), + "Trying to set index for unknown validator" + ), + Some(v) => { + if v.validator_metadata.index.0 as u64 != index { + error!( + validator = validator_pubkey.as_hex_string(), + expected = v.validator_metadata.index.0, + actual = index, + "Mismatched validator index", + ) + } + } + } + } + + async fn sign_block( + &self, + validator_pubkey: PublicKeyBytes, + block: UnsignedBlock, + current_slot: Slot, + ) -> Result, Error> { + let data = match block { + UnsignedBlock::Full(block) => { + self.decide_abstract_block( + validator_pubkey, + block, + current_slot, + DataSsz::BeaconBlock, + ) + .await + } + UnsignedBlock::Blinded(block) => { + self.decide_abstract_block( + validator_pubkey, + block, + current_slot, + DataSsz::BlindedBeaconBlock, + ) + .await + } + }?; + + // yay - we agree! let's sign the block we agreed on + match data { + DataSsz::BeaconBlock(block) => Ok(self + .sign_abstract_block(validator_pubkey, block) + .await? + .into()), + DataSsz::BlindedBeaconBlock(block) => Ok(self + .sign_abstract_block(validator_pubkey, block) + .await? + .into()), + _ => Err(Error::SpecificError(SpecificError::InvalidQbftData)), + } + } + + async fn sign_attestation( + &self, + validator_pubkey: PublicKeyBytes, + validator_committee_position: usize, + attestation: &mut Attestation, + current_epoch: Epoch, + ) -> Result<(), Error> { + // Make sure the target epoch is not higher than the current epoch to avoid potential attacks. + if attestation.data().target.epoch > current_epoch { + return Err(Error::GreaterThanCurrentEpoch { + epoch: attestation.data().target.epoch, + current_epoch, + }); + } + + let cluster = self.cluster(validator_pubkey)?; + + let completed = self + .qbft_manager + .decide_instance( + CommitteeInstanceId { + committee: cluster.cluster.cluster_id, + instance_height: attestation.data().slot.as_usize().into(), + }, + BeaconVote { + block_root: attestation.data().beacon_block_root, + source: attestation.data().source, + target: attestation.data().target, + }, + &cluster.cluster, + ) + .await + .map_err(SpecificError::from)?; + let data = match completed { + Completed::TimedOut => return Err(Error::SpecificError(SpecificError::Timeout)), + Completed::Success(data) => data, + }; + attestation.data_mut().beacon_block_root = data.block_root; + attestation.data_mut().source = data.source; + attestation.data_mut().target = data.target; + + // yay - we agree! 
let's sign the att we agreed on + let domain_hash = self.get_domain(current_epoch, Domain::BeaconAttester); + + handle_slashing_check_result( + self.slashing_protection.check_and_insert_attestation( + &validator_pubkey, + attestation.data(), + domain_hash, + ), + attestation.data(), + "attestation", + )?; + + let signing_root = attestation.data().signing_root(domain_hash); + let signature = self.collect_signature(cluster, signing_root).await?; + attestation + .add_signature(&signature, validator_committee_position) + .map_err(Error::UnableToSignAttestation)?; + + Ok(()) + } + + async fn sign_voluntary_exit( + &self, + _validator_pubkey: PublicKeyBytes, + _voluntary_exit: VoluntaryExit, + ) -> Result { + // there should be no situation ever where we want to sign an exit + Err(Error::SpecificError(SpecificError::Unsupported)) + } + + async fn sign_validator_registration_data( + &self, + validator_registration_data: ValidatorRegistrationData, + ) -> Result { + let domain_hash = self.spec.get_builder_domain(); + let signing_root = validator_registration_data.signing_root(domain_hash); + + let signature = self + .collect_signature( + self.cluster(validator_registration_data.pubkey)?, + signing_root, + ) + .await?; + + Ok(SignedValidatorRegistrationData { + message: validator_registration_data, + signature, + }) + } + + async fn produce_signed_aggregate_and_proof( + &self, + validator_pubkey: PublicKeyBytes, + aggregator_index: u64, + aggregate: Attestation, + selection_proof: SelectionProof, + ) -> Result, Error> { + let signing_epoch = aggregate.data().target.epoch; + let cluster = self.cluster(validator_pubkey)?; + + let message = + AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof); + + // first, we have to get to consensus + let completed = self + .qbft_manager + .decide_instance( + ValidatorInstanceId { + validator: validator_pubkey, + duty: ValidatorDutyKind::Aggregator, + instance_height: message.aggregate().data().slot.as_usize().into(), + }, + ValidatorConsensusData { + duty: ValidatorDuty { + r#type: BEACON_ROLE_AGGREGATOR, + pub_key: validator_pubkey, + slot: message.aggregate().data().slot, + validator_index: cluster.validator_metadata.index, + committee_index: message.aggregate().data().index, + // todo it seems the below are not needed (anymore?) 
+ committee_length: 0, + committees_at_slot: 0, + validator_committee_index: 0, + validator_sync_committee_indices: Default::default(), + }, + version: DATA_VERSION_PHASE0, + data_ssz: Box::new(DataSsz::AggregateAndProof(message)), + }, + &cluster.cluster, + ) + .await + .map_err(SpecificError::from)?; + let data = match completed { + Completed::TimedOut => return Err(Error::SpecificError(SpecificError::Timeout)), + Completed::Success(data) => data, + }; + let message = match *data.data_ssz { + DataSsz::AggregateAndProof(message) => message, + _ => return Err(Error::SpecificError(SpecificError::InvalidQbftData)), + }; + + let domain_hash = self.get_domain(signing_epoch, Domain::AggregateAndProof); + let signing_root = message.signing_root(domain_hash); + let signature = self.collect_signature(cluster, signing_root).await?; + + Ok(SignedAggregateAndProof::from_aggregate_and_proof( + message, signature, + )) + } + + async fn produce_selection_proof( + &self, + validator_pubkey: PublicKeyBytes, + slot: Slot, + ) -> Result { + let epoch = slot.epoch(E::slots_per_epoch()); + let domain_hash = self.get_domain(epoch, Domain::SelectionProof); + let signing_root = slot.signing_root(domain_hash); + + self.collect_signature(self.cluster(validator_pubkey)?, signing_root) + .await + .map(SelectionProof::from) + } + + async fn produce_sync_selection_proof( + &self, + validator_pubkey: &PublicKeyBytes, + slot: Slot, + subnet_id: SyncSubnetId, + ) -> Result { + let epoch = slot.epoch(E::slots_per_epoch()); + let domain_hash = self.get_domain(epoch, Domain::SyncCommitteeSelectionProof); + let signing_root = SyncAggregatorSelectionData { + slot, + subcommittee_index: subnet_id.into(), + } + .signing_root(domain_hash); + + self.collect_signature(self.cluster(*validator_pubkey)?, signing_root) + .await + .map(SyncSelectionProof::from) + } + + async fn produce_sync_committee_signature( + &self, + _slot: Slot, + _beacon_block_root: Hash256, + _validator_index: u64, + _validator_pubkey: &PublicKeyBytes, + ) -> Result { + // use `produce_sync_committee_signature_with_full_vote` instead + Err(Error::SpecificError(SpecificError::Unsupported)) + } + + async fn produce_signed_contribution_and_proof( + &self, + _aggregator_index: u64, + _aggregator_pubkey: PublicKeyBytes, + _contribution: SyncCommitteeContribution, + _selection_proof: SyncSelectionProof, + ) -> Result, Error> { + // use `produce_signed_contribution_and_proofs` instead + Err(Error::SpecificError(SpecificError::Unsupported)) + } + + // stolen from lighthouse + /// Prune the slashing protection database so that it remains performant. + /// + /// This function will only do actual pruning periodically, so it should usually be + /// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning + /// runs. + fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) { + // Attempt to prune every SLASHING_PROTECTION_HISTORY_EPOCHs, with a tolerance for + // missing the epoch that aligns exactly. 
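The gate that follows prunes whenever the current epoch crosses into a new 512-epoch bucket, so a wake-up missed at the exact boundary still triggers a prune later. The integer-division trick in isolation:

```rust
const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512;

fn should_prune(current_epoch: u64, last_prune: u64) -> bool {
    // Prune whenever the epoch "bucket" advances past the last pruned bucket.
    current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS
        > last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS
}

fn main() {
    assert!(!should_prune(511, 0)); // same bucket: skip
    assert!(should_prune(512, 0)); // crossed a boundary: prune
    assert!(should_prune(1030, 512)); // misaligned epochs still prune
}
```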
+ let mut last_prune = self.slashing_protection_last_prune.lock(); + if current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS + <= *last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS + { + return; + } + + if first_run { + info!( + "epoch" = %current_epoch, + "msg" = "pruning may take several minutes the first time it runs", + "Pruning slashing protection DB", + ); + } else { + info!( + "epoch" = %current_epoch, + "Pruning slashing protection DB", + ); + } + + let _timer = + validator_metrics::start_timer(&validator_metrics::SLASHING_PROTECTION_PRUNE_TIMES); + + let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS); + let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch()); + + let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored); + + if let Err(e) = self + .slashing_protection + .prune_all_signed_attestations(all_pubkeys.iter(), new_min_target_epoch) + { + error!( + "error" = ?e, + "Error during pruning of signed attestations", + ); + return; + } + + if let Err(e) = self + .slashing_protection + .prune_all_signed_blocks(all_pubkeys.iter(), new_min_slot) + { + error!( + "error" = ?e, + "Error during pruning of signed blocks", + ); + return; + } + + *last_prune = current_epoch; + + info!("Completed pruning of slashing protection DB"); + } + + fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option { + self.clusters.get(pubkey).map(|v| ProposalData { + validator_index: Some(v.validator_metadata.index.0 as u64), + fee_recipient: Some(v.cluster.fee_recipient), + gas_limit: 29_999_998, // TODO support scalooors + builder_proposals: false, // TODO support MEVooors + }) + } +} diff --git a/anchor/validator_store/src/sync_committee_service.rs b/anchor/validator_store/src/sync_committee_service.rs new file mode 100644 index 00000000..2b167825 --- /dev/null +++ b/anchor/validator_store/src/sync_committee_service.rs @@ -0,0 +1,564 @@ +//! Shamelessly stolen from lighthouse and adapted for anchor purposes: +//! - Provide attestation data when requesting signature to ensure valid `BeaconVote` for qbft +//! - One aggregation request per validator including every subnet + +use crate::{AnchorValidatorStore, ContributionAndProofSigningData}; +use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use futures::future::join_all; +use futures::future::FutureExt; +use slot_clock::SlotClock; +use ssv_types::message::BeaconVote; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::{sleep, sleep_until, Duration, Instant}; +use tracing::{debug, error, info, trace}; +use types::{ + ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription, + SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, +}; +use validator_services::duties_service::DutiesService; +use validator_store::Error as ValidatorStoreError; + +pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4; + +pub struct SyncCommitteeService { + inner: Arc>, +} + +impl Clone for SyncCommitteeService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for SyncCommitteeService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +pub struct Inner { + duties_service: Arc, T>>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + /// Boolean to track whether the service has posted subscriptions to the BN at least once. 
+ /// + /// This acts as a latch that fires once upon start-up, and then never again. + first_subscription_done: AtomicBool, +} + +impl SyncCommitteeService { + pub fn new( + duties_service: Arc, T>>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + executor, + first_subscription_done: AtomicBool::new(false), + }), + } + } + + /// Check if the Altair fork has been activated and therefore sync duties should be performed. + /// + /// Slot clock errors are mapped to `false`. + fn altair_fork_activated(&self) -> bool { + self.duties_service + .spec + .altair_fork_epoch + .and_then(|fork_epoch| { + let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch()); + Some(current_epoch >= fork_epoch) + }) + .unwrap_or(false) + } + + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + info!( + next_update_millis = duration_to_next_slot.as_millis(), + "Sync committee service started" + ); + + let executor = self.executor.clone(); + + let interval_fut = async move { + loop { + if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { + // Wait for contribution broadcast interval 1/3 of the way through the slot. + sleep(duration_to_next_slot + slot_duration / 3).await; + + // Do nothing if the Altair fork has not yet occurred. + if !self.altair_fork_activated() { + continue; + } + + if let Err(e) = self.spawn_contribution_tasks(slot_duration).await { + error!( + error = ?e, + "Failed to spawn sync contribution tasks" + ) + } else { + trace!("Spawned sync contribution tasks") + } + + // Do subscriptions for future slots/epochs. + self.spawn_subscription_tasks(); + } else { + error!("Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + } + } + }; + + executor.spawn(interval_fut, "sync_committee_service"); + Ok(()) + } + + async fn spawn_contribution_tasks(&self, slot_duration: Duration) -> Result<(), String> { + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + // If a validator needs to publish a sync aggregate, they must do so at 2/3 + // through the slot. 
+        let aggregate_production_instant = Instant::now()
+            + duration_to_next_slot
+                .checked_sub(slot_duration / 3)
+                .unwrap_or_else(|| Duration::from_secs(0));
+
+        let Some(slot_duties) = self
+            .duties_service
+            .sync_duties
+            .get_duties_for_slot::<E>(slot, &self.duties_service.spec)
+        else {
+            debug!("No duties known for slot {}", slot);
+            return Ok(());
+        };
+
+        if slot_duties.duties.is_empty() {
+            debug!(%slot, "No local validators in current sync committee");
+            return Ok(());
+        }
+
+        // ANCHOR SPECIFIC - fetch source and target to enable proper qbft voting
+        let attestation_data = self
+            .beacon_nodes
+            .first_success(|beacon_node| async move {
+                let _timer = validator_metrics::start_timer_vec(
+                    &validator_metrics::ATTESTATION_SERVICE_TIMES,
+                    &[validator_metrics::ATTESTATIONS_HTTP_GET],
+                );
+                beacon_node
+                    .get_validator_attestation_data(slot, 0)
+                    .await
+                    .map_err(|e| format!("Failed to produce attestation data: {:?}", e))
+                    .map(|result| result.data)
+            })
+            .await
+            .map_err(|e| e.to_string())?;
+
+        let block_root = attestation_data.beacon_block_root;
+        let beacon_vote = BeaconVote {
+            block_root,
+            source: attestation_data.source,
+            target: attestation_data.target,
+        };
+
+        // Spawn one task to publish all of the sync committee signatures.
+        let validator_duties = slot_duties.duties;
+        let service = self.clone();
+        self.inner.executor.spawn(
+            async move {
+                service
+                    .publish_sync_committee_signatures(slot, beacon_vote, validator_duties)
+                    .map(|_| ())
+                    .await
+            },
+            "sync_committee_signature_publish",
+        );
+
+        let aggregators = slot_duties.aggregators;
+        let service = self.clone();
+        self.inner.executor.spawn(
+            async move {
+                service
+                    .publish_sync_committee_aggregates(
+                        slot,
+                        block_root,
+                        aggregators,
+                        aggregate_production_instant,
+                    )
+                    .map(|_| ())
+                    .await
+            },
+            "sync_committee_aggregate_publish",
+        );
+
+        Ok(())
+    }
+
+    /// Publish sync committee signatures.
+    async fn publish_sync_committee_signatures(
+        &self,
+        slot: Slot,
+        vote: BeaconVote,
+        validator_duties: Vec<SyncDuty>,
+    ) -> Result<(), ()> {
+        // Create futures to produce sync committee signatures.
+        let signature_futures = validator_duties.iter().map(|duty| {
+            let vote = vote.clone();
+            async move {
+                match self
+                    .validator_store
+                    .produce_sync_committee_signature_with_full_vote(
+                        slot,
+                        vote.clone(),
+                        duty.validator_index,
+                        &duty.pubkey,
+                    )
+                    .await
+                {
+                    Ok(signature) => Some(signature),
+                    Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
+                        // A pubkey can be missing when a validator was recently
+                        // removed via the API.
+                        debug!(
+                            ?pubkey,
+                            validator_index = duty.validator_index,
+                            %slot,
+                            "Missing pubkey for sync committee signature"
+                        );
+                        None
+                    }
+                    Err(e) => {
+                        error!(
+                            validator_index = duty.validator_index,
+                            %slot,
+                            error = ?e,
+                            "Failed to sign sync committee signature"
+                        );
+                        None
+                    }
+                }
+            }
+        });
+
+        // Execute all the futures in parallel, collecting any successful results.
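+        // (`flatten` drops the `None` entries, so one failed signer does not prevent
+        // the remaining signatures from being published.)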
+        let committee_signatures = &join_all(signature_futures)
+            .await
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+
+        self.beacon_nodes
+            .request(ApiTopic::SyncCommittee, |beacon_node| async move {
+                beacon_node
+                    .post_beacon_pool_sync_committee_signatures(committee_signatures)
+                    .await
+            })
+            .await
+            .map_err(|e| {
+                error!(
+                    %slot,
+                    error = %e,
+                    "Unable to publish sync committee messages"
+                );
+            })?;
+
+        info!(
+            count = committee_signatures.len(),
+            head_block = ?vote.block_root,
+            %slot,
+            "Successfully published sync committee messages"
+        );
+
+        Ok(())
+    }
+
+    async fn publish_sync_committee_aggregates(
+        &self,
+        slot: Slot,
+        beacon_block_root: Hash256,
+        aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
+        aggregate_instant: Instant,
+    ) {
+        sleep_until(aggregate_instant).await;
+
+        let contributions = aggregators.keys().map(|&subnet_id| {
+            self.beacon_nodes
+                .first_success(move |beacon_node| async move {
+                    let sync_contribution_data = SyncContributionData {
+                        slot,
+                        beacon_block_root,
+                        subcommittee_index: *subnet_id,
+                    };
+
+                    beacon_node
+                        .get_validator_sync_committee_contribution(&sync_contribution_data)
+                        .await
+                        .map(|data| data.map(|data| (subnet_id, data.data)))
+                })
+        });
+
+        let contributions = join_all(contributions)
+            .await
+            .into_iter()
+            .filter_map(|result| match result {
+                Ok(Some((subnet_id, data))) => Some((*subnet_id, data)),
+                Ok(None) => {
+                    error!(%slot, ?beacon_block_root, "No aggregate contribution found");
+                    None
+                }
+                Err(err) => {
+                    error!(
+                        %slot,
+                        ?beacon_block_root,
+                        error = %err,
+                        "Failed to produce sync contribution"
+                    );
+                    None
+                }
+            })
+            .collect::<HashMap<_, _>>();
+
+        let mut aggregators_by_validator = HashMap::new();
+        for (subnet, aggregators) in aggregators {
+            for aggregator in aggregators {
+                if let Some(contribution) = contributions.get(&subnet).cloned() {
+                    aggregators_by_validator
+                        .entry((aggregator.0, aggregator.1))
+                        .or_insert(vec![])
+                        .push(ContributionAndProofSigningData {
+                            contribution,
+                            selection_proof: aggregator.2,
+                        });
+                }
+            }
+        }
+
+        // Create futures to produce signed contributions.
+        let signature_futures = aggregators_by_validator.into_iter().map(
+            |((aggregator_index, aggregator_pk), data)| {
+                self.validator_store.produce_signed_contribution_and_proofs(
+                    aggregator_index,
+                    aggregator_pk,
+                    data,
+                )
+            },
+        );
+
+        // Execute all the futures in parallel, collecting any successful results.
+        let signed_contributions = &join_all(signature_futures)
+            .await
+            .into_iter()
+            .flatten()
+            .filter_map(|result| match result {
+                Ok(signed_contribution) => Some(signed_contribution),
+                Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
+                    // A pubkey can be missing when a validator was recently
+                    // removed via the API.
+                    debug!(?pubkey, %slot, "Missing pubkey for sync contribution");
+                    None
+                }
+                Err(e) => {
+                    error!(
+                        %slot,
+                        error = ?e,
+                        "Unable to sign sync committee contribution"
+                    );
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Publish to the beacon node.
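+        // (`first_success` walks the configured beacon nodes in order and returns as
+        // soon as one of them accepts the contributions.)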
+        if let Err(err) = self
+            .beacon_nodes
+            .first_success(|beacon_node| async move {
+                beacon_node
+                    .post_validator_contribution_and_proofs(signed_contributions)
+                    .await
+            })
+            .await
+        {
+            error!(
+                %slot,
+                error = %err,
+                "Unable to publish signed contributions and proofs"
+            );
+            return;
+        }
+
+        info!(
+            beacon_block_root = %beacon_block_root,
+            %slot,
+            "Successfully published sync contributions"
+        );
+    }
+
+    fn spawn_subscription_tasks(&self) {
+        let service = self.clone();
+
+        self.inner.executor.spawn(
+            async move {
+                service.publish_subscriptions().await.unwrap_or_else(|e| {
+                    error!(
+                        error = ?e,
+                        "Error publishing subscriptions"
+                    )
+                });
+            },
+            "sync_committee_subscription_publish",
+        );
+    }
+
+    async fn publish_subscriptions(self) -> Result<(), String> {
+        let spec = &self.duties_service.spec;
+        let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
+
+        let mut duty_slots = vec![];
+        let mut all_succeeded = true;
+
+        // At the start of every epoch during the current period, re-post the subscriptions
+        // to the beacon node. This covers the case where the BN has forgotten the subscriptions
+        // due to a restart, or where the VC has switched to a fallback BN.
+        let current_period = sync_period_of_slot::<E>(slot, spec)?;
+
+        if !self.first_subscription_done.load(Ordering::Relaxed)
+            || slot.as_u64() % E::slots_per_epoch() == 0
+        {
+            duty_slots.push((slot, current_period));
+        }
+
+        // Near the end of the current period, push subscriptions for the next period to the
+        // beacon node. We aggressively push every slot in the lead-up, as this is the main way
+        // that we want to ensure that the BN is subscribed (well in advance).
+        let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * E::slots_per_epoch();
+
+        let lookahead_period = sync_period_of_slot::<E>(lookahead_slot, spec)?;
+
+        if lookahead_period > current_period {
+            duty_slots.push((lookahead_slot, lookahead_period));
+        }
+
+        if duty_slots.is_empty() {
+            return Ok(());
+        }
+
+        // Collect subscriptions.
+        let mut subscriptions = vec![];
+
+        for (duty_slot, sync_committee_period) in duty_slots {
+            debug!(%duty_slot, %slot, "Fetching subscription duties");
+            match self
+                .duties_service
+                .sync_duties
+                .get_duties_for_slot::<E>(duty_slot, spec)
+            {
+                Some(duties) => subscriptions.extend(subscriptions_from_sync_duties(
+                    duties.duties,
+                    sync_committee_period,
+                    spec,
+                )),
+                None => {
+                    debug!(
+                        slot = %duty_slot,
+                        "No duties for subscription"
+                    );
+                    all_succeeded = false;
+                }
+            }
+        }
+
+        if subscriptions.is_empty() {
+            debug!(%slot, "No sync subscriptions to send");
+            return Ok(());
+        }
+
+        // Post subscriptions to BN.
+        debug!(
+            count = subscriptions.len(),
+            "Posting sync subscriptions to BN"
+        );
+        let subscriptions_slice = &subscriptions;
+
+        for subscription in subscriptions_slice {
+            debug!(
+                validator_index = subscription.validator_index,
+                validator_sync_committee_indices = ?subscription.sync_committee_indices,
+                until_epoch = %subscription.until_epoch,
+                "Subscription"
+            );
+        }
+
+        if let Err(e) = self
+            .beacon_nodes
+            .request(ApiTopic::Subscriptions, |beacon_node| async move {
+                beacon_node
+                    .post_validator_sync_committee_subscriptions(subscriptions_slice)
+                    .await
+            })
+            .await
+        {
+            error!(
+                %slot,
+                error = %e,
+                "Unable to post sync committee subscriptions"
+            );
+            all_succeeded = false;
+        }
+
+        // Disable first-subscription latch once all duties have succeeded once.
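+        // (Relaxed ordering suffices here: the flag is an independent latch and is not
+        // used to synchronise access to any other memory.)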
+        if all_succeeded {
+            self.first_subscription_done.store(true, Ordering::Relaxed);
+        }
+
+        Ok(())
+    }
+}
+
+fn sync_period_of_slot<E: EthSpec>(slot: Slot, spec: &ChainSpec) -> Result<u64, String> {
+    slot.epoch(E::slots_per_epoch())
+        .sync_committee_period(spec)
+        .map_err(|e| format!("Error computing sync period: {:?}", e))
+}
+
+fn subscriptions_from_sync_duties(
+    duties: Vec<SyncDuty>,
+    sync_committee_period: u64,
+    spec: &ChainSpec,
+) -> impl Iterator<Item = SyncCommitteeSubscription> {
+    let until_epoch = spec.epochs_per_sync_committee_period * (sync_committee_period + 1);
+    duties
+        .into_iter()
+        .map(move |duty| SyncCommitteeSubscription {
+            validator_index: duty.validator_index,
+            sync_committee_indices: duty.validator_sync_committee_indices,
+            until_epoch,
+        })
+}