Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RPC and implement hello protocol #246

Merged
merged 4 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use async_std::sync::Sender;
use blocks::TipSetKeys;
use forest_libp2p::{
rpc::{BlockSyncRequest, RPCEvent, RPCRequest, RequestId},
blocksync::BlockSyncRequest,
rpc::{RPCEvent, RPCRequest, RequestId},
NetworkMessage,
};
use libp2p::core::PeerId;
Expand Down
4 changes: 2 additions & 2 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<'db, DB, ST> Future for ChainSyncer<'db, DB, ST> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().network_rx.poll_next(cx) {
Poll::Ready(Some(event)) => info!("chain syncer received event: {:?}", event),
Poll::Ready(Some(_event)) => (),
Poll::Pending | Poll::Ready(None) => (),
};
Poll::Pending
Expand Down Expand Up @@ -147,7 +147,7 @@ where
loop {
select! {
network_msg = nw.next().fuse() => match network_msg {
Some(event) => info!("received some other event: {:?}", event),
Some(event) =>(),
None => break,
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/forest_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ forest_cid = { path = "../../ipld/cid" }
bytes = "0.5.2"
fnv = "1.0.6"
smallvec = "1.1.0"
clock = { path = "../clock" }
num-bigint = { path = "../../math/bigint", package = "forest_bigint" }

[dev-dependencies]
forest_address = { path = "../../vm/address" }
Expand Down
8 changes: 8 additions & 0 deletions node/forest_libp2p/src/blocksync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod message;

pub use self::message::*;

pub const BLOCKSYNC_PROTOCOL_ID: &[u8] = b"/fil/sync/blk/0.0.1";
94 changes: 94 additions & 0 deletions node/forest_libp2p/src/hello/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use clock::ChainEpoch;
use forest_cid::Cid;
use forest_encoding::{de::Deserializer, ser::Serializer};
use num_bigint::{bigint_ser, BigInt};
use serde::{Deserialize, Serialize};

/// Hello message https://filecoin-project.github.io/specs/#hello-spec
#[derive(Clone, Debug, PartialEq, Default)]
pub struct HelloMessage {
pub heaviest_tip_set: Vec<Cid>,
pub heaviest_tipset_height: ChainEpoch,
pub heaviest_tipset_weight: BigInt,
pub genesis_hash: Cid,
}

impl Serialize for HelloMessage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
#[derive(Serialize)]
struct TupleHelloMessage<'a>(
&'a [Cid],
&'a ChainEpoch,
#[serde(with = "bigint_ser")] &'a BigInt,
&'a Cid,
);
TupleHelloMessage(
&self.heaviest_tip_set,
&self.heaviest_tipset_height,
&self.heaviest_tipset_weight,
&self.genesis_hash,
)
.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for HelloMessage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct TupleHelloMessage(
Vec<Cid>,
ChainEpoch,
#[serde(with = "bigint_ser")] BigInt,
Cid,
);
let TupleHelloMessage(
heaviest_tip_set,
heaviest_tipset_height,
heaviest_tipset_weight,
genesis_hash,
) = Deserialize::deserialize(deserializer)?;
Ok(HelloMessage {
heaviest_tip_set,
heaviest_tipset_height,
heaviest_tipset_weight,
genesis_hash,
})
}
}

/// Response to a Hello
#[derive(Clone, Debug, PartialEq)]
pub struct HelloResponse {
/// Time of arrival in unix nanoseconds
pub arrival: i64,
/// Time sent in unix nanoseconds
pub sent: i64,
}

impl Serialize for HelloResponse {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
(&self.arrival, &self.sent).serialize(serializer)
}
}

impl<'de> Deserialize<'de> for HelloResponse {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let (arrival, sent) = Deserialize::deserialize(deserializer)?;
Ok(HelloResponse { arrival, sent })
}
}
8 changes: 8 additions & 0 deletions node/forest_libp2p/src/hello/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod message;

pub use self::message::*;

pub const HELLO_PROTOCOL_ID: &[u8] = b"/fil/hello/1.0.0";
2 changes: 2 additions & 0 deletions node/forest_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#![recursion_limit = "1024"]

mod behaviour;
pub mod blocksync;
mod config;
pub mod hello;
pub mod rpc;
mod service;

Expand Down
95 changes: 51 additions & 44 deletions node/forest_libp2p/src/rpc/codec.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{RPCRequest, RPCResponse};
use super::{RPCError, RPCRequest, RPCResponse};
use crate::blocksync::BLOCKSYNC_PROTOCOL_ID;
use crate::hello::HELLO_PROTOCOL_ID;
use bytes::BytesMut;
use forest_encoding::{error::Error as EncodingError, from_slice, to_vec};
use forest_encoding::{from_slice, to_vec};
use futures_codec::{Decoder, Encoder};
use std::fmt;

/// Codec used for inbound connections. Decodes the inbound message into a RPCRequest, and encodes the RPCResponse to send.
pub struct InboundCodec;
/// Codec used for outbound connections. Encodes the outbound message into a RPCRequest to send, and decodes the RPCResponse when received.
pub struct OutboundCodec;

