Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A bare-bones GraphSync ResponseManager #511

Merged
merged 4 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ipld/graphsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ unsigned-varint = { version = "0.4", features = ["futures-codec"] }
smallvec = "1.1.0"
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
ipld_blockstore = { path = "../blockstore" }

[build-dependencies]
protoc-rust = "2.14.0"
Expand Down
1 change: 0 additions & 1 deletion ipld/graphsync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod response_manager;
mod test_utils;

pub use self::message::*;
pub use response_manager::PeerResponseSender;

use cid::Cid;
use serde::{Deserialize, Serialize};
Expand Down
28 changes: 15 additions & 13 deletions ipld/graphsync/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ use forest_ipld::selector::Selector;
use std::collections::HashMap;
use std::convert::TryFrom;

/// The data associated with a new graphsync request.
#[derive(Debug, PartialEq, Clone)]
pub struct NewRequestPayload {
pub root: Cid,
pub selector: Selector,
pub priority: Priority,
pub extensions: Extensions,
}

/// Defines the data associated with each request type.
#[derive(Debug, PartialEq, Clone)]
pub enum Payload {
New {
root: Cid,
selector: Selector,
priority: Priority,
extensions: Extensions,
},
Update {
extensions: Extensions,
},
New(NewRequestPayload),
Update { extensions: Extensions },
Cancel,
}

