Skip to content

Commit

Permalink
Start adding post route, fix method issue
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Sep 5, 2020
1 parent c7326e1 commit 37bde29
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 20 deletions.
1 change: 1 addition & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ where
let ctx = Arc::new(http_api::Context {
config: self.http_api_config.clone(),
chain: self.beacon_chain.clone(),
network_tx: self.network_send.clone(),
// TODO
log: self.runtime_context.as_ref().unwrap().log().clone(),
});
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ hex = "0.4.2"
beacon_chain = { path = "../beacon_chain" }
eth2 = { path = "../../common/eth2" }
slog = "2.5.2"

network = { path = "../network" }
eth2_libp2p = { path = "../eth2_libp2p" }

[dev-dependencies]
store = { path = "../store" }
Expand Down
116 changes: 98 additions & 18 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ mod state_id;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use block_id::BlockId;
use eth2::types::{self as api_types, ValidatorId};
use eth2_libp2p::PubsubMessage;
use network::NetworkMessage;
use serde::{Deserialize, Serialize};
use slog::{crit, info, Logger};
use slog::{crit, error, info, Logger};
use state_id::StateId;
use std::borrow::Cow;
use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use types::{CommitteeCache, Epoch, EthSpec, RelativeEpoch};
use tokio::sync::mpsc::UnboundedSender;
use types::{CommitteeCache, Epoch, EthSpec, RelativeEpoch, SignedBeaconBlock};
use warp::Filter;

const API_PREFIX: &str = "eth";
Expand All @@ -21,6 +24,7 @@ const API_VERSION: &str = "v1";
pub struct Context<T: BeaconChainTypes> {
pub config: Config,
pub chain: Option<Arc<BeaconChain<T>>>,
pub network_tx: Option<UnboundedSender<NetworkMessage<T::EthSpec>>>,
pub log: Logger,
}

Expand Down Expand Up @@ -53,20 +57,37 @@ pub fn serve<T: BeaconChainTypes>(
panic!("a disabled server should not be started");
}

let base_path = warp::path(API_PREFIX).and(warp::path(API_VERSION));
let chain_filter = warp::any()
.map(move || ctx.chain.clone())
.and_then(|chain| async move {
match chain {
Some(chain) => Ok(chain),
let eth1_v1 = warp::path(API_PREFIX).and(warp::path(API_VERSION));

let inner_ctx = ctx.clone();
let chain_filter =
warp::any()
.map(move || inner_ctx.chain.clone())
.and_then(|chain| async move {
match chain {
Some(chain) => Ok(chain),
None => Err(crate::reject::custom_not_found(
"Beacon chain genesis has not yet been observed.".to_string(),
)),
}
});

let inner_ctx = ctx.clone();
let network_tx_filter = warp::any()
.map(move || inner_ctx.network_tx.clone())
.and_then(|network_tx| async move {
match network_tx {
Some(network_tx) => Ok(network_tx),
None => Err(crate::reject::custom_not_found(
"Beacon chain genesis has not yet been observed.".to_string(),
"The networking stack has not yet started.".to_string(),
)),
}
});

let log_filter = warp::any().map(move || ctx.log.clone());

// beacon/genesis
let beacon_genesis = base_path
let beacon_genesis = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("genesis"))
.and(warp::path::end())
Expand All @@ -89,7 +110,7 @@ pub fn serve<T: BeaconChainTypes>(
* beacon/states/{state_id}
*/

let beacon_states_path = base_path
let beacon_states_path = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("states"))
.and(warp::path::param::<StateId>())
Expand Down Expand Up @@ -119,10 +140,17 @@ pub fn serve<T: BeaconChainTypes>(
});

// beacon/states/{state_id}/finality_checkpoints
let beacon_state_finality_checkpoints = beacon_states_path
.clone()
//let beacon_state_finality_checkpoints = beacon_states_path
let beacon_state_finality_checkpoints = warp::any()
.and(warp::path(API_PREFIX))
.and(warp::path(API_VERSION))
.and(warp::path("beacon"))
.and(warp::path("states"))
.and(warp::path::param::<StateId>())
.and(warp::path("finality_checkpoints"))
.and(warp::path::end())
.and(warp::get())
.and(chain_filter.clone())
.and_then(|state_id: StateId, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
state_id
Expand Down Expand Up @@ -308,7 +336,7 @@ pub fn serve<T: BeaconChainTypes>(
// things. Returning non-canonical things is hard for us since we don't already have a
// mechanism for arbitrary forwards block iteration, we only support iterating forwards along
// the canonical chain.
let beacon_headers = base_path
let beacon_headers = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("headers"))
.and(warp::query::<api_types::HeadersQuery>())
Expand Down Expand Up @@ -383,7 +411,7 @@ pub fn serve<T: BeaconChainTypes>(
);

// beacon/headers/{block_id}
let beacon_headers_block_id = base_path
let beacon_headers_block_id = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("headers"))
.and(warp::path::param::<BlockId>())
Expand Down Expand Up @@ -413,10 +441,62 @@ pub fn serve<T: BeaconChainTypes>(
});

/*
* beacon/blocks/{block_id}
* beacon/blocks
*/