#[derive(Debug, Clone, PartialEq)]
pub enum RPCError {
Codec(String),
Custom(String),
}
impl From<std::io::Error> for RPCError {
fn from(err: std::io::Error) -> Self {
Self::Custom(err.to_string())
}
pub struct InboundCodec {
protocol: &'static [u8],
}

impl From<EncodingError> for RPCError {
fn from(err: EncodingError) -> Self {
Self::Codec(err.to_string())
}
}

impl fmt::Display for RPCError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RPCError::Codec(err) => write!(f, "Codec Error: {}", err),
RPCError::Custom(err) => write!(f, "{}", err),
}
}
}

impl std::error::Error for RPCError {
fn description(&self) -> &str {
"Libp2p RPC Error"
impl InboundCodec {
pub fn new(protocol: &'static [u8]) -> Self {
Self { protocol }
}
}

Expand All @@ -56,6 +31,12 @@ impl Encoder for InboundCodec {
dst.extend_from_slice(&resp);
Ok(())
}
RPCResponse::Hello(response) => {
let resp = to_vec(&response)?;
dst.clear();
dst.extend_from_slice(&resp);
Ok(())
}
}
}
}
Expand All @@ -69,16 +50,32 @@ impl Decoder for InboundCodec {
return Ok(None);
}

Ok(Some(RPCRequest::Blocksync(
from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?,
)))
match self.protocol {
HELLO_PROTOCOL_ID => Ok(Some(RPCRequest::Hello(
from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?,
))),
BLOCKSYNC_PROTOCOL_ID => Ok(Some(RPCRequest::Blocksync(
from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?,
))),
_ => Err(RPCError::Codec("Unsupported codec".to_string())),
}
}
}

/// Codec used for outbound connections. Encodes the outbound message into a RPCRequest to send, and decodes the RPCResponse when received.
pub struct OutboundCodec {
protocol: &'static [u8],
}

impl OutboundCodec {
pub fn new(protocol: &'static [u8]) -> Self {
Self { protocol }
}
}

impl Encoder for OutboundCodec {
type Error = RPCError;
type Item = RPCRequest;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
match item {
RPCRequest::Blocksync(request) => {
Expand All @@ -87,21 +84,31 @@ impl Encoder for OutboundCodec {
dst.extend_from_slice(&resp);
Ok(())
}
RPCRequest::Hello(request) => {
let resp = to_vec(&request)?;
dst.clear();
dst.extend_from_slice(&resp);
Ok(())
}
}
}
}

impl Decoder for OutboundCodec {
type Error = RPCError;
type Item = RPCResponse;

fn decode(&mut self, bz: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if bz.is_empty() {
return Ok(None);
}

Ok(Some(RPCResponse::Blocksync(
// Replace map
from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?,
)))
match self.protocol {
HELLO_PROTOCOL_ID => Ok(Some(RPCResponse::Hello(
from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?,
))),
BLOCKSYNC_PROTOCOL_ID => Ok(Some(RPCResponse::Blocksync(
from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?,
))),
_ => Err(RPCError::Codec("Unsupported codec".to_string())),
}
}
}
7 changes: 4 additions & 3 deletions node/forest_libp2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{
InboundCodec, OutboundFramed, RPCError, RPCEvent, RPCInbound, RPCOutbound, RPCResponse,
InboundCodec, OutboundFramed, RPCError, RPCEvent, RPCInbound, RPCRequest, RPCResponse,
RequestId,
};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -140,7 +140,7 @@ where
type Error = RPCError;
type Substream = TSubstream;
type InboundProtocol = RPCInbound;
type OutboundProtocol = RPCOutbound;
type OutboundProtocol = RPCRequest;
type OutboundOpenInfo = RPCEvent;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
Expand Down Expand Up @@ -347,9 +347,10 @@ where
if self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
let event = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
if let RPCEvent::Request(id, req) = event {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(RPCOutbound { req: req.clone() }),
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
});
}
Expand Down
40 changes: 36 additions & 4 deletions node/forest_libp2p/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@
// SPDX-License-Identifier: Apache-2.0, MIT

mod behaviour;
mod blocksync_message;
mod codec;
mod handler;
mod protocol;
mod rpc_message;

pub use behaviour::*;
pub use blocksync_message::*;
pub use codec::*;
pub use handler::*;
pub use protocol::*;
pub use rpc_message::*;

use forest_encoding::error::Error as EncodingError;
use std::fmt;

pub type RequestId = usize;

Expand Down Expand Up @@ -51,3 +50,36 @@ impl std::fmt::Display for RPCEvent {
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency should this be its own file error.rs?

#[derive(Debug, Clone, PartialEq)]
pub enum RPCError {
Codec(String),
Custom(String),
}

impl From<std::io::Error> for RPCError {
fn from(err: std::io::Error) -> Self {
Self::Custom(err.to_string())
}
}

impl From<EncodingError> for RPCError {
fn from(err: EncodingError) -> Self {
Self::Codec(err.to_string())
}
}

impl fmt::Display for RPCError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RPCError::Codec(err) => write!(f, "Codec Error: {}", err),
RPCError::Custom(err) => write!(f, "{}", err),
}
}
}

impl std::error::Error for RPCError {
fn description(&self) -> &str {
"Libp2p RPC Error"
}
}
Loading