Skip to content

Commit

Permalink
Add comments to http_api crate
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Sep 22, 2020
1 parent 852a38e commit dc2518d
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lighthouse_version = { path = "../../common/lighthouse_version" }
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
lazy_static = "1.4.0"
warp_utils = { path = "../../common/warp_utils" }
slot_clock = { path = "../../common/slot_clock" }

[dev-dependencies]
store = { path = "../store" }
Expand Down
36 changes: 33 additions & 3 deletions beacon_node/http_api/src/beacon_proposer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@ use crate::metrics;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::types::ProposerData;
use fork_choice::ProtoBlock;
use slot_clock::SlotClock;
use state_processing::per_slot_processing;
use types::{Epoch, EthSpec, Hash256, PublicKeyBytes};

const EPOCHS_TO_SKIP: u64 = 2;

/// Caches the beacon block proposers for a given `epoch` and `epoch_boundary_root`.
///
/// This cache is only able to contain a single set of proposers and is only
/// intended to cache the proposers for the current epoch according to the head
/// of the chain. A change in epoch or re-org to a different chain may cause a
/// cache miss and rebuild.
pub struct BeaconProposerCache {
epoch: Epoch,
epoch_boundary_root: Hash256,
proposers: Vec<ProposerData>,
}

impl BeaconProposerCache {
/// Create a new cache for the current epoch of the `chain`.
pub fn new<T: BeaconChainTypes>(chain: &BeaconChain<T>) -> Result<Self, BeaconChainError> {
let (head_root, head_block) = Self::current_head_block(chain)?;

Expand All @@ -32,6 +40,9 @@ impl BeaconProposerCache {
Self::for_head_block(chain, epoch, head_root, head_block)
}

/// Create a new cache that contains the shuffling for `current_epoch`,
/// assuming that `head_root` and `head_block` represents the most recent
/// canonical block.
fn for_head_block<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
current_epoch: Epoch,
Expand All @@ -44,6 +55,9 @@ impl BeaconProposerCache {
.get_state(&head_block.state_root, Some(head_block.slot))?
.ok_or_else(|| BeaconChainError::MissingBeaconState(head_block.state_root))?;

// We *must* skip forward to the current epoch to obtain valid proposer
// duties. We cannot skip to the previous epoch, like we do with
// attester duties.
while head_state.current_epoch() < current_epoch {
// Skip slots until the current epoch, providing `Hash256::zero()` as the state root
// since we don't require it to be valid to identify producers.
Expand Down Expand Up @@ -85,15 +99,29 @@ impl BeaconProposerCache {
})
}

/// Return the proposers for the given `Epoch`.
///
/// The cache may be rebuilt if:
///
/// - The epoch has changed since the last cache build.
/// - There has been a re-org that crosses an epoch boundary.
pub fn get_proposers<T: BeaconChainTypes>(
&mut self,
chain: &BeaconChain<T>,
epoch: Epoch,
) -> Result<Vec<ProposerData>, warp::Rejection> {
let current_epoch = chain
.epoch()
.unwrap_or_else(|_| chain.spec.genesis_slot.epoch(T::EthSpec::slots_per_epoch()));
let current_epoch = if chain.slot_clock.is_prior_to_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error("unable to read slot clock".to_string())
})? {
chain.spec.genesis_slot.epoch(T::EthSpec::slots_per_epoch())
} else {
chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?
};

// Disallow requests that are outside the current epoch. This ensures the cache doesn't get
// washed-out with old values.
if current_epoch != epoch {
return Err(warp_utils::reject::custom_bad_request(format!(
"requested epoch is {} but only current epoch {} is allowed",
Expand All @@ -105,6 +133,7 @@ impl BeaconProposerCache {
Self::current_head_block(chain).map_err(warp_utils::reject::beacon_chain_error)?;
let epoch_boundary_root = head_block.target_root;

// Rebuild the cache if this call causes a cache-miss.
if self.epoch != current_epoch || self.epoch_boundary_root != epoch_boundary_root {
metrics::inc_counter(&metrics::HTTP_API_BEACON_PROPOSER_CACHE_MISSES_TOTAL);

Expand All @@ -117,6 +146,7 @@ impl BeaconProposerCache {
Ok(self.proposers.clone())
}

/// Use fork choice to obtain some information about the head block of `chain`.
fn current_head_block<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<(Hash256, ProtoBlock), BeaconChainError> {
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/http_api/src/block_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use eth2::types::BlockId as CoreBlockId;
use std::str::FromStr;
use types::{Hash256, SignedBeaconBlock, Slot};

/// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given
/// `BlockId`.
#[derive(Debug)]
pub struct BlockId(pub CoreBlockId);

Expand All @@ -15,6 +17,7 @@ impl BlockId {
Self(CoreBlockId::Root(root))
}

/// Return the block root identified by `self`.
pub fn root<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
Expand Down Expand Up @@ -48,6 +51,7 @@ impl BlockId {
}
}

/// Return the `SignedBeaconBlock` identified by `self`.
pub fn block<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
Expand Down
51 changes: 51 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! This crate contains a HTTP server which serves the endpoints listed here:
//!
//! https://github.com/ethereum/eth2.0-APIs
//!
//! There are also some additional, non-standard endpoints behind the `/lighthouse/` path which are
//! used for development.
mod beacon_proposer_cache;
mod block_id;
mod consensus;
Expand Down Expand Up @@ -38,6 +45,9 @@ use warp::Filter;
const API_PREFIX: &str = "eth";
const API_VERSION: &str = "v1";

/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
pub struct Context<T: BeaconChainTypes> {
pub config: Config,
pub chain: Option<Arc<BeaconChain<T>>>,
Expand All @@ -46,6 +56,7 @@ pub struct Context<T: BeaconChainTypes> {
pub log: Logger,
}

/// Configuration for the HTTP server.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub enabled: bool,
Expand Down Expand Up @@ -83,6 +94,7 @@ impl From<String> for Error {
}
}

/// Creates a `warp` logging wrapper which we use to create `slog` logs.
pub fn slog_logging(
log: Logger,
) -> warp::filters::log::Log<impl Fn(warp::filters::log::Info) + Clone> {
Expand Down Expand Up @@ -112,6 +124,8 @@ pub fn slog_logging(
})
}

/// Creates a `warp` logging wrapper which we use for Prometheus metrics (not necessarily logging,
/// per say).
pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::log::Info) + Clone> {
warp::log::custom(move |info| {
// Here we restrict the `info.path()` value to some predefined values. Without this, we end
Expand Down Expand Up @@ -163,6 +177,21 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
})
}

/// Creates a server that will serve requests using information from `ctx`.
///
/// The server will shut down gracefully when the `shutdown` future resolves.
///
/// ## Returns
///
/// This function will bind the server to the provided address and then return a tuple of:
///
/// - `SocketAddr`: the address that the HTTP server will listen on.
/// - `Future`: the actual server future that will need to be awaited.
///
/// ## Errors
///
/// Returns an error if the server is unable to bind or there is another error during
/// configuration.
pub fn serve<T: BeaconChainTypes>(
ctx: Arc<Context<T>>,
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
Expand All @@ -171,6 +200,7 @@ pub fn serve<T: BeaconChainTypes>(
let log = ctx.log.clone();
let allow_origin = config.allow_origin.clone();

// Sanity check.
if !config.enabled {
crit!(log, "Cannot start disabled HTTP server");
return Err(Error::Other(
Expand All @@ -180,6 +210,7 @@ pub fn serve<T: BeaconChainTypes>(

let eth1_v1 = warp::path(API_PREFIX).and(warp::path(API_VERSION));

// Instantiate the beacon proposer cache.
let beacon_proposer_cache = ctx
.chain
.as_ref()
Expand All @@ -189,6 +220,7 @@ pub fn serve<T: BeaconChainTypes>(
.map(Mutex::new)
.map(Arc::new);

// Create a `warp` filter that provides access to the proposer cache.
let beacon_proposer_cache = || {
warp::any()
.map(move || beacon_proposer_cache.clone())
Expand All @@ -202,6 +234,7 @@ pub fn serve<T: BeaconChainTypes>(
})
};

// Create a `warp` filter that provides access to the network globals.
let inner_network_globals = ctx.network_globals.clone();
let network_globals = warp::any()
.map(move || inner_network_globals.clone())
Expand All @@ -214,6 +247,7 @@ pub fn serve<T: BeaconChainTypes>(
}
});

// Create a `warp` filter that provides access to the beacon chain.
let inner_ctx = ctx.clone();
let chain_filter =
warp::any()
Expand All @@ -227,6 +261,7 @@ pub fn serve<T: BeaconChainTypes>(
}
});

// Create a `warp` filter that provides access to the network sender channel.
let inner_ctx = ctx.clone();
let network_tx_filter = warp::any()
.map(move || inner_ctx.network_tx.clone())
Expand All @@ -239,8 +274,15 @@ pub fn serve<T: BeaconChainTypes>(
}
});

// Create a `warp` filter that provides access to the logger.
let log_filter = warp::any().map(move || ctx.log.clone());

/*
*
* Start of HTTP method definitions.
*
*/

// GET beacon/genesis
let get_beacon_genesis = eth1_v1
.and(warp::path("beacon"))
Expand Down Expand Up @@ -1520,6 +1562,7 @@ pub fn serve<T: BeaconChainTypes>(
})
});

// Define the ultimate set of routes that will be provided to the server.
let routes = warp::get()
.and(
get_beacon_genesis
Expand Down Expand Up @@ -1573,10 +1616,13 @@ pub fn serve<T: BeaconChainTypes>(
)
.boxed())
.boxed()
// Maps errors into HTTP responses.
.recover(warp_utils::reject::handle_rejection)
.with(slog_logging(log.clone()))
.with(prometheus_metrics())
// Add a `Server` header.
.map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform()))
// Maybe add some CORS headers.
.map(move |reply| warp_utils::reply::maybe_cors(reply, allow_origin.as_ref()));

