Skip to content

Commit

Permalink
Move all init tasks into fn
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Sep 5, 2020
1 parent 59f4d02 commit cb031b5
Showing 1 changed file with 97 additions and 93 deletions.
190 changes: 97 additions & 93 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use block_service::{BlockService, BlockServiceBuilder};
use clap::ArgMatches;
use duties_service::{DutiesService, DutiesServiceBuilder};
use environment::RuntimeContext;
use eth2_config::Eth2Config;
use fork_service::{ForkService, ForkServiceBuilder};
use futures::channel::mpsc;
use initialized_validators::InitializedValidators;
Expand All @@ -28,7 +29,7 @@ use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{delay_for, Duration};
use types::EthSpec;
use types::{EthSpec, Hash256};
use validator_store::ValidatorStore;

/// The interval between attempts to contact the beacon node during startup.
Expand Down Expand Up @@ -107,59 +108,11 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT)
.map_err(|e| format!("Unable to init beacon node http client: {}", e))?;

// TODO: check if all logs in wait_for_node are produced while awaiting
let beacon_node = wait_for_node(&context, beacon_node, log.clone()).await?;
let eth2_config = beacon_node
.http
.spec()
.get_eth2_config()
.await
.map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?;
let genesis_time = beacon_node
.http
.beacon()
.get_genesis_time()
.await
.map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| format!("Unable to read system time: {:?}", e))?;
let genesis = Duration::from_secs(genesis_time);

// If the time now is less than (prior to) genesis, then delay until the
// genesis instant.
//
// If the validator client starts before genesis, it will get errors from
// the slot clock.
if now < genesis {
info!(
log,
"Starting node prior to genesis";
"seconds_to_wait" => (genesis - now).as_secs()
);

tokio::select! {
() = delay_for(genesis - now) => (),
() = context.executor.exit() => return Err("Shutting down".to_string())
}
} else {
info!(
log,
"Genesis has already occurred";
"seconds_ago" => (now - genesis).as_secs()
);
}
let genesis_validators_root = beacon_node
.http
.beacon()
.get_genesis_validators_root()
.await
.map_err(|e| {
format!(
"Unable to read genesis validators root from beacon node: {:?}",
e
)
})?;
// Perform some potentially long-running initialization tasks.
let (eth2_config, genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_node, &context) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};

// Do not permit a connection to a beacon node using different spec constants.
if context.eth2_config.spec_constants != eth2_config.spec_constants {
Expand Down Expand Up @@ -274,49 +227,100 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}
}

async fn init_from_beacon_node<E: EthSpec>(
beacon_node: &RemoteBeaconNode<E>,
context: &RuntimeContext<E>,
) -> Result<(Eth2Config, u64, Hash256), String> {
// Wait for the beacon node to come online.
wait_for_node(beacon_node, context.log()).await?;

let eth2_config = beacon_node
.http
.spec()
.get_eth2_config()
.await
.map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?;
let genesis_time = beacon_node
.http
.beacon()
.get_genesis_time()
.await
.map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| format!("Unable to read system time: {:?}", e))?;
let genesis = Duration::from_secs(genesis_time);

// If the time now is less than (prior to) genesis, then delay until the
// genesis instant.
//
// If the validator client starts before genesis, it will get errors from
// the slot clock.
if now < genesis {
info!(
context.log(),
"Starting node prior to genesis";
"seconds_to_wait" => (genesis - now).as_secs()
);

delay_for(genesis - now).await;
} else {
info!(
context.log(),
"Genesis has already occurred";
"seconds_ago" => (now - genesis).as_secs()
);
}
let genesis_validators_root = beacon_node
.http
.beacon()
.get_genesis_validators_root()
.await
.map_err(|e| {
format!(
"Unable to read genesis validators root from beacon node: {:?}",
e
)
})?;

Ok((eth2_config, genesis_time, genesis_validators_root))
}

/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn wait_for_node<E: EthSpec>(
context: &RuntimeContext<E>,
beacon_node: RemoteBeaconNode<E>,
log: Logger,
) -> Result<RemoteBeaconNode<E>, String> {
let future = Box::pin(async move {
// Try to get the version string from the node, looping until success is returned.
loop {
let log = log.clone();
let result = beacon_node
.clone()
.http
.node()
.get_version()
.await
.map_err(|e| format!("{:?}", e));

match result {
Ok(version) => {
info!(
log,
"Connected to beacon node";
"version" => version,
);

return Ok(beacon_node);
}
Err(e) => {
error!(
log,
"Unable to connect to beacon node";
"error" => format!("{:?}", e),
);
delay_for(RETRY_DELAY).await;
}
beacon_node: &RemoteBeaconNode<E>,
log: &Logger,
) -> Result<(), String> {
// Try to get the version string from the node, looping until success is returned.
loop {
let log = log.clone();
let result = beacon_node
.clone()
.http
.node()
.get_version()
.await
.map_err(|e| format!("{:?}", e));

match result {
Ok(version) => {
info!(
log,
"Connected to beacon node";
"version" => version,
);

return Ok(());
}
Err(e) => {
error!(
log,
"Unable to connect to beacon node";
"error" => format!("{:?}", e),
);
delay_for(RETRY_DELAY).await;
}
}
});

tokio::select! {
result = future => result,
() = context.executor.exit() => Err("Shutting down".to_string())
}
}

0 comments on commit cb031b5

Please sign in to comment.