let beacon_blocks_path = base_path
// beacon/blocks/{block_id}
//let post_beacon_blocks = post_eth1_v1
let post_beacon_blocks = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::post())
.and(warp::body::json())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
|block: SignedBeaconBlock<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
blocking_json_task(move || {
dbg!("rahh");
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
if let Err(e) = network_tx.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::BeaconBlock(Box::new(block.clone()))],
}) {
return Err(crate::reject::custom_server_error(format!(
"unable to publish to network channel: {}",
e
)));
}

match chain.process_block(block.clone()) {
Ok(root) => {
info!(
log,
"Valid block from HTTP API";
"root" => format!("{}", root)
);
Ok(())
}
Err(e) => {
let msg = format!("{:?}", e);
error!(
log,
"Invalid block provided to HTTP API";
"reason" => &msg
);
Err(crate::reject::block_failed_validation(msg))
}
}
})
},
);

let beacon_blocks_path = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::param::<BlockId>())
Expand Down Expand Up @@ -466,12 +546,12 @@ pub fn serve<T: BeaconChainTypes>(
.or(beacon_state_committees)
.or(beacon_headers)
.or(beacon_headers_block_id)
.or(post_beacon_blocks)
.or(beacon_block)
.or(beacon_block_attestations)
.or(beacon_block_root)
.recover(crate::reject::handle_rejection);

// let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
SocketAddrV4::new(config.listen_addr, config.listen_port),
async {
Expand Down
28 changes: 28 additions & 0 deletions beacon_node/http_api/src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ pub fn custom_bad_request(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomBadRequest(msg))
}

#[derive(Debug)]
pub struct CustomServerError(pub String);

impl Reject for CustomServerError {}

pub fn custom_server_error(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomServerError(msg))
}

#[derive(Debug)]
pub struct BlockFailedValidation(pub String);

impl Reject for BlockFailedValidation {}

pub fn block_failed_validation(msg: String) -> warp::reject::Rejection {
warp::reject::custom(BlockFailedValidation(msg))
}

/// An API error serializable to JSON.
#[derive(Serialize)]
struct ErrorMessage {
Expand Down Expand Up @@ -76,6 +94,16 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
} else if let Some(e) = err.find::<crate::reject::CustomBadRequest>() {
code = StatusCode::BAD_REQUEST;
message = format!("BAD_REQUEST: {}", e.0);
} else if let Some(e) = err.find::<crate::reject::CustomServerError>() {
code = StatusCode::INTERNAL_SERVER_ERROR;
message = format!("INTERNAL_SERVER_ERROR: {}", e.0);
} else if let Some(e) = err.find::<crate::reject::BlockFailedValidation>() {
code = StatusCode::ACCEPTED;
message = format!(
"ACCEPTED: The block failed validation, but was successfully broadcast anyway. \
It was not integrated into the beacon node database: {}",
e.0
);
} else {
// We should have expected this... Just log and say its a 500
eprintln!("unhandled rejection: {:?}", err);
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use beacon_chain::{
use environment::null_logger;
use eth2::{types::*, BeaconNodeClient, Url};
use http_api::{Config, Context};
use network::NetworkMessage;
use std::net::Ipv4Addr;
use std::sync::Arc;
use store::config::StoreConfig;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use types::{
test_utils::generate_deterministic_keypairs, BeaconState, EthSpec, Hash256, MainnetEthSpec,
Expand Down Expand Up @@ -36,6 +38,7 @@ struct ApiTester {
chain: Arc<BeaconChain<HarnessType<E>>>,
client: BeaconNodeClient,
_server_shutdown: oneshot::Sender<()>,
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
}

impl ApiTester {
Expand Down Expand Up @@ -79,13 +82,16 @@ impl ApiTester {
"precondition: justification"
);

let (network_tx, network_rx) = mpsc::unbounded_channel();

let context = Arc::new(Context {
config: Config {
enabled: true,
listen_addr: Ipv4Addr::new(127, 0, 0, 1),
listen_port: 0,
},
chain: Some(chain.clone()),
network_tx: Some(network_tx),
log: null_logger().unwrap(),
});
let ctx = context.clone();
Expand All @@ -112,6 +118,7 @@ impl ApiTester {
chain,
client,
_server_shutdown: shutdown_tx,
network_rx,
}
}

Expand Down
24 changes: 24 additions & 0 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,30 @@ impl BeaconNodeClient {
self.get_opt(path).await
}

/// `GET beacon/blocks`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn post_beacon_blocks<T: EthSpec>(
&self,
block: SignedBeaconBlock<T>,
) -> Result<(), Error> {
let mut path = self.server.clone();

path.path_segments_mut()
.expect("path is base")
.push("beacon")
.push("blocks");

self.client
.post(path)
.json(&block)
.send()
.await?
.error_for_status()?;

Ok(())
}

/// `GET beacon/blocks`
///
/// Returns `Ok(None)` on a 404 error.
Expand Down
2 changes: 1 addition & 1 deletion common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl fmt::Display for StateId {
}
}

#[derive(Serialize, Deserialize)]
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(bound = "T: Serialize + serde::de::DeserializeOwned")]
pub struct GenericResponse<T: Serialize + serde::de::DeserializeOwned> {
pub data: T,
Expand Down

0 comments on commit 37bde29

Please sign in to comment.