let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
Expand All @@ -1595,6 +1641,7 @@ pub fn serve<T: BeaconChainTypes>(
Ok((listening_socket, server))
}

/// Publish a message to the libp2p pubsub network.
fn publish_pubsub_message<T: EthSpec>(
network_tx: &UnboundedSender<NetworkMessage<T>>,
message: PubsubMessage<T>,
Expand All @@ -1607,6 +1654,7 @@ fn publish_pubsub_message<T: EthSpec>(
)
}

/// Publish a message to the libp2p network.
fn publish_network_message<T: EthSpec>(
network_tx: &UnboundedSender<NetworkMessage<T>>,
message: NetworkMessage<T>,
Expand All @@ -1619,13 +1667,16 @@ fn publish_network_message<T: EthSpec>(
})
}

/// Execute some task in a tokio "blocking thread". These threads are ideal for long-running
/// (blocking) tasks since they don't jam up the core executor.
async fn blocking_task<F, T>(func: F) -> T
where
F: Fn() -> T,
{
tokio::task::block_in_place(func)
}

/// A convenience wrapper around `blocking_task` for use with `warp` JSON responses.
async fn blocking_json_task<F, T>(func: F) -> Result<warp::reply::Json, warp::Rejection>
where
F: Fn() -> Result<T, warp::Rejection>,
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/http_api/src/state_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use eth2::types::StateId as CoreStateId;
use std::str::FromStr;
use types::{BeaconState, EthSpec, Fork, Hash256, Slot};

/// Wraps `eth2::types::StateId` and provides common state-access functionality. E.g., reading
/// states or parts of states from the database.
pub struct StateId(CoreStateId);

impl StateId {
Expand All @@ -14,6 +16,7 @@ impl StateId {
Self(CoreStateId::Slot(slot))
}

/// Return the state root identified by `self`.
pub fn root<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
Expand Down Expand Up @@ -49,13 +52,15 @@ impl StateId {
})
}

/// Return the `fork` field of the state identified by `self`.
pub fn fork<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<Fork, warp::Rejection> {
self.map_state(chain, |state| Ok(state.fork))
}

/// Return the `BeaconState` identified by `self`.
pub fn state<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
Expand Down Expand Up @@ -83,6 +88,10 @@ impl StateId {
})
}

/// Map a function across the `BeaconState` identified by `self`.
///
/// This function will avoid instantiating/copying a new state when `self` points to the head
/// of the chain.
pub fn map_state<T: BeaconChainTypes, F, U>(
&self,
chain: &BeaconChain<T>,
Expand Down

0 comments on commit dc2518d

Please sign in to comment.