From 21f8a3a16cc59aec377379ef605f0a6df50b0be4 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 11 Sep 2020 14:37:28 +1000 Subject: [PATCH] First compiling version of validator --- common/eth2/src/types.rs | 6 + validator_client/src/attestation_service.rs | 330 +++++++++----------- validator_client/src/block_service.rs | 44 +-- validator_client/src/config.rs | 8 +- validator_client/src/duties_service.rs | 13 +- validator_client/src/is_synced.rs | 22 +- validator_client/src/lib.rs | 11 +- validator_client/src/validator_duty.rs | 4 + 8 files changed, 200 insertions(+), 238 deletions(-) diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 8f3189d9afa..5d03026aff4 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -394,6 +394,12 @@ impl fmt::Display for Graffiti { } } +impl From<[u8; GRAFFITI_BYTES_LEN]> for Graffiti { + fn from(bytes: [u8; GRAFFITI_BYTES_LEN]) -> Self { + Self(bytes) + } +} + impl Into<[u8; GRAFFITI_BYTES_LEN]> for Graffiti { fn into(self) -> [u8; GRAFFITI_BYTES_LEN] { self.0 diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index fa79877774b..4388fbd5451 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,22 +3,26 @@ use crate::{ validator_store::ValidatorStore, }; use environment::RuntimeContext; +use eth2::BeaconNodeClient; use futures::StreamExt; -use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; -use slog::{crit, debug, error, info, trace}; +use slog::{crit, error, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use tokio::time::{delay_until, interval_at, Duration, Instant}; -use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot, SubnetId}; +use tree_hash::TreeHash; +use types::{ + AggregateSignature, Attestation, AttestationData, BitList, ChainSpec, CommitteeIndex, EthSpec, + Slot, +}; /// Builds an `AttestationService`. pub struct AttestationServiceBuilder { duties_service: Option>, validator_store: Option>, slot_clock: Option, - beacon_node: Option>, + beacon_node: Option, context: Option>, } @@ -48,7 +52,7 @@ impl AttestationServiceBuilder { self } - pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode) -> Self { + pub fn beacon_node(mut self, beacon_node: BeaconNodeClient) -> Self { self.beacon_node = Some(beacon_node); self } @@ -86,7 +90,7 @@ pub struct Inner { duties_service: DutiesService, validator_store: ValidatorStore, slot_clock: T, - beacon_node: RemoteBeaconNode, + beacon_node: BeaconNodeClient, context: RuntimeContext, } @@ -262,7 +266,7 @@ impl AttestationService { // Step 2. // // If an attestation was produced, make an aggregate. - if let Some(attestation) = attestation_opt { + if let Some(attestation_data) = attestation_opt { // First, wait until the `aggregation_production_instant` (2/3rds // of the way though the slot). As verified in the // `delay_triggers_when_in_the_past` test, this code will still run @@ -272,7 +276,7 @@ impl AttestationService { // Then download, sign and publish a `SignedAggregateAndProof` for each // validator that is elected to aggregate for this `slot` and // `committee_index`. - self.produce_and_publish_aggregates(attestation, &validator_duties) + self.produce_and_publish_aggregates(attestation_data, &validator_duties) .await .map_err(move |e| { crit!( @@ -305,7 +309,7 @@ impl AttestationService { slot: Slot, committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], - ) -> Result>, String> { + ) -> Result, String> { let log = self.context.log(); if validator_duties.is_empty() { @@ -318,124 +322,88 @@ impl AttestationService { .ok_or_else(|| "Unable to determine current slot from clock".to_string())? .epoch(E::slots_per_epoch()); - let attestation = self + let attestation_data = self .beacon_node - .http - .validator() - .produce_attestation(slot, committee_index) + .get_validator_attestation_data(slot, committee_index) .await - .map_err(|e| format!("Failed to produce attestation: {:?}", e))?; + .map_err(|e| format!("Failed to produce attestation data: {:?}", e))? + .data; + + for duty in validator_duties { + // Ensure that all required fields are present in the validator duty. + let ( + duty_slot, + duty_committee_index, + validator_committee_position, + _, + committee_count_at_slot, + committee_length, + ) = if let Some(tuple) = duty.attestation_duties() { + tuple + } else { + crit!( + log, + "Missing validator duties when signing"; + "duties" => format!("{:?}", duty) + ); + continue; + }; - // For each validator in `validator_duties`, clone the `attestation` and add - // their signature. - // - // If any validator is unable to sign, they are simply skipped. - let signed_attestations = validator_duties - .iter() - .filter_map(|duty| { - // Ensure that all required fields are present in the validator duty. - let ( - duty_slot, - duty_committee_index, - validator_committee_position, - _, - committee_count_at_slot, - ) = if let Some(tuple) = duty.attestation_duties() { - tuple - } else { - crit!( - log, - "Missing validator duties when signing"; - "duties" => format!("{:?}", duty) - ); - return None; - }; + // Ensure that the attestation matches the duties. + if duty_slot != attestation_data.slot || duty_committee_index != attestation_data.index + { + crit!( + log, + "Inconsistent validator duties during signing"; + "validator" => format!("{:?}", duty.validator_pubkey()), + "duty_slot" => duty_slot, + "attestation_slot" => attestation_data.slot, + "duty_index" => duty_committee_index, + "attestation_index" => attestation_data.index, + ); + continue; + } - // Ensure that the attestation matches the duties. - if duty_slot != attestation.data.slot - || duty_committee_index != attestation.data.index - { - crit!( - log, - "Inconsistent validator duties during signing"; - "validator" => format!("{:?}", duty.validator_pubkey()), - "duty_slot" => duty_slot, - "attestation_slot" => attestation.data.slot, - "duty_index" => duty_committee_index, - "attestation_index" => attestation.data.index, - ); - return None; - } + let mut attestation = Attestation { + aggregation_bits: BitList::with_capacity(committee_length as usize).unwrap(), + data: attestation_data.clone(), + signature: AggregateSignature::infinity(), + }; - let mut attestation = attestation.clone(); - let subnet_id = SubnetId::compute_subnet_for_attestation_data::( - &attestation.data, - committee_count_at_slot, - &self.context.eth2_config().spec, + self.validator_store + .sign_attestation( + duty.validator_pubkey(), + validator_committee_position, + &mut attestation, + current_epoch, ) - .map_err(|e| { - error!( - log, - "Failed to compute subnet id to publish attestation: {:?}", e - ) - }) - .ok()?; - self.validator_store - .sign_attestation( - duty.validator_pubkey(), - validator_committee_position, - &mut attestation, - current_epoch, - ) - .map(|_| (attestation, subnet_id)) - }) - .collect::>(); - - // If there are any signed attestations, publish them to the BN. Otherwise, - // just return early. - if let Some(attestation) = signed_attestations.first().cloned() { - let num_attestations = signed_attestations.len(); - let beacon_block_root = attestation.0.data.beacon_block_root; - - self.beacon_node - .http - .validator() - .publish_attestations(signed_attestations) + .ok_or_else(|| "Failed to sign attestation".to_string())?; + + match self + .beacon_node + .post_beacon_pool_attestations(&attestation) .await - .map_err(|e| format!("Failed to publish attestation: {:?}", e)) - .map(move |publish_status| match publish_status { - PublishStatus::Valid => info!( - log, - "Successfully published attestations"; - "count" => num_attestations, - "head_block" => format!("{:?}", beacon_block_root), - "committee_index" => committee_index, - "slot" => slot.as_u64(), - "type" => "unaggregated", - ), - PublishStatus::Invalid(msg) => crit!( - log, - "Published attestation was invalid"; - "message" => msg, - "committee_index" => committee_index, - "slot" => slot.as_u64(), - "type" => "unaggregated", - ), - PublishStatus::Unknown => { - crit!(log, "Unknown condition when publishing unagg. attestation") - } - }) - .map(|()| Some(attestation.0)) - } else { - debug!( - log, - "No attestations to publish"; - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ); - - Ok(None) + { + Ok(()) => info!( + log, + "Successfully published attestation"; + "head_block" => format!("{:?}", attestation.data.beacon_block_root), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "unaggregated", + ), + Err(e) => error!( + log, + "Unable to publish attestation"; + "error" => e.to_string(), + "committee_index" => attestation.data.index, + "slot" => slot.as_u64(), + "type" => "unaggregated", + ), + } } + + Ok(Some(attestation_data)) } /// Performs the second step of the attesting process: downloading an aggregated `Attestation`, @@ -453,103 +421,89 @@ impl AttestationService { /// returned to the BN. async fn produce_and_publish_aggregates( &self, - attestation: Attestation, + attestation_data: AttestationData, validator_duties: &[DutyAndProof], ) -> Result<(), String> { let log = self.context.log(); let aggregated_attestation = self .beacon_node - .http - .validator() - .produce_aggregate_attestation(&attestation.data) + .get_validator_aggregate_attestation( + attestation_data.slot, + attestation_data.tree_hash_root(), + ) .await - .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?; - - // For each validator, clone the `aggregated_attestation` and convert it into - // a `SignedAggregateAndProof` - let signed_aggregate_and_proofs = validator_duties - .iter() - .filter_map(|duty_and_proof| { - // Do not produce a signed aggregator for validators that are not + .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? + .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))? + .data; + + for duty_and_proof in validator_duties { + let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() { + proof + } else { + // Do not produce a signed aggregate for validators that are not // subscribed aggregators. - let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone(); - - let (duty_slot, duty_committee_index, _, validator_index, _) = - duty_and_proof.attestation_duties().or_else(|| { - crit!(log, "Missing duties when signing aggregate"); - None - })?; + continue; + }; + let (duty_slot, duty_committee_index, _, validator_index, _, _) = + if let Some(tuple) = duty_and_proof.attestation_duties() { + tuple + } else { + crit!(log, "Missing duties when signing aggregate"); + continue; + }; - let pubkey = &duty_and_proof.duty.validator_pubkey; - let slot = attestation.data.slot; - let committee_index = attestation.data.index; + let pubkey = &duty_and_proof.duty.validator_pubkey; + let slot = attestation_data.slot; + let committee_index = attestation_data.index; - if duty_slot != slot || duty_committee_index != committee_index { - crit!(log, "Inconsistent validator duties during signing"); - return None; - } + if duty_slot != slot || duty_committee_index != committee_index { + crit!(log, "Inconsistent validator duties during signing"); + continue; + } - if let Some(signed_aggregate_and_proof) = - self.validator_store.produce_signed_aggregate_and_proof( - pubkey, - validator_index, - aggregated_attestation.clone(), - selection_proof, - ) - { - Some(signed_aggregate_and_proof) - } else { - crit!(log, "Failed to sign attestation"); - None - } - }) - .collect::>(); + let signed_aggregate_and_proof = if let Some(aggregate) = + self.validator_store.produce_signed_aggregate_and_proof( + pubkey, + validator_index, + aggregated_attestation.clone(), + selection_proof.clone(), + ) { + aggregate + } else { + crit!(log, "Failed to sign attestation"); + continue; + }; - // If there any signed aggregates and proofs were produced, publish them to the - // BN. - if let Some(first) = signed_aggregate_and_proofs.first().cloned() { - let attestation = first.message.aggregate; + let attestation = &signed_aggregate_and_proof.message.aggregate; - let publish_status = self + match self .beacon_node - .http - .validator() - .publish_aggregate_and_proof(signed_aggregate_and_proofs) + .post_validator_aggregate_and_proof(&signed_aggregate_and_proof) .await - .map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))?; - match publish_status { - PublishStatus::Valid => info!( + { + Ok(()) => info!( log, - "Successfully published attestations"; + "Successfully published attestation"; + "aggregator" => signed_aggregate_and_proof.message.aggregator_index, "signatures" => attestation.aggregation_bits.num_set_bits(), "head_block" => format!("{:?}", attestation.data.beacon_block_root), "committee_index" => attestation.data.index, "slot" => attestation.data.slot.as_u64(), "type" => "aggregated", ), - PublishStatus::Invalid(msg) => crit!( + Err(e) => crit!( log, - "Published attestation was invalid"; - "message" => msg, + "Failed to publish attestation"; + "error" => e.to_string(), "committee_index" => attestation.data.index, "slot" => attestation.data.slot.as_u64(), "type" => "aggregated", ), - PublishStatus::Unknown => { - crit!(log, "Unknown condition when publishing agg. attestation") - } - }; - Ok(()) - } else { - debug!( - log, - "No signed aggregates to publish"; - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - ); - Ok(()) + } } + + Ok(()) } } diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 60d1f4d5514..5b050034b16 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,19 +1,19 @@ use crate::validator_store::ValidatorStore; use environment::RuntimeContext; +use eth2::{types::Graffiti, BeaconNodeClient}; use futures::channel::mpsc::Receiver; use futures::{StreamExt, TryFutureExt}; -use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; -use types::{EthSpec, Graffiti, PublicKey, Slot}; +use types::{EthSpec, PublicKey, Slot}; /// Builds a `BlockService`. pub struct BlockServiceBuilder { validator_store: Option>, slot_clock: Option>, - beacon_node: Option>, + beacon_node: Option, context: Option>, graffiti: Option, } @@ -39,7 +39,7 @@ impl BlockServiceBuilder { self } - pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode) -> Self { + pub fn beacon_node(mut self, beacon_node: BeaconNodeClient) -> Self { self.beacon_node = Some(beacon_node); self } @@ -79,7 +79,7 @@ impl BlockServiceBuilder { pub struct Inner { validator_store: ValidatorStore, slot_clock: Arc, - beacon_node: RemoteBeaconNode, + beacon_node: BeaconNodeClient, context: RuntimeContext, graffiti: Option, } @@ -221,11 +221,10 @@ impl BlockService { let block = self .beacon_node - .http - .validator() - .produce_block(slot, randao_reveal, self.graffiti) + .get_validator_blocks(slot, randao_reveal.into(), self.graffiti.as_ref()) .await - .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))?; + .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))? + .data; let signed_block = self .validator_store @@ -234,28 +233,17 @@ impl BlockService { let publish_status = self .beacon_node - .http - .validator() - .publish_block(signed_block.clone()) + .post_beacon_blocks(&signed_block) .await .map_err(|e| format!("Error from beacon node when publishing block: {:?}", e))?; - match publish_status { - PublishStatus::Valid => info!( - log, - "Successfully published block"; - "deposits" => signed_block.message.body.deposits.len(), - "attestations" => signed_block.message.body.attestations.len(), - "slot" => signed_block.slot().as_u64(), - ), - PublishStatus::Invalid(msg) => crit!( - log, - "Published block was invalid"; - "message" => msg, - "slot" => signed_block.slot().as_u64(), - ), - PublishStatus::Unknown => crit!(log, "Unknown condition when publishing block"), - } + info!( + log, + "Successfully published block"; + "deposits" => signed_block.message.body.deposits.len(), + "attestations" => signed_block.message.body.attestations.len(), + "slot" => signed_block.slot().as_u64(), + ); Ok(()) } diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 4a11c5aecdc..3a9366463af 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -1,8 +1,9 @@ use clap::ArgMatches; use clap_utils::{parse_optional, parse_path_with_default_in_home_dir}; +use eth2::types::Graffiti; use serde_derive::{Deserialize, Serialize}; use std::path::PathBuf; -use types::{Graffiti, GRAFFITI_BYTES_LEN}; +use types::GRAFFITI_BYTES_LEN; pub const DEFAULT_HTTP_SERVER: &str = "http://localhost:5052/"; pub const DEFAULT_DATA_DIR: &str = ".lighthouse/validators"; @@ -92,15 +93,14 @@ impl Config { GRAFFITI_BYTES_LEN )); } else { - // Default graffiti to all 0 bytes. - let mut graffiti = Graffiti::default(); + let mut graffiti = [0; 32]; // Copy the provided bytes over. // // Panic-free because `graffiti_bytes.len()` <= `GRAFFITI_BYTES_LEN`. graffiti[..graffiti_bytes.len()].copy_from_slice(&graffiti_bytes); - config.graffiti = Some(graffiti); + config.graffiti = Some(graffiti.into()); } } diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index fe01a9ae964..d38db5cd051 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -93,13 +93,14 @@ impl DutyAndProof { /// Returns the information required for an attesting validator, if they are scheduled to /// attest. - pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64, u64)> { + pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64, u64, u64)> { Some(( self.duty.attestation_slot?, self.duty.attestation_committee_index?, self.duty.attestation_committee_position?, self.duty.validator_index?, self.duty.committee_count_at_slot?, + self.duty.committee_length?, )) } @@ -111,15 +112,7 @@ impl DutyAndProof { impl Into for ValidatorDuty { fn into(self) -> DutyAndProof { DutyAndProof { - duty: ValidatorDuty { - validator_pubkey: self.validator_pubkey, - validator_index: self.validator_index, - attestation_slot: self.attestation_slot, - attestation_committee_index: self.attestation_committee_index, - attestation_committee_position: self.attestation_committee_position, - committee_count_at_slot: self.committee_count_at_slot, - block_proposal_slots: self.block_proposal_slots, - }, + duty: self, selection_proof: None, } } diff --git a/validator_client/src/is_synced.rs b/validator_client/src/is_synced.rs index 2df4708e786..1bee87e2ae1 100644 --- a/validator_client/src/is_synced.rs +++ b/validator_client/src/is_synced.rs @@ -1,5 +1,5 @@ use eth2::BeaconNodeClient; -use slog::{debug, error, Logger}; +use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; /// A distance in slots. @@ -37,14 +37,15 @@ pub async fn is_synced( let is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE); - if !is_synced { - if let Some(log) = log_opt { + if let Some(log) = log_opt { + if !is_synced { debug!( log, "Beacon node sync status"; "status" => format!("{:?}", resp), ); - error!( + + warn!( log, "Beacon node is syncing"; "msg" => "not receiving new duties", @@ -52,6 +53,19 @@ pub async fn is_synced( "head_slot" => resp.data.head_slot.as_u64(), ); } + + if let Some(local_slot) = slot_clock.now() { + let remote_slot = resp.data.head_slot + resp.data.sync_distance; + if remote_slot + 1 < local_slot || local_slot + 1 < remote_slot { + error!( + log, + "Time discrepancy with beacon node"; + "msg" => "check the system time on this host and the beacon node", + "beacon_node_slot" => remote_slot, + "local_slot" => local_slot, + ); + } + } } is_synced diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index f5a7c693a1e..d4397eb0b71 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -20,7 +20,6 @@ use clap::ArgMatches; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; use eth2::{reqwest::ClientBuilder, BeaconNodeClient, StatusCode, Url}; -use eth2_config::Eth2Config; use fork_service::{ForkService, ForkServiceBuilder}; use futures::channel::mpsc; use initialized_validators::InitializedValidators; @@ -28,6 +27,7 @@ use notifier::spawn_notifier; use slog::{error, info, Logger}; use slot_clock::SlotClock; use slot_clock::SystemTimeSlotClock; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{delay_for, Duration}; use types::{EthSpec, Hash256, YamlConfig}; @@ -62,7 +62,7 @@ impl ProductionValidatorClient { /// Instantiates the validator client, _without_ starting the timers to trigger block /// and attestation production. - pub async fn new(mut context: RuntimeContext, config: Config) -> Result { + pub async fn new(context: RuntimeContext, config: Config) -> Result { let log = context.log().clone(); info!( @@ -122,7 +122,7 @@ impl ProductionValidatorClient { tuple = init_from_beacon_node(&beacon_node, &context) => tuple?, () = context.executor.exit() => return Err("Shutting down".to_string()) }; - let mut beacon_node_spec = yaml_config.apply_to_chain_spec(&T::default_spec()) + let beacon_node_spec = yaml_config.apply_to_chain_spec::(&T::default_spec()) .ok_or_else(|| format!( "The minimal/mainnet spec type of the beacon node does not match the validator client. \ See the --testnet command." @@ -208,7 +208,10 @@ impl ProductionValidatorClient { self.duties_service .clone() - .start_update_service(block_service_tx, &self.context.eth2_config.spec) + .start_update_service( + block_service_tx, + Arc::new(self.context.eth2_config.spec.clone()), + ) .map_err(|e| format!("Unable to start duties service: {}", e))?; self.fork_service diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index e602c8f594c..47694fa9fd5 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -23,6 +23,8 @@ pub struct ValidatorDuty { pub attestation_committee_position: Option, /// The committee count at `attestation_slot`. pub committee_count_at_slot: Option, + /// The number of validators in the committee. + pub committee_length: Option, /// The slots in which a validator must propose a block (can be empty). /// /// Should be set to `None` when duties are not yet known (before the current epoch). @@ -38,6 +40,7 @@ impl ValidatorDuty { attestation_committee_index: None, attestation_committee_position: None, committee_count_at_slot: None, + committee_length: None, block_proposal_slots: None, } } @@ -87,6 +90,7 @@ impl ValidatorDuty { attestation_committee_index: Some(attester.committee_index), attestation_committee_position: Some(attester.validator_committee_index as usize), committee_count_at_slot: Some(attester.committees_at_slot), + committee_length: Some(attester.committee_length), block_proposal_slots: Some(block_proposal_slots), }) } else {