Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup GraphSync network interface #442

Merged
merged 6 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions encoding/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use cid::Error as CidError;
use serde_cbor::error::Error as CborError;
use std::fmt;
use std::io;
use thiserror::Error;

/// Error type for encoding and decoding data through any Forest supported protocol
Expand Down Expand Up @@ -56,6 +57,12 @@ impl From<CidError> for Error {
}
}

impl From<Error> for io::Error {
fn from(err: Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}
}

/// CodecProtocol defines the protocol in which the data is encoded or decoded
///
/// This is used with the encoding errors, to detail the encoding protocol or any other
Expand Down
8 changes: 8 additions & 0 deletions ipld/graphsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ cid = { package = "forest_cid", path = "../cid", version = "0.1" }
forest_ipld = { path = "../" }
fnv = "1.0.6"
forest_encoding = { path = "../../encoding", version = "0.1" }
libp2p = "0.19"
futures = "0.3.2"
futures-util = "0.3.1"
futures_codec = "0.4.0"
log = "0.4.8"
bytes = "0.5.2"
unsigned-varint = { version = "0.4", features = ["futures-codec"] }
smallvec = "1.1.0"

[build-dependencies]
protoc-rust = "2.14.0"
2 changes: 2 additions & 0 deletions ipld/graphsync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

// TODO evaluate exporting from libp2p mod
pub mod libp2p;
mod message;

pub use self::message::*;
Expand Down
109 changes: 109 additions & 0 deletions ipld/graphsync/src/libp2p/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::config::GraphSyncConfig;
use super::handler::GraphSyncHandler;
use crate::{Extensions, GraphSyncMessage};
use cid::Cid;
use forest_ipld::selector::Selector;
use futures::task::Context;
use futures_util::task::Poll;
use libp2p::core::connection::ConnectionId;
use libp2p::swarm::{
protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters,
};
use libp2p::{Multiaddr, PeerId};
use log::debug;
use std::collections::{HashSet, VecDeque};

/// The GraphSync behaviour that gets consumed by the Swarm.
#[derive(Default)]
pub struct GraphSync {
/// Config options for the service
config: GraphSyncConfig,

/// Queue of events to processed.
events: VecDeque<NetworkBehaviourAction<GraphSyncMessage, ()>>,

// TODO just temporary, will probably have to attach some data with peers
peers: HashSet<PeerId>,
}

impl GraphSync {
/// Creates a new GraphSync behaviour
pub fn new(config: GraphSyncConfig) -> Self {
Self {
config,
..Default::default()
}
}

/// Initiates GraphSync request to peer given root and selector.
pub fn send_request(
&mut self,
_peer_id: PeerId,
_root: Cid,
_selector: Selector,
_extensions: Extensions,
) {
todo!()
}
}

impl NetworkBehaviour for GraphSync {
type ProtocolsHandler = GraphSyncHandler;
// TODO this will need to be updated to include data emitted from the GS responses
type OutEvent = ();

fn new_handler(&mut self) -> Self::ProtocolsHandler {
GraphSyncHandler::new(
self.config.protocol_id.clone(),
self.config.max_transmit_size,
)
}

fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}

fn inject_connected(&mut self, peer_id: &PeerId) {
debug!("New peer connected: {:?}", peer_id);
self.peers.insert(peer_id.clone());
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
debug!("Peer disconnected: {:?}", peer_id);
self.peers.remove(peer_id);
}

fn inject_event(
&mut self,
peer_id: PeerId,
_connection: ConnectionId,
event: GraphSyncMessage,
) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler: NotifyHandler::Any,
});
}

fn poll(
&mut self,
_: &mut Context,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}
43 changes: 43 additions & 0 deletions ipld/graphsync/src/libp2p/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::{proto, GraphSyncMessage};
use bytes::{Bytes, BytesMut};
use futures_codec::{Decoder, Encoder};
use protobuf::{parse_from_bytes, Message};
use std::convert::TryFrom;
use std::io;
use unsigned_varint::codec;

#[allow(dead_code)]
/// Codec used for encoding and decoding protobuf messages
pub struct GraphSyncCodec {
pub(crate) length_codec: codec::UviBytes,
}

impl Encoder for GraphSyncCodec {
type Error = io::Error;
type Item = GraphSyncMessage;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let proto_msg = proto::Message::try_from(item).unwrap();
let buf: Vec<u8> = proto_msg.write_to_bytes()?;

self.length_codec.encode(Bytes::from(buf), dst)
}
}

