From cb031b55318dd1302e9dc24112a1f04134203456 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 5 Sep 2020 12:13:54 +1000 Subject: [PATCH] Move all init tasks into fn --- validator_client/src/lib.rs | 190 ++++++++++++++++++------------------ 1 file changed, 97 insertions(+), 93 deletions(-) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 3e0188ecce4..6b709023faf 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -18,6 +18,7 @@ use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; +use eth2_config::Eth2Config; use fork_service::{ForkService, ForkServiceBuilder}; use futures::channel::mpsc; use initialized_validators::InitializedValidators; @@ -28,7 +29,7 @@ use slot_clock::SlotClock; use slot_clock::SystemTimeSlotClock; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{delay_for, Duration}; -use types::EthSpec; +use types::{EthSpec, Hash256}; use validator_store::ValidatorStore; /// The interval between attempts to contact the beacon node during startup. @@ -107,59 +108,11 @@ impl ProductionValidatorClient { RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT) .map_err(|e| format!("Unable to init beacon node http client: {}", e))?; - // TODO: check if all logs in wait_for_node are produced while awaiting - let beacon_node = wait_for_node(&context, beacon_node, log.clone()).await?; - let eth2_config = beacon_node - .http - .spec() - .get_eth2_config() - .await - .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?; - let genesis_time = beacon_node - .http - .beacon() - .get_genesis_time() - .await - .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| format!("Unable to read system time: {:?}", e))?; - let genesis = Duration::from_secs(genesis_time); - - // If the time now is less than (prior to) genesis, then delay until the - // genesis instant. - // - // If the validator client starts before genesis, it will get errors from - // the slot clock. - if now < genesis { - info!( - log, - "Starting node prior to genesis"; - "seconds_to_wait" => (genesis - now).as_secs() - ); - - tokio::select! { - () = delay_for(genesis - now) => (), - () = context.executor.exit() => return Err("Shutting down".to_string()) - } - } else { - info!( - log, - "Genesis has already occurred"; - "seconds_ago" => (now - genesis).as_secs() - ); - } - let genesis_validators_root = beacon_node - .http - .beacon() - .get_genesis_validators_root() - .await - .map_err(|e| { - format!( - "Unable to read genesis validators root from beacon node: {:?}", - e - ) - })?; + // Perform some potentially long-running initialization tasks. + let (eth2_config, genesis_time, genesis_validators_root) = tokio::select! { + tuple = init_from_beacon_node(&beacon_node, &context) => tuple?, + () = context.executor.exit() => return Err("Shutting down".to_string()) + }; // Do not permit a connection to a beacon node using different spec constants. if context.eth2_config.spec_constants != eth2_config.spec_constants { @@ -274,49 +227,100 @@ impl ProductionValidatorClient { } } +async fn init_from_beacon_node( + beacon_node: &RemoteBeaconNode, + context: &RuntimeContext, +) -> Result<(Eth2Config, u64, Hash256), String> { + // Wait for the beacon node to come online. + wait_for_node(beacon_node, context.log()).await?; + + let eth2_config = beacon_node + .http + .spec() + .get_eth2_config() + .await + .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?; + let genesis_time = beacon_node + .http + .beacon() + .get_genesis_time() + .await + .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| format!("Unable to read system time: {:?}", e))?; + let genesis = Duration::from_secs(genesis_time); + + // If the time now is less than (prior to) genesis, then delay until the + // genesis instant. + // + // If the validator client starts before genesis, it will get errors from + // the slot clock. + if now < genesis { + info!( + context.log(), + "Starting node prior to genesis"; + "seconds_to_wait" => (genesis - now).as_secs() + ); + + delay_for(genesis - now).await; + } else { + info!( + context.log(), + "Genesis has already occurred"; + "seconds_ago" => (now - genesis).as_secs() + ); + } + let genesis_validators_root = beacon_node + .http + .beacon() + .get_genesis_validators_root() + .await + .map_err(|e| { + format!( + "Unable to read genesis validators root from beacon node: {:?}", + e + ) + })?; + + Ok((eth2_config, genesis_time, genesis_validators_root)) +} + /// Request the version from the node, looping back and trying again on failure. Exit once the node /// has been contacted. async fn wait_for_node( - context: &RuntimeContext, - beacon_node: RemoteBeaconNode, - log: Logger, -) -> Result, String> { - let future = Box::pin(async move { - // Try to get the version string from the node, looping until success is returned. - loop { - let log = log.clone(); - let result = beacon_node - .clone() - .http - .node() - .get_version() - .await - .map_err(|e| format!("{:?}", e)); - - match result { - Ok(version) => { - info!( - log, - "Connected to beacon node"; - "version" => version, - ); - - return Ok(beacon_node); - } - Err(e) => { - error!( - log, - "Unable to connect to beacon node"; - "error" => format!("{:?}", e), - ); - delay_for(RETRY_DELAY).await; - } + beacon_node: &RemoteBeaconNode, + log: &Logger, +) -> Result<(), String> { + // Try to get the version string from the node, looping until success is returned. + loop { + let log = log.clone(); + let result = beacon_node + .clone() + .http + .node() + .get_version() + .await + .map_err(|e| format!("{:?}", e)); + + match result { + Ok(version) => { + info!( + log, + "Connected to beacon node"; + "version" => version, + ); + + return Ok(()); + } + Err(e) => { + error!( + log, + "Unable to connect to beacon node"; + "error" => format!("{:?}", e), + ); + delay_for(RETRY_DELAY).await; } } - }); - - tokio::select! { - result = future => result, - () = context.executor.exit() => Err("Shutting down".to_string()) } }