From f5030c9ac7e6e082874761e4d4167acf169e6e9b Mon Sep 17 00:00:00 2001 From: austinabell Date: Fri, 22 May 2020 17:14:05 -0400 Subject: [PATCH 1/4] Setup libp2p graphsync framework --- ipld/graphsync/Cargo.toml | 7 ++ ipld/graphsync/src/lib.rs | 2 + ipld/graphsync/src/libp2p/behaviour.rs | 89 +++++++++++++++++++++++++ ipld/graphsync/src/libp2p/codec.rs | 32 +++++++++ ipld/graphsync/src/libp2p/error.rs | 38 +++++++++++ ipld/graphsync/src/libp2p/handler.rs | 91 ++++++++++++++++++++++++++ ipld/graphsync/src/libp2p/mod.rs | 13 ++++ ipld/graphsync/src/libp2p/protocol.rs | 75 +++++++++++++++++++++ 8 files changed, 347 insertions(+) create mode 100644 ipld/graphsync/src/libp2p/behaviour.rs create mode 100644 ipld/graphsync/src/libp2p/codec.rs create mode 100644 ipld/graphsync/src/libp2p/error.rs create mode 100644 ipld/graphsync/src/libp2p/handler.rs create mode 100644 ipld/graphsync/src/libp2p/mod.rs create mode 100644 ipld/graphsync/src/libp2p/protocol.rs diff --git a/ipld/graphsync/Cargo.toml b/ipld/graphsync/Cargo.toml index 471beaa3a25e..25fb1fb8653a 100644 --- a/ipld/graphsync/Cargo.toml +++ b/ipld/graphsync/Cargo.toml @@ -10,6 +10,13 @@ cid = { package = "forest_cid", path = "../cid", version = "0.1" } forest_ipld = { path = "../" } fnv = "1.0.6" forest_encoding = { path = "../../encoding", version = "0.1" } +libp2p = "0.19" +futures = "0.3.2" +futures-util = "0.3.1" +futures_codec = "0.4.0" +log = "0.4.8" +bytes = "0.5.2" +unsigned-varint = { version = "0.3.0", features = ["futures-codec"] } [build-dependencies] protoc-rust = "2.14.0" diff --git a/ipld/graphsync/src/lib.rs b/ipld/graphsync/src/lib.rs index abec1008effd..8028fedfdaf2 100644 --- a/ipld/graphsync/src/lib.rs +++ b/ipld/graphsync/src/lib.rs @@ -1,6 +1,8 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +// TODO evaluate exporting from libp2p mod +pub mod libp2p; mod message; pub use self::message::*; diff --git a/ipld/graphsync/src/libp2p/behaviour.rs b/ipld/graphsync/src/libp2p/behaviour.rs new file mode 100644 index 000000000000..551fcb6fb005 --- /dev/null +++ b/ipld/graphsync/src/libp2p/behaviour.rs @@ -0,0 +1,89 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::handler::GraphSyncHandler; +use crate::GraphSyncMessage; +use futures::task::Context; +use futures_util::task::Poll; +use libp2p::core::connection::ConnectionId; +use libp2p::swarm::{ + protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use libp2p::{Multiaddr, PeerId}; + +/// The RPC behaviour that gets consumed by the Swarm. +pub struct RPC { + /// Queue of events to processed. + events: Vec>, +} + +impl RPC { + /// Creates a new RPC behaviour + pub fn new() -> Self { + RPC::default() + } +} + +impl Default for RPC { + fn default() -> Self { + RPC { events: vec![] } + } +} + +impl NetworkBehaviour for RPC { + type ProtocolsHandler = GraphSyncHandler; + type OutEvent = GraphSyncEvent; + fn new_handler(&mut self) -> Self::ProtocolsHandler { + GraphSyncHandler::default() + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + vec![] + } + + fn inject_connected(&mut self, _peer_id: &PeerId) { + todo!() + } + + fn inject_disconnected(&mut self, _peer_id: &PeerId) { + todo!() + } + + fn inject_event( + &mut self, + _peer_id: PeerId, + _connection: ConnectionId, + _event: ::OutEvent, + ) { + todo!() + } + + fn poll( + &mut self, + _: &mut Context, + _: &mut impl PollParameters, + ) -> Poll< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + if !self.events.is_empty() { + return Poll::Ready(self.events.remove(0)); + } + Poll::Pending + } +} + +// TODO remove +#[allow(dead_code)] +/// Event from the GraphSync behaviour. +#[derive(Debug)] +pub enum GraphSyncEvent { + /// A message has been received. This contains the PeerId that we received the message from + /// and the message itself. + Message(PeerId, GraphSyncMessage), + + Connected(PeerId), + Disconnected(PeerId), +} diff --git a/ipld/graphsync/src/libp2p/codec.rs b/ipld/graphsync/src/libp2p/codec.rs new file mode 100644 index 000000000000..a7191827fd84 --- /dev/null +++ b/ipld/graphsync/src/libp2p/codec.rs @@ -0,0 +1,32 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::RPCError; +use crate::GraphSyncMessage; +use bytes::BytesMut; +use futures_codec::{Decoder, Encoder}; +use unsigned_varint::codec; + +#[allow(dead_code)] +/// Codec used +pub struct GraphSyncCodec { + pub(crate) length_codec: codec::UviBytes, +} + +impl Encoder for GraphSyncCodec { + type Error = RPCError; + type Item = GraphSyncMessage; + + fn encode(&mut self, _item: Self::Item, _dst: &mut BytesMut) -> Result<(), Self::Error> { + todo!() + } +} + +impl Decoder for GraphSyncCodec { + type Error = RPCError; + type Item = GraphSyncMessage; + + fn decode(&mut self, _bz: &mut BytesMut) -> Result, Self::Error> { + todo!() + } +} diff --git a/ipld/graphsync/src/libp2p/error.rs b/ipld/graphsync/src/libp2p/error.rs new file mode 100644 index 000000000000..60194159412c --- /dev/null +++ b/ipld/graphsync/src/libp2p/error.rs @@ -0,0 +1,38 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use forest_encoding::error::Error as EncodingError; +use std::fmt; + +#[derive(Debug, Clone, PartialEq)] +pub enum RPCError { + Codec(String), + Custom(String), +} + +impl From for RPCError { + fn from(err: std::io::Error) -> Self { + Self::Custom(err.to_string()) + } +} + +impl From for RPCError { + fn from(err: EncodingError) -> Self { + Self::Codec(err.to_string()) + } +} + +impl fmt::Display for RPCError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RPCError::Codec(err) => write!(f, "Codec Error: {}", err), + RPCError::Custom(err) => write!(f, "{}", err), + } + } +} + +impl std::error::Error for RPCError { + fn description(&self) -> &str { + "Libp2p RPC Error" + } +} diff --git a/ipld/graphsync/src/libp2p/handler.rs b/ipld/graphsync/src/libp2p/handler.rs new file mode 100644 index 000000000000..cccf5755bb30 --- /dev/null +++ b/ipld/graphsync/src/libp2p/handler.rs @@ -0,0 +1,91 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::protocol::ProtocolConfig; +use crate::GraphSyncMessage; +use libp2p::swarm::{ + KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use libp2p::{InboundUpgrade, OutboundUpgrade}; +use std::io; +use std::task::{Context, Poll}; + +pub struct GraphSyncHandler { + // TODO +} + +impl GraphSyncHandler { + /// Constructor for new RPC handler + pub fn new() -> Self { + // TODO + GraphSyncHandler {} + } +} + +impl Default for GraphSyncHandler { + fn default() -> Self { + GraphSyncHandler::new() + } +} + +impl ProtocolsHandler for GraphSyncHandler { + type InEvent = GraphSyncMessage; + type OutEvent = GraphSyncMessage; + type Error = io::Error; + type InboundProtocol = ProtocolConfig; + type OutboundProtocol = ProtocolConfig; + type OutboundOpenInfo = GraphSyncMessage; + + fn listen_protocol(&self) -> SubstreamProtocol { + todo!() + } + + fn inject_fully_negotiated_inbound( + &mut self, + _out: >::Output, + ) { + todo!() + } + + fn inject_fully_negotiated_outbound( + &mut self, + _substream: >::Output, + _event: Self::OutboundOpenInfo, + ) { + todo!() + } + + fn inject_event(&mut self, _event: Self::InEvent) { + todo!() + } + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _error: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + todo!() + } + + fn connection_keep_alive(&self) -> KeepAlive { + todo!() + } + + #[allow(clippy::type_complexity)] + fn poll( + &mut self, + _cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + todo!() + } +} diff --git a/ipld/graphsync/src/libp2p/mod.rs b/ipld/graphsync/src/libp2p/mod.rs new file mode 100644 index 000000000000..4af9864030d1 --- /dev/null +++ b/ipld/graphsync/src/libp2p/mod.rs @@ -0,0 +1,13 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod behaviour; +mod codec; +mod error; +mod handler; +mod protocol; + +pub use self::behaviour::*; +pub use self::codec::*; +pub use self::error::*; +pub use self::handler::*; diff --git a/ipld/graphsync/src/libp2p/protocol.rs b/ipld/graphsync/src/libp2p/protocol.rs new file mode 100644 index 000000000000..478fa552f2c5 --- /dev/null +++ b/ipld/graphsync/src/libp2p/protocol.rs @@ -0,0 +1,75 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::GraphSyncCodec; +use futures::prelude::*; +use futures::{AsyncRead, AsyncWrite}; +use futures_codec::Framed; +use libp2p::core::UpgradeInfo; +use libp2p::{InboundUpgrade, OutboundUpgrade}; +use std::borrow::Cow; +use std::io; +use std::iter; +use std::pin::Pin; +use unsigned_varint::codec; + +/// Protocol upgrade for GraphSync requests. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + protocol_id: Cow<'static, [u8]>, +} + +impl Default for ProtocolConfig { + fn default() -> Self { + Self { + protocol_id: Cow::Borrowed(b"/ipfs/graphsync/1.0.0"), + } + } +} + +impl UpgradeInfo for ProtocolConfig { + type Info = Cow<'static, [u8]>; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_id.clone()) + } +} + +impl InboundUpgrade for ProtocolConfig +where + TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = Framed; + type Error = io::Error; + #[allow(clippy::type_complexity)] + type Future = Pin> + Send>>; + + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let length_codec = codec::UviBytes::default(); + // length_codec.set_max_len(self.max_transmit_size); + Box::pin(future::ok(Framed::new( + socket, + GraphSyncCodec { length_codec }, + ))) + } +} + +impl OutboundUpgrade for ProtocolConfig +where + TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, +{ + type Output = Framed; + type Error = io::Error; + #[allow(clippy::type_complexity)] + type Future = Pin> + Send>>; + + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let length_codec = codec::UviBytes::default(); + // length_codec.set_max_len(self.max_transmit_size); + Box::pin(future::ok(Framed::new( + socket, + GraphSyncCodec { length_codec }, + ))) + } +} From 209509052a3d1ada197a6af63596ccbac8cc4728 Mon Sep 17 00:00:00 2001 From: austinabell Date: Sat, 23 May 2020 13:00:08 -0400 Subject: [PATCH 2/4] Setup handler and behaviour frameworks --- ipld/graphsync/Cargo.toml | 1 + ipld/graphsync/src/libp2p/behaviour.rs | 91 ++++++++++++++---------- ipld/graphsync/src/libp2p/codec.rs | 8 +-- ipld/graphsync/src/libp2p/error.rs | 38 ----------- ipld/graphsync/src/libp2p/handler.rs | 95 ++++++++++++++++++++++---- ipld/graphsync/src/libp2p/mod.rs | 2 - ipld/graphsync/src/libp2p/protocol.rs | 1 + 7 files changed, 140 insertions(+), 96 deletions(-) delete mode 100644 ipld/graphsync/src/libp2p/error.rs diff --git a/ipld/graphsync/Cargo.toml b/ipld/graphsync/Cargo.toml index 25fb1fb8653a..b254d077eba8 100644 --- a/ipld/graphsync/Cargo.toml +++ b/ipld/graphsync/Cargo.toml @@ -17,6 +17,7 @@ futures_codec = "0.4.0" log = "0.4.8" bytes = "0.5.2" unsigned-varint = { version = "0.3.0", features = ["futures-codec"] } +smallvec = "1.1.0" [build-dependencies] protoc-rust = "2.14.0" diff --git a/ipld/graphsync/src/libp2p/behaviour.rs b/ipld/graphsync/src/libp2p/behaviour.rs index 551fcb6fb005..981406e54f47 100644 --- a/ipld/graphsync/src/libp2p/behaviour.rs +++ b/ipld/graphsync/src/libp2p/behaviour.rs @@ -2,60 +2,90 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::handler::GraphSyncHandler; -use crate::GraphSyncMessage; +use crate::{Extensions, GraphSyncMessage}; +use cid::Cid; +use forest_ipld::selector::Selector; use futures::task::Context; use futures_util::task::Poll; use libp2p::core::connection::ConnectionId; use libp2p::swarm::{ - protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, }; use libp2p::{Multiaddr, PeerId}; +use log::debug; +use std::collections::{HashSet, VecDeque}; -/// The RPC behaviour that gets consumed by the Swarm. -pub struct RPC { +/// The GraphSync behaviour that gets consumed by the Swarm. +#[derive(Default)] +pub struct GraphSync { /// Queue of events to processed. - events: Vec>, + events: VecDeque>, + + // TODO just temporary, will probably have to attach some data with peers + peers: HashSet, } -impl RPC { - /// Creates a new RPC behaviour +impl GraphSync { + /// Creates a new GraphSync behaviour pub fn new() -> Self { - RPC::default() + GraphSync::default() } -} -impl Default for RPC { - fn default() -> Self { - RPC { events: vec![] } + /// Initiates GraphSync request to peer given root and selector. + pub fn send_request( + &mut self, + _peer_id: PeerId, + _root: Cid, + _selector: Selector, + _extensions: Extensions, + ) { + todo!() + // self.events + // .push_back(NetworkBehaviourAction::NotifyHandler { + // peer_id, + // // TODO once request manager logic built out + // event: todo!(), + // handler: NotifyHandler::Any, + // }); } } -impl NetworkBehaviour for RPC { +impl NetworkBehaviour for GraphSync { type ProtocolsHandler = GraphSyncHandler; - type OutEvent = GraphSyncEvent; + // TODO this will need to be updated to include data emitted from the GS responses + type OutEvent = (); + fn new_handler(&mut self) -> Self::ProtocolsHandler { GraphSyncHandler::default() } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - vec![] + Vec::new() } - fn inject_connected(&mut self, _peer_id: &PeerId) { - todo!() + fn inject_connected(&mut self, peer_id: &PeerId) { + debug!("New peer connected: {:?}", peer_id); + self.peers.insert(peer_id.clone()); } - fn inject_disconnected(&mut self, _peer_id: &PeerId) { - todo!() + fn inject_disconnected(&mut self, peer_id: &PeerId) { + debug!("Peer disconnected: {:?}", peer_id); + self.peers.remove(peer_id); } fn inject_event( &mut self, - _peer_id: PeerId, + peer_id: PeerId, _connection: ConnectionId, - _event: ::OutEvent, + event: GraphSyncMessage, ) { - todo!() + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); } fn poll( @@ -68,22 +98,9 @@ impl NetworkBehaviour for RPC { Self::OutEvent, >, > { - if !self.events.is_empty() { - return Poll::Ready(self.events.remove(0)); + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); } Poll::Pending } } - -// TODO remove -#[allow(dead_code)] -/// Event from the GraphSync behaviour. -#[derive(Debug)] -pub enum GraphSyncEvent { - /// A message has been received. This contains the PeerId that we received the message from - /// and the message itself. - Message(PeerId, GraphSyncMessage), - - Connected(PeerId), - Disconnected(PeerId), -} diff --git a/ipld/graphsync/src/libp2p/codec.rs b/ipld/graphsync/src/libp2p/codec.rs index a7191827fd84..1aa87f7c792b 100644 --- a/ipld/graphsync/src/libp2p/codec.rs +++ b/ipld/graphsync/src/libp2p/codec.rs @@ -1,20 +1,20 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::RPCError; use crate::GraphSyncMessage; use bytes::BytesMut; use futures_codec::{Decoder, Encoder}; +use std::io; use unsigned_varint::codec; #[allow(dead_code)] -/// Codec used +/// Codec used for encoding and decoding protobuf messages pub struct GraphSyncCodec { pub(crate) length_codec: codec::UviBytes, } impl Encoder for GraphSyncCodec { - type Error = RPCError; + type Error = io::Error; type Item = GraphSyncMessage; fn encode(&mut self, _item: Self::Item, _dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -23,7 +23,7 @@ impl Encoder for GraphSyncCodec { } impl Decoder for GraphSyncCodec { - type Error = RPCError; + type Error = io::Error; type Item = GraphSyncMessage; fn decode(&mut self, _bz: &mut BytesMut) -> Result, Self::Error> { diff --git a/ipld/graphsync/src/libp2p/error.rs b/ipld/graphsync/src/libp2p/error.rs deleted file mode 100644 index 60194159412c..000000000000 --- a/ipld/graphsync/src/libp2p/error.rs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use forest_encoding::error::Error as EncodingError; -use std::fmt; - -#[derive(Debug, Clone, PartialEq)] -pub enum RPCError { - Codec(String), - Custom(String), -} - -impl From for RPCError { - fn from(err: std::io::Error) -> Self { - Self::Custom(err.to_string()) - } -} - -impl From for RPCError { - fn from(err: EncodingError) -> Self { - Self::Codec(err.to_string()) - } -} - -impl fmt::Display for RPCError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RPCError::Codec(err) => write!(f, "Codec Error: {}", err), - RPCError::Custom(err) => write!(f, "{}", err), - } - } -} - -impl std::error::Error for RPCError { - fn description(&self) -> &str { - "Libp2p RPC Error" - } -} diff --git a/ipld/graphsync/src/libp2p/handler.rs b/ipld/graphsync/src/libp2p/handler.rs index cccf5755bb30..f86df3a22fea 100644 --- a/ipld/graphsync/src/libp2p/handler.rs +++ b/ipld/graphsync/src/libp2p/handler.rs @@ -1,25 +1,68 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use super::codec::GraphSyncCodec; use super::protocol::ProtocolConfig; use crate::GraphSyncMessage; +use futures_codec::Framed; use libp2p::swarm::{ KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::{InboundUpgrade, OutboundUpgrade}; +use log::trace; +use smallvec::SmallVec; +use std::collections::VecDeque; use std::io; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +// TODO move this to config option +const TIMEOUT: u64 = 10; + +/// Handler implementation for GraphSync protocol. pub struct GraphSyncHandler { - // TODO + /// Upgrade configuration for the GraphSync protocol. + listen_protocol: SubstreamProtocol, + + /// Map of current substreams awaiting a response to a GraphSync request. + inbound_substreams: VecDeque, + + /// Queue of outbound substreams to open. + dial_queue: SmallVec<[GraphSyncMessage; 4]>, + + /// Current number of concurrent outbound substreams being opened. + dial_negotiated: u32, + + /// Maximum number of concurrent outbound substreams being opened. Value is never modified. + _max_dial_negotiated: u32, + + /// Value to return from `connection_keep_alive`. + keep_alive: KeepAlive, + + /// If `Some`, something bad happened and we should shut down the handler with an error. + pending_error: Option>, } impl GraphSyncHandler { /// Constructor for new RPC handler pub fn new() -> Self { - // TODO - GraphSyncHandler {} + GraphSyncHandler { + listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()), + inbound_substreams: Default::default(), + dial_queue: Default::default(), + dial_negotiated: Default::default(), + _max_dial_negotiated: 8, + keep_alive: KeepAlive::Yes, + pending_error: None, + } + } + + /// Opens an outbound substream with `upgrade`. + #[inline] + fn send_request(&mut self, upgrade: GraphSyncMessage) { + self.keep_alive = KeepAlive::Yes; + self.dial_queue.push(upgrade); } } @@ -29,6 +72,18 @@ impl Default for GraphSyncHandler { } } +// TODO remove allow dead_code on impl +#[allow(dead_code, clippy::large_enum_variant)] +/// State of the inbound substream, opened either by us or by the remote. +enum InboundSubstreamState { + /// Waiting for a message from the remote. The idle state for an inbound substream. + WaitingInput(Framed), + /// The substream is being closed. + Closing(Framed), + /// An error occurred during processing. + Poisoned, +} + impl ProtocolsHandler for GraphSyncHandler { type InEvent = GraphSyncMessage; type OutEvent = GraphSyncMessage; @@ -38,40 +93,50 @@ impl ProtocolsHandler for GraphSyncHandler { type OutboundOpenInfo = GraphSyncMessage; fn listen_protocol(&self) -> SubstreamProtocol { - todo!() + self.listen_protocol.clone() } fn inject_fully_negotiated_inbound( &mut self, - _out: >::Output, + substream: >::Output, ) { - todo!() + // new inbound substream. Push to back of inbound queue + trace!("New inbound substream request"); + self.inbound_substreams + .push_back(InboundSubstreamState::WaitingInput(substream)); } fn inject_fully_negotiated_outbound( &mut self, - _substream: >::Output, + _out: >::Output, _event: Self::OutboundOpenInfo, ) { - todo!() + self.dial_negotiated -= 1; + + if self.dial_negotiated == 0 && self.dial_queue.is_empty() { + self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(TIMEOUT)); + } + + // TODO handle outbound + // self.events_out.push(out); } - fn inject_event(&mut self, _event: Self::InEvent) { - todo!() + fn inject_event(&mut self, event: Self::InEvent) { + self.send_request(event); } fn inject_dial_upgrade_error( &mut self, _: Self::OutboundOpenInfo, - _error: ProtocolsHandlerUpgrErr< - >::Error, - >, + error: ProtocolsHandlerUpgrErr, ) { - todo!() + if self.pending_error.is_none() { + self.pending_error = Some(error); + } } fn connection_keep_alive(&self) -> KeepAlive { - todo!() + self.keep_alive } #[allow(clippy::type_complexity)] diff --git a/ipld/graphsync/src/libp2p/mod.rs b/ipld/graphsync/src/libp2p/mod.rs index 4af9864030d1..6eac4f6abbbd 100644 --- a/ipld/graphsync/src/libp2p/mod.rs +++ b/ipld/graphsync/src/libp2p/mod.rs @@ -3,11 +3,9 @@ mod behaviour; mod codec; -mod error; mod handler; mod protocol; pub use self::behaviour::*; pub use self::codec::*; -pub use self::error::*; pub use self::handler::*; diff --git a/ipld/graphsync/src/libp2p/protocol.rs b/ipld/graphsync/src/libp2p/protocol.rs index 478fa552f2c5..64f9c89b475e 100644 --- a/ipld/graphsync/src/libp2p/protocol.rs +++ b/ipld/graphsync/src/libp2p/protocol.rs @@ -19,6 +19,7 @@ pub struct ProtocolConfig { protocol_id: Cow<'static, [u8]>, } +// TODO allow configuration of id with a constructor impl Default for ProtocolConfig { fn default() -> Self { Self { From b1dac811a01448e85d1ed1d1d7c8ef0d0a5a05da Mon Sep 17 00:00:00 2001 From: austinabell Date: Mon, 25 May 2020 16:17:09 -0400 Subject: [PATCH 3/4] Setup config and cleanup --- encoding/src/errors.rs | 7 +++++++ ipld/graphsync/Cargo.toml | 2 +- ipld/graphsync/src/libp2p/behaviour.rs | 16 +++++++++++++--- ipld/graphsync/src/libp2p/codec.rs | 23 +++++++++++++++++------ ipld/graphsync/src/libp2p/config.rs | 23 +++++++++++++++++++++++ ipld/graphsync/src/libp2p/handler.rs | 26 +++++++++++++++----------- ipld/graphsync/src/libp2p/mod.rs | 1 + ipld/graphsync/src/libp2p/protocol.rs | 20 +++++++++++++++----- 8 files changed, 92 insertions(+), 26 deletions(-) create mode 100644 ipld/graphsync/src/libp2p/config.rs diff --git a/encoding/src/errors.rs b/encoding/src/errors.rs index 2716959009aa..275bd07ab2d8 100644 --- a/encoding/src/errors.rs +++ b/encoding/src/errors.rs @@ -4,6 +4,7 @@ use cid::Error as CidError; use serde_cbor::error::Error as CborError; use std::fmt; +use std::io; use thiserror::Error; /// Error type for encoding and decoding data through any Forest supported protocol @@ -56,6 +57,12 @@ impl From for Error { } } +impl From for io::Error { + fn from(err: Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, err) + } +} + /// CodecProtocol defines the protocol in which the data is encoded or decoded /// /// This is used with the encoding errors, to detail the encoding protocol or any other diff --git a/ipld/graphsync/Cargo.toml b/ipld/graphsync/Cargo.toml index b254d077eba8..d97b9ce25806 100644 --- a/ipld/graphsync/Cargo.toml +++ b/ipld/graphsync/Cargo.toml @@ -16,7 +16,7 @@ futures-util = "0.3.1" futures_codec = "0.4.0" log = "0.4.8" bytes = "0.5.2" -unsigned-varint = { version = "0.3.0", features = ["futures-codec"] } +unsigned-varint = { version = "0.4", features = ["futures-codec"] } smallvec = "1.1.0" [build-dependencies] diff --git a/ipld/graphsync/src/libp2p/behaviour.rs b/ipld/graphsync/src/libp2p/behaviour.rs index 981406e54f47..36e934893315 100644 --- a/ipld/graphsync/src/libp2p/behaviour.rs +++ b/ipld/graphsync/src/libp2p/behaviour.rs @@ -1,6 +1,7 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use super::config::GraphSyncConfig; use super::handler::GraphSyncHandler; use crate::{Extensions, GraphSyncMessage}; use cid::Cid; @@ -19,6 +20,9 @@ use std::collections::{HashSet, VecDeque}; /// The GraphSync behaviour that gets consumed by the Swarm. #[derive(Default)] pub struct GraphSync { + /// Config options for the service + config: GraphSyncConfig, + /// Queue of events to processed. events: VecDeque>, @@ -28,8 +32,11 @@ pub struct GraphSync { impl GraphSync { /// Creates a new GraphSync behaviour - pub fn new() -> Self { - GraphSync::default() + pub fn new(config: GraphSyncConfig) -> Self { + Self { + config, + ..Default::default() + } } /// Initiates GraphSync request to peer given root and selector. @@ -57,7 +64,10 @@ impl NetworkBehaviour for GraphSync { type OutEvent = (); fn new_handler(&mut self) -> Self::ProtocolsHandler { - GraphSyncHandler::default() + GraphSyncHandler::new( + self.config.protocol_id.clone(), + self.config.max_transmit_size, + ) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { diff --git a/ipld/graphsync/src/libp2p/codec.rs b/ipld/graphsync/src/libp2p/codec.rs index 1aa87f7c792b..e74c916ce060 100644 --- a/ipld/graphsync/src/libp2p/codec.rs +++ b/ipld/graphsync/src/libp2p/codec.rs @@ -1,9 +1,11 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::GraphSyncMessage; -use bytes::BytesMut; +use crate::{proto, GraphSyncMessage}; +use bytes::{Bytes, BytesMut}; use futures_codec::{Decoder, Encoder}; +use protobuf::{parse_from_bytes, Message}; +use std::convert::TryFrom; use std::io; use unsigned_varint::codec; @@ -17,8 +19,11 @@ impl Encoder for GraphSyncCodec { type Error = io::Error; type Item = GraphSyncMessage; - fn encode(&mut self, _item: Self::Item, _dst: &mut BytesMut) -> Result<(), Self::Error> { - todo!() + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let proto_msg = proto::Message::try_from(item).unwrap(); + let buf: Vec = proto_msg.write_to_bytes()?; + + self.length_codec.encode(Bytes::from(buf), dst) } } @@ -26,7 +31,13 @@ impl Decoder for GraphSyncCodec { type Error = io::Error; type Item = GraphSyncMessage; - fn decode(&mut self, _bz: &mut BytesMut) -> Result, Self::Error> { - todo!() + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let packet = match self.length_codec.decode(src)? { + Some(p) => p, + None => return Ok(None), + }; + + let decoded_packet = parse_from_bytes::(&packet)?; + Ok(Some(GraphSyncMessage::try_from(decoded_packet)?)) } } diff --git a/ipld/graphsync/src/libp2p/config.rs b/ipld/graphsync/src/libp2p/config.rs new file mode 100644 index 000000000000..ca31688f3a53 --- /dev/null +++ b/ipld/graphsync/src/libp2p/config.rs @@ -0,0 +1,23 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use std::borrow::Cow; + +/// Configuration parameters for the GraphSync protocol. +#[derive(Clone)] +pub struct GraphSyncConfig { + /// The protocol id to negotiate this protocol (default is `/ipfs/graphsync/1.0.0`). + pub protocol_id: Cow<'static, [u8]>, + + /// The maximum byte size for messages sent over the network. + pub max_transmit_size: usize, +} + +impl Default for GraphSyncConfig { + fn default() -> Self { + Self { + protocol_id: Cow::Borrowed(b"/ipfs/graphsync/1.0.0"), + max_transmit_size: 2048, + } + } +} diff --git a/ipld/graphsync/src/libp2p/handler.rs b/ipld/graphsync/src/libp2p/handler.rs index f86df3a22fea..ffb13c914e5d 100644 --- a/ipld/graphsync/src/libp2p/handler.rs +++ b/ipld/graphsync/src/libp2p/handler.rs @@ -12,6 +12,7 @@ use libp2p::swarm::{ use libp2p::{InboundUpgrade, OutboundUpgrade}; use log::trace; use smallvec::SmallVec; +use std::borrow::Cow; use std::collections::VecDeque; use std::io; use std::task::{Context, Poll}; @@ -46,15 +47,10 @@ pub struct GraphSyncHandler { impl GraphSyncHandler { /// Constructor for new RPC handler - pub fn new() -> Self { - GraphSyncHandler { - listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()), - inbound_substreams: Default::default(), - dial_queue: Default::default(), - dial_negotiated: Default::default(), - _max_dial_negotiated: 8, - keep_alive: KeepAlive::Yes, - pending_error: None, + pub fn new(id: impl Into>, max_transmit_size: usize) -> Self { + Self { + listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(id, max_transmit_size)), + ..Default::default() } } @@ -68,7 +64,15 @@ impl GraphSyncHandler { impl Default for GraphSyncHandler { fn default() -> Self { - GraphSyncHandler::new() + Self { + listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()), + inbound_substreams: Default::default(), + dial_queue: Default::default(), + dial_negotiated: 0, + _max_dial_negotiated: 8, + keep_alive: KeepAlive::Yes, + pending_error: None, + } } } @@ -117,7 +121,7 @@ impl ProtocolsHandler for GraphSyncHandler { self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(TIMEOUT)); } - // TODO handle outbound + // TODO handle outbound when events are emitted from service // self.events_out.push(out); } diff --git a/ipld/graphsync/src/libp2p/mod.rs b/ipld/graphsync/src/libp2p/mod.rs index 6eac4f6abbbd..7dcd6012de1a 100644 --- a/ipld/graphsync/src/libp2p/mod.rs +++ b/ipld/graphsync/src/libp2p/mod.rs @@ -3,6 +3,7 @@ mod behaviour; mod codec; +mod config; mod handler; mod protocol; diff --git a/ipld/graphsync/src/libp2p/protocol.rs b/ipld/graphsync/src/libp2p/protocol.rs index 64f9c89b475e..bffbf6854030 100644 --- a/ipld/graphsync/src/libp2p/protocol.rs +++ b/ipld/graphsync/src/libp2p/protocol.rs @@ -17,13 +17,23 @@ use unsigned_varint::codec; #[derive(Debug, Clone)] pub struct ProtocolConfig { protocol_id: Cow<'static, [u8]>, + max_transmit_size: usize, } -// TODO allow configuration of id with a constructor impl Default for ProtocolConfig { fn default() -> Self { Self { protocol_id: Cow::Borrowed(b"/ipfs/graphsync/1.0.0"), + max_transmit_size: 2048, + } + } +} + +impl ProtocolConfig { + pub fn new(id: impl Into>, max_transmit_size: usize) -> Self { + Self { + protocol_id: id.into(), + max_transmit_size, } } } @@ -47,8 +57,8 @@ where type Future = Pin> + Send>>; fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - let length_codec = codec::UviBytes::default(); - // length_codec.set_max_len(self.max_transmit_size); + let mut length_codec = codec::UviBytes::default(); + length_codec.set_max_len(self.max_transmit_size); Box::pin(future::ok(Framed::new( socket, GraphSyncCodec { length_codec }, @@ -66,8 +76,8 @@ where type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - let length_codec = codec::UviBytes::default(); - // length_codec.set_max_len(self.max_transmit_size); + let mut length_codec = codec::UviBytes::default(); + length_codec.set_max_len(self.max_transmit_size); Box::pin(future::ok(Framed::new( socket, GraphSyncCodec { length_codec }, From 3238af59ec555063bbe4508e73bc0dfdfb248e9c Mon Sep 17 00:00:00 2001 From: austinabell Date: Tue, 26 May 2020 15:15:59 -0400 Subject: [PATCH 4/4] remove commented out code --- ipld/graphsync/src/libp2p/behaviour.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ipld/graphsync/src/libp2p/behaviour.rs b/ipld/graphsync/src/libp2p/behaviour.rs index 36e934893315..21336c4dc894 100644 --- a/ipld/graphsync/src/libp2p/behaviour.rs +++ b/ipld/graphsync/src/libp2p/behaviour.rs @@ -48,13 +48,6 @@ impl GraphSync { _extensions: Extensions, ) { todo!() - // self.events - // .push_back(NetworkBehaviourAction::NotifyHandler { - // peer_id, - // // TODO once request manager logic built out - // event: todo!(), - // handler: NotifyHandler::Any, - // }); } }