From dc2518d65173031d4ee629e47aaa64857cea59b3 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 22 Sep 2020 10:33:25 +1000 Subject: [PATCH] Add comments to http_api crate --- Cargo.lock | 1 + beacon_node/http_api/Cargo.toml | 1 + .../http_api/src/beacon_proposer_cache.rs | 36 +++++++++++-- beacon_node/http_api/src/block_id.rs | 4 ++ beacon_node/http_api/src/lib.rs | 51 +++++++++++++++++++ beacon_node/http_api/src/state_id.rs | 9 ++++ 6 files changed, 99 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce3fa319431..8303812609e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2319,6 +2319,7 @@ dependencies = [ "parking_lot 0.11.0", "serde", "slog", + "slot_clock", "state_processing", "store", "tokio 0.2.22", diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index c5e025b2aee..828d26deb3d 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -25,6 +25,7 @@ lighthouse_version = { path = "../../common/lighthouse_version" } lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lazy_static = "1.4.0" warp_utils = { path = "../../common/warp_utils" } +slot_clock = { path = "../../common/slot_clock" } [dev-dependencies] store = { path = "../store" } diff --git a/beacon_node/http_api/src/beacon_proposer_cache.rs b/beacon_node/http_api/src/beacon_proposer_cache.rs index c7eb840dc3f..61870ad5cfd 100644 --- a/beacon_node/http_api/src/beacon_proposer_cache.rs +++ b/beacon_node/http_api/src/beacon_proposer_cache.rs @@ -2,11 +2,18 @@ use crate::metrics; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::ProposerData; use fork_choice::ProtoBlock; +use slot_clock::SlotClock; use state_processing::per_slot_processing; use types::{Epoch, EthSpec, Hash256, PublicKeyBytes}; const EPOCHS_TO_SKIP: u64 = 2; +/// Caches the beacon block proposers for a given `epoch` and `epoch_boundary_root`. +/// +/// This cache is only able to contain a single set of proposers and is only +/// intended to cache the proposers for the current epoch according to the head +/// of the chain. A change in epoch or re-org to a different chain may cause a +/// cache miss and rebuild. pub struct BeaconProposerCache { epoch: Epoch, epoch_boundary_root: Hash256, @@ -14,6 +21,7 @@ pub struct BeaconProposerCache { } impl BeaconProposerCache { + /// Create a new cache for the current epoch of the `chain`. pub fn new(chain: &BeaconChain) -> Result { let (head_root, head_block) = Self::current_head_block(chain)?; @@ -32,6 +40,9 @@ impl BeaconProposerCache { Self::for_head_block(chain, epoch, head_root, head_block) } + /// Create a new cache that contains the shuffling for `current_epoch`, + /// assuming that `head_root` and `head_block` represents the most recent + /// canonical block. fn for_head_block( chain: &BeaconChain, current_epoch: Epoch, @@ -44,6 +55,9 @@ impl BeaconProposerCache { .get_state(&head_block.state_root, Some(head_block.slot))? .ok_or_else(|| BeaconChainError::MissingBeaconState(head_block.state_root))?; + // We *must* skip forward to the current epoch to obtain valid proposer + // duties. We cannot skip to the previous epoch, like we do with + // attester duties. while head_state.current_epoch() < current_epoch { // Skip slots until the current epoch, providing `Hash256::zero()` as the state root // since we don't require it to be valid to identify producers. @@ -85,15 +99,29 @@ impl BeaconProposerCache { }) } + /// Return the proposers for the given `Epoch`. + /// + /// The cache may be rebuilt if: + /// + /// - The epoch has changed since the last cache build. + /// - There has been a re-org that crosses an epoch boundary. pub fn get_proposers( &mut self, chain: &BeaconChain, epoch: Epoch, ) -> Result, warp::Rejection> { - let current_epoch = chain - .epoch() - .unwrap_or_else(|_| chain.spec.genesis_slot.epoch(T::EthSpec::slots_per_epoch())); + let current_epoch = if chain.slot_clock.is_prior_to_genesis().ok_or_else(|| { + warp_utils::reject::custom_server_error("unable to read slot clock".to_string()) + })? { + chain.spec.genesis_slot.epoch(T::EthSpec::slots_per_epoch()) + } else { + chain + .epoch() + .map_err(warp_utils::reject::beacon_chain_error)? + }; + // Disallow requests that are outside the current epoch. This ensures the cache doesn't get + // washed-out with old values. if current_epoch != epoch { return Err(warp_utils::reject::custom_bad_request(format!( "requested epoch is {} but only current epoch {} is allowed", @@ -105,6 +133,7 @@ impl BeaconProposerCache { Self::current_head_block(chain).map_err(warp_utils::reject::beacon_chain_error)?; let epoch_boundary_root = head_block.target_root; + // Rebuild the cache if this call causes a cache-miss. if self.epoch != current_epoch || self.epoch_boundary_root != epoch_boundary_root { metrics::inc_counter(&metrics::HTTP_API_BEACON_PROPOSER_CACHE_MISSES_TOTAL); @@ -117,6 +146,7 @@ impl BeaconProposerCache { Ok(self.proposers.clone()) } + /// Use fork choice to obtain some information about the head block of `chain`. fn current_head_block( chain: &BeaconChain, ) -> Result<(Hash256, ProtoBlock), BeaconChainError> { diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index b034a6816a3..5e358a2d683 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -3,6 +3,8 @@ use eth2::types::BlockId as CoreBlockId; use std::str::FromStr; use types::{Hash256, SignedBeaconBlock, Slot}; +/// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given +/// `BlockId`. #[derive(Debug)] pub struct BlockId(pub CoreBlockId); @@ -15,6 +17,7 @@ impl BlockId { Self(CoreBlockId::Root(root)) } + /// Return the block root identified by `self`. pub fn root( &self, chain: &BeaconChain, @@ -48,6 +51,7 @@ impl BlockId { } } + /// Return the `SignedBeaconBlock` identified by `self`. pub fn block( &self, chain: &BeaconChain, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 99403a910f7..6e0c0fcbac4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1,3 +1,10 @@ +//! This crate contains a HTTP server which serves the endpoints listed here: +//! +//! https://github.com/ethereum/eth2.0-APIs +//! +//! There are also some additional, non-standard endpoints behind the `/lighthouse/` path which are +//! used for development. + mod beacon_proposer_cache; mod block_id; mod consensus; @@ -38,6 +45,9 @@ use warp::Filter; const API_PREFIX: &str = "eth"; const API_VERSION: &str = "v1"; +/// A wrapper around all the items required to spawn the HTTP server. +/// +/// The server will gracefully handle the case where any fields are `None`. pub struct Context { pub config: Config, pub chain: Option>>, @@ -46,6 +56,7 @@ pub struct Context { pub log: Logger, } +/// Configuration for the HTTP server. #[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct Config { pub enabled: bool, @@ -83,6 +94,7 @@ impl From for Error { } } +/// Creates a `warp` logging wrapper which we use to create `slog` logs. pub fn slog_logging( log: Logger, ) -> warp::filters::log::Log { @@ -112,6 +124,8 @@ pub fn slog_logging( }) } +/// Creates a `warp` logging wrapper which we use for Prometheus metrics (not necessarily logging, +/// per say). pub fn prometheus_metrics() -> warp::filters::log::Log { warp::log::custom(move |info| { // Here we restrict the `info.path()` value to some predefined values. Without this, we end @@ -163,6 +177,21 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( ctx: Arc>, shutdown: impl Future + Send + Sync + 'static, @@ -171,6 +200,7 @@ pub fn serve( let log = ctx.log.clone(); let allow_origin = config.allow_origin.clone(); + // Sanity check. if !config.enabled { crit!(log, "Cannot start disabled HTTP server"); return Err(Error::Other( @@ -180,6 +210,7 @@ pub fn serve( let eth1_v1 = warp::path(API_PREFIX).and(warp::path(API_VERSION)); + // Instantiate the beacon proposer cache. let beacon_proposer_cache = ctx .chain .as_ref() @@ -189,6 +220,7 @@ pub fn serve( .map(Mutex::new) .map(Arc::new); + // Create a `warp` filter that provides access to the proposer cache. let beacon_proposer_cache = || { warp::any() .map(move || beacon_proposer_cache.clone()) @@ -202,6 +234,7 @@ pub fn serve( }) }; + // Create a `warp` filter that provides access to the network globals. let inner_network_globals = ctx.network_globals.clone(); let network_globals = warp::any() .map(move || inner_network_globals.clone()) @@ -214,6 +247,7 @@ pub fn serve( } }); + // Create a `warp` filter that provides access to the beacon chain. let inner_ctx = ctx.clone(); let chain_filter = warp::any() @@ -227,6 +261,7 @@ pub fn serve( } }); + // Create a `warp` filter that provides access to the network sender channel. let inner_ctx = ctx.clone(); let network_tx_filter = warp::any() .map(move || inner_ctx.network_tx.clone()) @@ -239,8 +274,15 @@ pub fn serve( } }); + // Create a `warp` filter that provides access to the logger. let log_filter = warp::any().map(move || ctx.log.clone()); + /* + * + * Start of HTTP method definitions. + * + */ + // GET beacon/genesis let get_beacon_genesis = eth1_v1 .and(warp::path("beacon")) @@ -1520,6 +1562,7 @@ pub fn serve( }) }); + // Define the ultimate set of routes that will be provided to the server. let routes = warp::get() .and( get_beacon_genesis @@ -1573,10 +1616,13 @@ pub fn serve( ) .boxed()) .boxed() + // Maps errors into HTTP responses. .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) .with(prometheus_metrics()) + // Add a `Server` header. .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) + // Maybe add some CORS headers. .map(move |reply| warp_utils::reply::maybe_cors(reply, allow_origin.as_ref())); let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( @@ -1595,6 +1641,7 @@ pub fn serve( Ok((listening_socket, server)) } +/// Publish a message to the libp2p pubsub network. fn publish_pubsub_message( network_tx: &UnboundedSender>, message: PubsubMessage, @@ -1607,6 +1654,7 @@ fn publish_pubsub_message( ) } +/// Publish a message to the libp2p network. fn publish_network_message( network_tx: &UnboundedSender>, message: NetworkMessage, @@ -1619,6 +1667,8 @@ fn publish_network_message( }) } +/// Execute some task in a tokio "blocking thread". These threads are ideal for long-running +/// (blocking) tasks since they don't jam up the core executor. async fn blocking_task(func: F) -> T where F: Fn() -> T, @@ -1626,6 +1676,7 @@ where tokio::task::block_in_place(func) } +/// A convenience wrapper around `blocking_task` for use with `warp` JSON responses. async fn blocking_json_task(func: F) -> Result where F: Fn() -> Result, diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index a453eaff59f..11800648f25 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -3,6 +3,8 @@ use eth2::types::StateId as CoreStateId; use std::str::FromStr; use types::{BeaconState, EthSpec, Fork, Hash256, Slot}; +/// Wraps `eth2::types::StateId` and provides common state-access functionality. E.g., reading +/// states or parts of states from the database. pub struct StateId(CoreStateId); impl StateId { @@ -14,6 +16,7 @@ impl StateId { Self(CoreStateId::Slot(slot)) } + /// Return the state root identified by `self`. pub fn root( &self, chain: &BeaconChain, @@ -49,6 +52,7 @@ impl StateId { }) } + /// Return the `fork` field of the state identified by `self`. pub fn fork( &self, chain: &BeaconChain, @@ -56,6 +60,7 @@ impl StateId { self.map_state(chain, |state| Ok(state.fork)) } + /// Return the `BeaconState` identified by `self`. pub fn state( &self, chain: &BeaconChain, @@ -83,6 +88,10 @@ impl StateId { }) } + /// Map a function across the `BeaconState` identified by `self`. + /// + /// This function will avoid instantiating/copying a new state when `self` points to the head + /// of the chain. pub fn map_state( &self, chain: &BeaconChain,