Expand All @@ -43,12 +45,12 @@ impl GraphSyncRequest {
) -> Self {
Self {
id,
payload: Payload::New {
payload: Payload::New(NewRequestPayload {
root,
selector,
priority,
extensions: extensions.unwrap_or_default(),
},
}),
}
}
/// Generate a GraphSyncRequest to update an in progress request with extensions.
Expand Down Expand Up @@ -134,12 +136,12 @@ impl TryFrom<GraphSyncMessage> for proto::Message {
.requests
.into_iter()
.map(|(_, req)| match req.payload {
Payload::New {
Payload::New(NewRequestPayload {
root,
selector,
priority,
extensions,
} => Ok(proto::Message_Request {
}) => Ok(proto::Message_Request {
id: req.id,
// Cid bytes format (not cbor encoded)
root: root.to_bytes(),
Expand Down
166 changes: 163 additions & 3 deletions ipld/graphsync/src/response_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,173 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

#![allow(dead_code)]

mod link_tracker;
mod peer_response_sender;
mod response_builder;

pub use peer_response_sender::PeerResponseSender;

use link_tracker::LinkTracker;
use peer_response_sender::{PeerMessageHandler, PeerResponseSender};
use response_builder::ResponseBuilder;

// TODO: implement the `PeerResponseManager` type
use super::{
Extensions, GraphSyncRequest, NewRequestPayload, Payload, RequestID, ResponseStatusCode,
};
use async_trait::async_trait;
use cid::Cid;
use forest_ipld::{selector::LinkResolver, Ipld};
use ipld_blockstore::BlockStore;
use libp2p::core::PeerId;
use std::{collections::HashMap, sync::Arc};

/// Handles incoming graphsync requests from the network, initiates selector traversals, and transmits responses.
pub struct ResponseManager {
peer_response_senders: HashMap<PeerId, PeerResponseSender>,
}

impl ResponseManager {
/// Creates a new response manager.
pub fn new() -> Self {
Self {
peer_response_senders: HashMap::new(),
}
}

/// Returns the response sender associated with the given peer.
fn sender_for_peer(&mut self, peer: PeerId) -> &mut PeerResponseSender {
self.peer_response_senders
.entry(peer.clone())
.or_insert_with(|| PeerResponseSender::new(peer))
}

/// Executes the given request.
pub async fn execute_request<L, H>(
&mut self,
peer: PeerId,
request: GraphSyncRequest,
loader: L,
handler: &mut H,
) -> Result<(), String>
where
L: LinkResolver + Send + Sync,
H: PeerMessageHandler,
{
match request.payload {
Payload::New(payload) => {
self.new_request(peer, request.id, payload, loader, handler)
.await
}
Payload::Update { extensions } => self.update_request(request.id, extensions).await,
Payload::Cancel => self.cancel_request(request.id).await,
}
}

/// Executes a new request.
async fn new_request<L, H>(
&mut self,
peer_id: PeerId,
request_id: RequestID,
payload: NewRequestPayload,
mut loader: L,
handler: &mut H,
) -> Result<(), String>
where
L: LinkResolver + Send + Sync,
H: PeerMessageHandler,
{
// TODO: look for the do-not-send-cids extension
let NewRequestPayload { root, selector, .. } = payload;
let sender = self.sender_for_peer(peer_id);

let ipld: Ipld = match loader.load_link(&root).await? {
Some(ipld) => ipld,
None => {
sender.finish_request_with_error(
request_id,
ResponseStatusCode::RequestFailedContentNotFound,
);
return sender.flush(handler).await;
}
};

let intercepted = InterceptedLoader::new(loader, |cid, block| {
let data = block
.map(|ipld| forest_encoding::to_vec(ipld))
.transpose()
.map_err(|e| e.to_string())?;
sender.send_response(request_id, cid.clone(), data);
Ok(())
});

// we ignore the callback parameters because we're only interested in the
// loaded blocks, which the intercepted loader takes care of
selector
.walk_all(&ipld, Some(intercepted), |_, _, _| Ok(()))
.await
.map_err(|e| e.to_string())?;
sender.flush(handler).await
}

/// Updates an ongoing request.
async fn update_request(
&mut self,
_id: RequestID,
_extensions: Extensions,
) -> Result<(), String> {
// we can't implement this yet because requests are currently executed in one go
todo!()
}

/// Cancels an ongoing request.
async fn cancel_request(&mut self, _id: RequestID) -> Result<(), String> {
// we can't implement this yet because requests are currently executed in one go
todo!()
}
}

/// A block loader that wraps another loader and calls a callback whenever
/// a link is loaded with the cid and the corresponding block.
struct InterceptedLoader<L, F> {
loader: L,
f: F,
}

impl<L, F> InterceptedLoader<L, F>
where
L: LinkResolver + Send + Sync,
F: FnMut(&Cid, Option<&Ipld>) -> Result<(), String> + Send + Sync,
{
fn new(loader: L, f: F) -> Self {
Self { loader, f }
}
}

#[async_trait]
impl<L, F> LinkResolver for InterceptedLoader<L, F>
where
L: LinkResolver + Send + Sync,
F: FnMut(&Cid, Option<&Ipld>) -> Result<(), String> + Send + Sync,
{
async fn load_link(&mut self, link: &Cid) -> Result<Option<Ipld>, String> {
let ipld = self.loader.load_link(link).await?;
(self.f)(link, ipld.as_ref())?;
Ok(ipld)
}
}

/// A block loader that loads the blocks from a blockstore.
// TODO: put this type somewhere else, graphsync doesn't need to know about blockstores
struct BlockStoreLoader<BS> {
blockstore: Arc<BS>,
}

#[async_trait]
impl<BS> LinkResolver for BlockStoreLoader<BS>
where
BS: BlockStore + Send + Sync,
{
async fn load_link(&mut self, link: &Cid) -> Result<Option<Ipld>, String> {
self.blockstore.get(link).map_err(|e| e.to_string())
}
}
1 change: 0 additions & 1 deletion ipld/graphsync/src/response_manager/response_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl ResponseBuilder {
}

/// Returns true if there is no content to send.
#[allow(unused)]
pub fn is_empty(&self) -> bool {
self.blocks.is_empty() && self.outgoing_responses.is_empty()
}
Expand Down
6 changes: 3 additions & 3 deletions ipld/src/selector/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ pub enum VisitReason {
#[async_trait]
pub trait LinkResolver {
/// Resolves a Cid link into it's respective Ipld node, if it exists.
async fn load_link(&self, link: &Cid) -> Result<Option<Ipld>, String>;
async fn load_link(&mut self, link: &Cid) -> Result<Option<Ipld>, String>;
}

#[async_trait]
impl LinkResolver for () {
#[allow(unused_variables, clippy::trivially_copy_pass_by_ref)]
async fn load_link(&self, link: &Cid) -> Result<Option<Ipld>, String> {
async fn load_link(&mut self, link: &Cid) -> Result<Option<Ipld>, String> {
Err("load_link not implemented on the LinkResolver for default implementation".into())
}
}
Expand Down Expand Up @@ -115,7 +115,7 @@ where
{
// Resolve any links transparently before traversing
if let Ipld::Link(cid) = ipld {
if let Some(resolver) = &self.resolver {
if let Some(resolver) = &mut self.resolver {
self.last_block = Some(LastBlockInfo {
path: self.path.clone(),
link: cid.clone(),
Expand Down
2 changes: 1 addition & 1 deletion ipld/tests/walk_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct TestLinkResolver(MemoryDB);

#[async_trait]
impl LinkResolver for TestLinkResolver {
async fn load_link(&self, link: &Cid) -> Result<Option<Ipld>, String> {
async fn load_link(&mut self, link: &Cid) -> Result<Option<Ipld>, String> {
self.0.get(link).map_err(|e| e.to_string())
}
}
Expand Down