From 5b9c57b1ac3ee348700c739724f43917f46a829b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 13 Dec 2017 15:04:58 +0100 Subject: [PATCH] Minimal collation work-flow and necessary traits (#25) * collator crate skeleton and description * parachain primitives: proof -> witness and egress format * collation of ingress queues through trait * add ingress collation test * structure for collated ingress * add collated ingress to proposal * witness -> proof * ingress collation and candidate creation + code cleanup * update collator lib to new definitions * address formatting grumble --- Cargo.lock | 8 ++ Cargo.toml | 1 + collator/Cargo.toml | 9 ++ collator/src/lib.rs | 218 ++++++++++++++++++++++++++++++++++++ primitives/src/block.rs | 24 ++++ primitives/src/parachain.rs | 9 +- 6 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 collator/Cargo.toml create mode 100644 collator/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 2956ffe63d1d9..50708f94c0959 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -635,6 +635,14 @@ dependencies = [ "polkadot-state-machine 0.1.0", ] +[[package]] +name = "polkadot-collator" +version = "0.1.0" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "polkadot-primitives 0.1.0", +] + [[package]] name = "polkadot-contracts" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 84e987c436ece..1bc165bd8a744 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ polkadot-cli = { path = "cli", version = "0.1" } [workspace] members = [ "client", + "collator", "contracts", "primitives", "rpc", diff --git a/collator/Cargo.toml b/collator/Cargo.toml new file mode 100644 index 0000000000000..71e2606f678d3 --- /dev/null +++ b/collator/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "polkadot-collator" +version = "0.1.0" +authors = ["Parity Technologies "] +description = "Abstract collation logic" + +[dependencies] +polkadot-primitives = { path = "../primitives", version = "0.1" } +futures = "0.1.17" diff --git a/collator/src/lib.rs b/collator/src/lib.rs new file mode 100644 index 0000000000000..1752ecdb7eab5 --- /dev/null +++ b/collator/src/lib.rs @@ -0,0 +1,218 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Collation Logic. +//! +//! A collator node lives on a distinct parachain and submits a proposal for +//! a state transition, along with a proof for its validity +//! (what we might call a witness or block data). +//! +//! One of collators' other roles is to route messages between chains. +//! Each parachain produces a list of "egress" posts of messages for each other +//! parachain on each block, for a total of N^2 lists all together. +//! +//! We will refer to the egress list at relay chain block X of parachain A with +//! destination B as egress(X)[A -> B] +//! +//! On every block, each parachain will be intended to route messages from some +//! subset of all the other parachains. +//! +//! Since the egress information is unique to every block, when routing from a +//! parachain a collator must gather all egress posts from that parachain +//! up to the last point in history that messages were successfully routed +//! from that parachain, accounting for relay chain blocks where no candidate +//! from the collator's parachain was produced. +//! +//! In the case that all parachains route to each other and a candidate for the +//! collator's parachain was included in the last relay chain block, the collator +//! only has to gather egress posts from other parachains one block back in relay +//! chain history. +//! +//! This crate defines traits which provide context necessary for collation logic +//! to be performed, as the collation logic itself. + +extern crate futures; +extern crate polkadot_primitives as primitives; + +use std::collections::{BTreeSet, BTreeMap}; + +use futures::{stream, Stream, Future, IntoFuture}; +use primitives::parachain::{self, ConsolidatedIngress, Message, Id as ParaId}; + +/// Parachain context needed for collation. +/// +/// This can be implemented through an externally attached service or a stub. +pub trait ParachainContext { + /// Produce a candidate, given the latest ingress queue information. + fn produce_candidate>( + &self, + ingress: I, + ) -> (parachain::BlockData, primitives::Signature); +} + +/// Relay chain context needed to collate. +/// This encapsulates a network and local database which may store +/// some of the input. +pub trait RelayChainContext { + type Error; + + /// Future that resolves to the un-routed egress queues of a parachain. + /// The first item is the oldest. + type FutureEgress: IntoFuture>, Error=Self::Error>; + + /// Provide a set of all parachains meant to be routed to at a block. + fn routing_parachains(&self) -> BTreeSet; + + /// Get un-routed egress queues from a parachain to the local parachain. + fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress; +} + +/// Collate the necessary ingress queue using the given context. +// TODO: impl trait +pub fn collate_ingress<'a, R>(relay_context: R) + -> Box + 'a> + where + R: RelayChainContext, + R::Error: 'a, + R::FutureEgress: 'a, +{ + let mut egress_fetch = Vec::new(); + + for routing_parachain in relay_context.routing_parachains() { + let fetch = relay_context + .unrouted_egress(routing_parachain) + .into_future() + .map(move |egresses| (routing_parachain, egresses)); + + egress_fetch.push(fetch); + } + + // create a map ordered first by the depth of the egress queue + // and then by the parachain ID. + // + // then transform that into the consolidated egress queue. + let future = stream::futures_unordered(egress_fetch) + .fold(BTreeMap::new(), |mut map, (routing_id, egresses)| { + for (depth, egress) in egresses.into_iter().rev().enumerate() { + let depth = -(depth as i64); + map.insert((depth, routing_id), egress); + } + + Ok(map) + }) + .map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress))) + .map(|i| i.collect::>()) + .map(ConsolidatedIngress); + + Box::new(future) +} + +/// Produce a candidate for the parachain. +pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P) + -> Box + 'a> + where + R: RelayChainContext, + R::Error: 'a, + R::FutureEgress: 'a, + P: ParachainContext + 'a, +{ + Box::new(collate_ingress(relay_context).map(move |ingress| { + let (block_data, signature) = para_context.produce_candidate( + ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) + ); + + parachain::Candidate { + parachain_index: local_id, + collator_signature: signature, + block: block_data, + unprocessed_ingress: ingress, + } + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::{HashMap, BTreeSet}; + + use futures::Future; + use primitives::parachain::{Message, Id as ParaId}; + + pub struct DummyRelayChainCtx { + egresses: HashMap>>, + currently_routing: BTreeSet, + } + + impl RelayChainContext for DummyRelayChainCtx { + type Error = (); + type FutureEgress = Result>, ()>; + + fn routing_parachains(&self) -> BTreeSet { + self.currently_routing.clone() + } + + fn unrouted_egress(&self, id: ParaId) -> Result>, ()> { + Ok(self.egresses.get(&id).cloned().unwrap_or_default()) + } + } + + #[test] + fn collates_ingress() { + let route_from = |x: &[ParaId]| { + let mut set = BTreeSet::new(); + set.extend(x.iter().cloned()); + set + }; + + let message = |x: Vec| vec![Message(x)]; + + let dummy_ctx = DummyRelayChainCtx { + currently_routing: route_from(&[2.into(), 3.into()]), + egresses: vec![ + // egresses for `2`: last routed successfully 5 blocks ago. + (2.into(), vec![ + message(vec![1, 2, 3]), + message(vec![4, 5, 6]), + message(vec![7, 8]), + message(vec![10]), + message(vec![12]), + ]), + + // egresses for `3`: last routed successfully 3 blocks ago. + (3.into(), vec![ + message(vec![9]), + message(vec![11]), + message(vec![13]), + ]), + ].into_iter().collect(), + }; + + assert_eq!( + collate_ingress(dummy_ctx).wait().unwrap(), + ConsolidatedIngress(vec![ + (2.into(), message(vec![1, 2, 3])), + (2.into(), message(vec![4, 5, 6])), + (2.into(), message(vec![7, 8])), + (3.into(), message(vec![9])), + (2.into(), message(vec![10])), + (3.into(), message(vec![11])), + (2.into(), message(vec![12])), + (3.into(), message(vec![13])), + ] + )) + } +} diff --git a/primitives/src/block.rs b/primitives/src/block.rs index 40576352d677d..740b0c4cdada3 100644 --- a/primitives/src/block.rs +++ b/primitives/src/block.rs @@ -81,4 +81,28 @@ mod tests { ] }"#); } + + #[test] + fn test_body_serialization() { + assert_eq!(ser::to_string_pretty(&Body { + candidates: vec![ + parachain::Candidate { + parachain_index: 10.into(), + collator_signature: Default::default(), + unprocessed_ingress: Default::default(), + block: ::parachain::BlockData(vec![1, 3, 5, 8]), + } + ], + }), r#"{ + "candidates": [ + { + "parachainIndex": 10, + "collatorSignature": "0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "unprocessedIngress": [], + "block": "0x01030508" + } + ] +}"#); + } + } diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index 7e931bed84342..1249319aa8f4b 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -44,7 +44,7 @@ pub struct Candidate { /// Unprocessed ingress queue. /// /// Ordered by parachain ID and block number. - pub unprocessed_ingress: Vec<(u64, Vec)>, + pub unprocessed_ingress: ConsolidatedIngress, /// Block data pub block: BlockData, } @@ -53,6 +53,13 @@ pub struct Candidate { #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct Message(#[serde(with="bytes")] pub Vec); +/// Consolidated ingress queue data. +/// +/// This is just an ordered vector of other parachains' egress queues, +/// obtained according to the routing rules. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct ConsolidatedIngress(pub Vec<(Id, Vec)>); + /// Parachain block data. /// /// contains everything required to validate para-block, may contain block and witness data