impl Decoder for GraphSyncCodec {
type Error = io::Error;
type Item = GraphSyncMessage;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let packet = match self.length_codec.decode(src)? {
Some(p) => p,
None => return Ok(None),
};

let decoded_packet = parse_from_bytes::<proto::Message>(&packet)?;
Ok(Some(GraphSyncMessage::try_from(decoded_packet)?))
}
}
23 changes: 23 additions & 0 deletions ipld/graphsync/src/libp2p/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::borrow::Cow;

/// Configuration parameters for the GraphSync protocol.
#[derive(Clone)]
pub struct GraphSyncConfig {
/// The protocol id to negotiate this protocol (default is `/ipfs/graphsync/1.0.0`).
pub protocol_id: Cow<'static, [u8]>,

/// The maximum byte size for messages sent over the network.
pub max_transmit_size: usize,
}

impl Default for GraphSyncConfig {
fn default() -> Self {
Self {
protocol_id: Cow::Borrowed(b"/ipfs/graphsync/1.0.0"),
max_transmit_size: 2048,
}
}
}
160 changes: 160 additions & 0 deletions ipld/graphsync/src/libp2p/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::codec::GraphSyncCodec;
use super::protocol::ProtocolConfig;
use crate::GraphSyncMessage;
use futures_codec::Framed;
use libp2p::swarm::{
KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::{InboundUpgrade, OutboundUpgrade};
use log::trace;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::collections::VecDeque;
use std::io;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// TODO move this to config option
const TIMEOUT: u64 = 10;

/// Handler implementation for GraphSync protocol.
pub struct GraphSyncHandler {
/// Upgrade configuration for the GraphSync protocol.
listen_protocol: SubstreamProtocol<ProtocolConfig>,

/// Map of current substreams awaiting a response to a GraphSync request.
inbound_substreams: VecDeque<InboundSubstreamState>,

/// Queue of outbound substreams to open.
dial_queue: SmallVec<[GraphSyncMessage; 4]>,

/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,

/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
_max_dial_negotiated: u32,

/// Value to return from `connection_keep_alive`.
keep_alive: KeepAlive,

/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<ProtocolsHandlerUpgrErr<io::Error>>,
}

impl GraphSyncHandler {
/// Constructor for new RPC handler
pub fn new(id: impl Into<Cow<'static, [u8]>>, max_transmit_size: usize) -> Self {
Self {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(id, max_transmit_size)),
..Default::default()
}
}

/// Opens an outbound substream with `upgrade`.
#[inline]
fn send_request(&mut self, upgrade: GraphSyncMessage) {
self.keep_alive = KeepAlive::Yes;
self.dial_queue.push(upgrade);
}
}

impl Default for GraphSyncHandler {
fn default() -> Self {
Self {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()),
inbound_substreams: Default::default(),
dial_queue: Default::default(),
dial_negotiated: 0,
_max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
pending_error: None,
}
}
}

// TODO remove allow dead_code on impl
#[allow(dead_code, clippy::large_enum_variant)]
/// State of the inbound substream, opened either by us or by the remote.
enum InboundSubstreamState {
/// Waiting for a message from the remote. The idle state for an inbound substream.
WaitingInput(Framed<NegotiatedSubstream, GraphSyncCodec>),
/// The substream is being closed.
Closing(Framed<NegotiatedSubstream, GraphSyncCodec>),
/// An error occurred during processing.
Poisoned,
}

impl ProtocolsHandler for GraphSyncHandler {
type InEvent = GraphSyncMessage;
type OutEvent = GraphSyncMessage;
type Error = io::Error;
type InboundProtocol = ProtocolConfig;
type OutboundProtocol = ProtocolConfig;
type OutboundOpenInfo = GraphSyncMessage;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.listen_protocol.clone()
}

fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
// new inbound substream. Push to back of inbound queue
trace!("New inbound substream request");
self.inbound_substreams
.push_back(InboundSubstreamState::WaitingInput(substream));
}

fn inject_fully_negotiated_outbound(
&mut self,
_out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_event: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;

if self.dial_negotiated == 0 && self.dial_queue.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(TIMEOUT));
}

// TODO handle outbound when events are emitted from service
// self.events_out.push(out);
}

fn inject_event(&mut self, event: Self::InEvent) {
self.send_request(event);
}

fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<io::Error>,
) {
if self.pending_error.is_none() {
self.pending_error = Some(error);
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}

#[allow(clippy::type_complexity)]
fn poll(
&mut self,
_cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
todo!()
}
}
12 changes: 12 additions & 0 deletions ipld/graphsync/src/libp2p/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod behaviour;
mod codec;
mod config;
mod handler;
mod protocol;

pub use self::behaviour::*;
pub use self::codec::*;
pub use self::handler::*;
Loading