From 581e49ccb23ace6d3fca971b3a6c8bfa12330a31 Mon Sep 17 00:00:00 2001 From: Tim Vermeulen Date: Thu, 28 May 2020 18:40:45 +0200 Subject: [PATCH 1/3] Implement a bare-bones ResponseManager --- ipld/graphsync/Cargo.toml | 1 + ipld/graphsync/src/lib.rs | 1 - ipld/graphsync/src/message/mod.rs | 28 +-- ipld/graphsync/src/response_manager/mod.rs | 168 +++++++++++++++++- .../src/response_manager/response_builder.rs | 1 - ipld/src/selector/walk.rs | 6 +- ipld/tests/walk_tests.rs | 2 +- 7 files changed, 183 insertions(+), 24 deletions(-) diff --git a/ipld/graphsync/Cargo.toml b/ipld/graphsync/Cargo.toml index 02a21275fda3..42a1c1099d75 100644 --- a/ipld/graphsync/Cargo.toml +++ b/ipld/graphsync/Cargo.toml @@ -20,6 +20,7 @@ unsigned-varint = { version = "0.4", features = ["futures-codec"] } smallvec = "1.1.0" async-trait = "0.1" serde = { version = "1.0", features = ["derive"] } +ipld_blockstore = { path = "../blockstore" } [build-dependencies] protoc-rust = "2.14.0" diff --git a/ipld/graphsync/src/lib.rs b/ipld/graphsync/src/lib.rs index e6462980f611..0aae3556a713 100644 --- a/ipld/graphsync/src/lib.rs +++ b/ipld/graphsync/src/lib.rs @@ -10,7 +10,6 @@ mod response_manager; mod test_utils; pub use self::message::*; -pub use response_manager::PeerResponseSender; use cid::Cid; use serde::{Deserialize, Serialize}; diff --git a/ipld/graphsync/src/message/mod.rs b/ipld/graphsync/src/message/mod.rs index e498d8ab6fc0..3b5f9ed1f0d4 100644 --- a/ipld/graphsync/src/message/mod.rs +++ b/ipld/graphsync/src/message/mod.rs @@ -11,18 +11,20 @@ use forest_ipld::selector::Selector; use std::collections::HashMap; use std::convert::TryFrom; +/// The data associated with a new graphsync request. +#[derive(Debug, PartialEq, Clone)] +pub struct NewRequestPayload { + pub root: Cid, + pub selector: Selector, + pub priority: Priority, + pub extensions: Extensions, +} + /// Defines the data associated with each request type. #[derive(Debug, PartialEq, Clone)] pub enum Payload { - New { - root: Cid, - selector: Selector, - priority: Priority, - extensions: Extensions, - }, - Update { - extensions: Extensions, - }, + New(NewRequestPayload), + Update { extensions: Extensions }, Cancel, } @@ -43,12 +45,12 @@ impl GraphSyncRequest { ) -> Self { Self { id, - payload: Payload::New { + payload: Payload::New(NewRequestPayload { root, selector, priority, extensions: extensions.unwrap_or_default(), - }, + }), } } /// Generate a GraphSyncRequest to update an in progress request with extensions. @@ -134,12 +136,12 @@ impl TryFrom for proto::Message { .requests .into_iter() .map(|(_, req)| match req.payload { - Payload::New { + Payload::New(NewRequestPayload { root, selector, priority, extensions, - } => Ok(proto::Message_Request { + }) => Ok(proto::Message_Request { id: req.id, // Cid bytes format (not cbor encoded) root: root.to_bytes(), diff --git a/ipld/graphsync/src/response_manager/mod.rs b/ipld/graphsync/src/response_manager/mod.rs index b50cbfc1d717..5dc07115344d 100644 --- a/ipld/graphsync/src/response_manager/mod.rs +++ b/ipld/graphsync/src/response_manager/mod.rs @@ -1,13 +1,171 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT +#![allow(dead_code)] mod link_tracker; mod peer_response_sender; mod response_builder; -pub use peer_response_sender::PeerResponseSender; - use link_tracker::LinkTracker; +use peer_response_sender::{PeerMessageHandler, PeerResponseSender}; use response_builder::ResponseBuilder; -// TODO: implement the `PeerResponseManager` type +use super::{ + Extensions, GraphSyncRequest, NewRequestPayload, Payload, RequestID, ResponseStatusCode, +}; +use async_trait::async_trait; +use cid::Cid; +use forest_encoding; +use forest_ipld::{selector::LinkResolver, Ipld}; +use ipld_blockstore::BlockStore; +use libp2p::core::PeerId; +use std::{collections::HashMap, sync::Arc}; + +/// Handles incoming graphsync requests from the network, initiates selector traversals, and transmits responses. +pub struct ResponseManager { + peer_response_senders: HashMap, +} + +impl ResponseManager { + /// Creates a new response manager. + pub fn new() -> Self { + Self { + peer_response_senders: HashMap::new(), + } + } + + /// Returns the response sender associated with the given peer. + fn sender_for_peer(&mut self, peer: PeerId) -> &mut PeerResponseSender { + self.peer_response_senders + .entry(peer.clone()) + .or_insert_with(|| PeerResponseSender::new(peer)) + } + + /// Executes the given request. + pub async fn execute_request( + &mut self, + peer: PeerId, + request: GraphSyncRequest, + loader: L, + handler: &mut H, + ) -> Result<(), String> + where + L: LinkResolver + Send + Sync, + H: PeerMessageHandler, + { + match request.payload { + Payload::New(payload) => { + self.new_request(peer, request.id, payload, loader, handler) + .await + } + Payload::Update { extensions } => self.update_request(request.id, extensions).await, + Payload::Cancel => self.cancel_request(request.id).await, + } + } + + /// Executes a new request. + async fn new_request( + &mut self, + peer_id: PeerId, + request_id: RequestID, + payload: NewRequestPayload, + mut loader: L, + handler: &mut H, + ) -> Result<(), String> + where + L: LinkResolver + Send + Sync, + H: PeerMessageHandler, + { + // TODO: look for the do-not-send-cids extension + let NewRequestPayload { root, selector, .. } = payload; + let sender = self.sender_for_peer(peer_id); + + let ipld: Ipld = match loader.load_link(&root).await? { + Some(ipld) => ipld, + None => { + sender.finish_request_with_error( + request_id, + ResponseStatusCode::RequestFailedContentNotFound, + ); + return sender.flush(handler).await; + } + }; + + let intercepted = InterceptedLoader::new(loader, |cid, block| { + let data = block + .map(|ipld| forest_encoding::to_vec(ipld)) + .transpose() + .map_err(|e| e.to_string())?; + sender.send_response(request_id, cid.clone(), data); + Ok(()) + }); + + // we ignore the callback parameters because we're only interested in the + // loaded blocks, which the intercepted loader takes care of + selector + .walk_all(&ipld, Some(intercepted), |_, _, _| Ok(())) + .await + .map_err(|e| e.to_string())?; + sender.flush(handler).await + } + + /// Updates an ongoing request. + async fn update_request( + &mut self, + _id: RequestID, + _extensions: Extensions, + ) -> Result<(), String> { + // we can't implement this yet because requests are currently executed in one go + todo!() + } + + /// Cancels an ongoing request. + async fn cancel_request(&mut self, _id: RequestID) -> Result<(), String> { + // we can't implement this yet because requests are currently executed in one go + todo!() + } +} + +/// A block loader that wraps another loader and calls a callback whenever +/// a link is loaded with the cid and the corresponding block. +struct InterceptedLoader { + loader: L, + f: F, +} + +impl InterceptedLoader +where + L: LinkResolver + Send + Sync, + F: FnMut(&Cid, Option<&Ipld>) -> Result<(), String> + Send + Sync, +{ + fn new(loader: L, f: F) -> Self { + Self { loader, f } + } +} + +#[async_trait] +impl LinkResolver for InterceptedLoader +where + L: LinkResolver + Send + Sync, + F: FnMut(&Cid, Option<&Ipld>) -> Result<(), String> + Send + Sync, +{ + async fn load_link(&mut self, link: &Cid) -> Result, String> { + let ipld = self.loader.load_link(link).await?; + (self.f)(link, ipld.as_ref())?; + Ok(ipld) + } +} + +/// A block loader that loads the blocks from a blockstore. +// TODO: put this type somewhere else, graphsync doesn't need to know about blockstores +struct BlockStoreLoader { + blockstore: Arc, +} + +#[async_trait] +impl LinkResolver for BlockStoreLoader +where + BS: BlockStore + Send + Sync, +{ + async fn load_link(&mut self, link: &Cid) -> Result, String> { + self.blockstore.get(link).map_err(|e| e.to_string()) + } +} diff --git a/ipld/graphsync/src/response_manager/response_builder.rs b/ipld/graphsync/src/response_manager/response_builder.rs index 67326087689c..18323ca5e7f5 100644 --- a/ipld/graphsync/src/response_manager/response_builder.rs +++ b/ipld/graphsync/src/response_manager/response_builder.rs @@ -74,7 +74,6 @@ impl ResponseBuilder { } /// Returns true if there is no content to send. - #[allow(unused)] pub fn is_empty(&self) -> bool { self.blocks.is_empty() && self.outgoing_responses.is_empty() } diff --git a/ipld/src/selector/walk.rs b/ipld/src/selector/walk.rs index 91e004d1b319..25f4d80f05cc 100644 --- a/ipld/src/selector/walk.rs +++ b/ipld/src/selector/walk.rs @@ -63,13 +63,13 @@ pub enum VisitReason { #[async_trait] pub trait LinkResolver { /// Resolves a Cid link into it's respective Ipld node, if it exists. - async fn load_link(&self, link: &Cid) -> Result, String>; + async fn load_link(&mut self, link: &Cid) -> Result, String>; } #[async_trait] impl LinkResolver for () { #[allow(unused_variables, clippy::trivially_copy_pass_by_ref)] - async fn load_link(&self, link: &Cid) -> Result, String> { + async fn load_link(&mut self, link: &Cid) -> Result, String> { Err("load_link not implemented on the LinkResolver for default implementation".into()) } } @@ -115,7 +115,7 @@ where { // Resolve any links transparently before traversing if let Ipld::Link(cid) = ipld { - if let Some(resolver) = &self.resolver { + if let Some(resolver) = &mut self.resolver { self.last_block = Some(LastBlockInfo { path: self.path.clone(), link: cid.clone(), diff --git a/ipld/tests/walk_tests.rs b/ipld/tests/walk_tests.rs index f4f7e40d362f..3d2447c4fa70 100644 --- a/ipld/tests/walk_tests.rs +++ b/ipld/tests/walk_tests.rs @@ -124,7 +124,7 @@ struct TestLinkResolver(MemoryDB); #[async_trait] impl LinkResolver for TestLinkResolver { - async fn load_link(&self, link: &Cid) -> Result, String> { + async fn load_link(&mut self, link: &Cid) -> Result, String> { self.0.get(link).map_err(|e| e.to_string()) } } From f2408c20f688dcf73615e9ef16bd6e4c2f5fb3af Mon Sep 17 00:00:00 2001 From: Tim Vermeulen Date: Mon, 22 Jun 2020 14:46:13 +0200 Subject: [PATCH 2/3] Add license --- ipld/graphsync/src/response_manager/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ipld/graphsync/src/response_manager/mod.rs b/ipld/graphsync/src/response_manager/mod.rs index 5dc07115344d..123f0fbd3a44 100644 --- a/ipld/graphsync/src/response_manager/mod.rs +++ b/ipld/graphsync/src/response_manager/mod.rs @@ -1,3 +1,6 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + #![allow(dead_code)] mod link_tracker; From 889816b9876f579183354ad16813dd1ecfd74b18 Mon Sep 17 00:00:00 2001 From: Tim Vermeulen Date: Mon, 22 Jun 2020 21:36:46 +0200 Subject: [PATCH 3/3] Remove unused import --- ipld/graphsync/src/response_manager/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ipld/graphsync/src/response_manager/mod.rs b/ipld/graphsync/src/response_manager/mod.rs index 123f0fbd3a44..205e81e69113 100644 --- a/ipld/graphsync/src/response_manager/mod.rs +++ b/ipld/graphsync/src/response_manager/mod.rs @@ -16,7 +16,6 @@ use super::{ }; use async_trait::async_trait; use cid::Cid; -use forest_encoding; use forest_ipld::{selector::LinkResolver, Ipld}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId;