diff --git a/engine/src/multisig/client/ceremony_manager.rs b/engine/src/multisig/client/ceremony_manager.rs index 4e3a3242059..c8c11646c0f 100644 --- a/engine/src/multisig/client/ceremony_manager.rs +++ b/engine/src/multisig/client/ceremony_manager.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::common::format_iterator; use crate::multisig::client::{self, MultisigOutcome}; +use crate::multisig_p2p::OutgoingMultisigStageMessages; use state_chain_runtime::AccountId; use client::{ @@ -23,7 +24,6 @@ use crate::multisig::{KeygenInfo, KeygenOutcome, MessageHash, SigningOutcome}; use super::ceremony_id_tracker::CeremonyIdTracker; use super::keygen::{AwaitCommitments1, HashContext, KeygenData, KeygenOptions}; -use super::MultisigMessage; type SigningStateRunner = StateRunner; type KeygenStateRunner = StateRunner; @@ -34,7 +34,7 @@ type KeygenStateRunner = StateRunner; pub struct CeremonyManager { my_account_id: AccountId, outcome_sender: MultisigOutcomeSender, - outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, + outgoing_p2p_message_sender: UnboundedSender, signing_states: HashMap, keygen_states: HashMap, logger: slog::Logger, @@ -45,7 +45,7 @@ impl CeremonyManager { pub fn new( my_account_id: AccountId, outcome_sender: MultisigOutcomeSender, - outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, + outgoing_p2p_message_sender: UnboundedSender, logger: &slog::Logger, ) -> Self { CeremonyManager { diff --git a/engine/src/multisig/client/common/broadcast.rs b/engine/src/multisig/client/common/broadcast.rs index fa0c6edd5e0..ca14c798a6c 100644 --- a/engine/src/multisig/client/common/broadcast.rs +++ b/engine/src/multisig/client/common/broadcast.rs @@ -4,7 +4,10 @@ use std::{ fmt::Display, }; -use crate::multisig::client::{MultisigData, MultisigMessage}; +use crate::{ + multisig::client::{MultisigData, MultisigMessage}, + multisig_p2p::OutgoingMultisigStageMessages, +}; use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult}; @@ -88,56 +91,59 @@ where type Result = Result; fn init(&mut self) { - // TODO Clean and remove dup code (Alastair Holmes 18.11.2021) - match self.processor.init() { - DataToSend::Broadcast(data) => { - for destination_idx in &self.common.all_idxs { - if *destination_idx == self.common.own_idx { - // Save our own share - self.messages.insert(self.common.own_idx, data.clone()); - } else { - let data: D = data.clone().into(); - self.common - .outgoing_p2p_message_sender - .send(( - self.common - .validator_mapping - .get_id(*destination_idx) - .expect("Unknown account index") - .clone(), - MultisigMessage { - ceremony_id: self.common.ceremony_id, - data: data.into(), - }, - )) - .expect("Could not send p2p message."); - } - } + let common = &self.common; + + let idx_to_id = |idx: &usize| { + common + .validator_mapping + .get_id(*idx) + .expect("Unknown account index") + .clone() + }; + + let (own_message, outgoing_messages) = match self.processor.init() { + DataToSend::Broadcast(stage_data) => { + let ceremony_data: D = stage_data.clone().into(); + ( + stage_data.clone(), + OutgoingMultisigStageMessages::Broadcast( + common.all_idxs.iter().map(idx_to_id).collect(), + MultisigMessage { + ceremony_id: common.ceremony_id, + data: ceremony_data.into(), + }, + ), + ) } - DataToSend::Private(messages) => { - for (destination_idx, data) in messages { - if destination_idx == self.common.own_idx { - self.messages.insert(self.common.own_idx, data); - } else { - let data: D = data.clone().into(); - self.common - .outgoing_p2p_message_sender - .send(( - self.common - .validator_mapping - .get_id(destination_idx) - .expect("Unknown account index") - .clone(), + DataToSend::Private(mut messages) => ( + messages + .remove(&common.own_idx) + .expect("Must include message to self"), + OutgoingMultisigStageMessages::Private( + messages + .into_iter() + .map(|(idx, stage_data)| { + let ceremony_data: D = stage_data.into(); + ( + idx_to_id(&idx), MultisigMessage { - ceremony_id: self.common.ceremony_id, - data: data.into(), + ceremony_id: common.ceremony_id, + data: ceremony_data.into(), }, - )) - .expect("Could not send p2p message."); - } - } - } - } + ) + }) + .collect(), + ), + ), + }; + + // Save our own share + self.messages.insert(common.own_idx, own_message); + + self.common + .outgoing_p2p_message_sender + .send(outgoing_messages) + .expect("Could not send p2p message."); } fn process_message(&mut self, signer_idx: usize, m: D) -> ProcessMessageResult { diff --git a/engine/src/multisig/client/common/ceremony_stage.rs b/engine/src/multisig/client/common/ceremony_stage.rs index b05cf36ce37..42cf9a34781 100644 --- a/engine/src/multisig/client/common/ceremony_stage.rs +++ b/engine/src/multisig/client/common/ceremony_stage.rs @@ -4,9 +4,9 @@ use dyn_clone::DynClone; use pallet_cf_vaults::CeremonyId; use tokio::sync::mpsc::UnboundedSender; -use crate::multisig::client::{utils::PartyIdxMapping, MultisigMessage}; - -use state_chain_runtime::AccountId; +use crate::{ + multisig::client::utils::PartyIdxMapping, multisig_p2p::OutgoingMultisigStageMessages, +}; /// Outcome of a given ceremony stage pub enum StageResult { @@ -62,7 +62,7 @@ pub struct CeremonyCommon { pub own_idx: usize, /// Indexes of parties participating in the ceremony pub all_idxs: BTreeSet, - pub outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, + pub outgoing_p2p_message_sender: UnboundedSender, pub validator_mapping: Arc, pub logger: slog::Logger, } diff --git a/engine/src/multisig/client/mod.rs b/engine/src/multisig/client/mod.rs index c1275c93ed4..68503743b79 100644 --- a/engine/src/multisig/client/mod.rs +++ b/engine/src/multisig/client/mod.rs @@ -22,6 +22,7 @@ use crate::{ eth::utils::pubkey_to_eth_addr, logging::{CEREMONY_ID_KEY, REQUEST_TO_SIGN_EXPIRED}, multisig::{KeyDB, KeyId, MultisigInstruction}, + multisig_p2p::OutgoingMultisigStageMessages, }; use state_chain_runtime::AccountId; @@ -173,7 +174,7 @@ where key_store: KeyStore, pub ceremony_manager: CeremonyManager, multisig_outcome_sender: MultisigOutcomeSender, - outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, + outgoing_p2p_message_sender: UnboundedSender, /// Requests awaiting a key pending_requests_to_sign: HashMap>, keygen_options: KeygenOptions, @@ -188,7 +189,7 @@ where my_account_id: AccountId, db: S, multisig_outcome_sender: MultisigOutcomeSender, - outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, + outgoing_p2p_message_sender: UnboundedSender, keygen_options: KeygenOptions, logger: &slog::Logger, ) -> Self { diff --git a/engine/src/multisig/client/tests/helpers.rs b/engine/src/multisig/client/tests/helpers.rs index 8af3727af47..fd1a6f935a8 100644 --- a/engine/src/multisig/client/tests/helpers.rs +++ b/engine/src/multisig/client/tests/helpers.rs @@ -8,7 +8,10 @@ use std::{ use anyhow::Result; use cf_chains::eth::{AggKey, SchnorrVerificationComponents}; -use futures::{stream::Peekable, StreamExt}; +use futures::{ + stream::{self, Peekable}, + StreamExt, +}; use itertools::Itertools; use pallet_cf_vaults::CeremonyId; @@ -25,6 +28,7 @@ use crate::{ }, KeyId, MultisigInstruction, SchnorrSignature, }, + multisig_p2p::OutgoingMultisigStageMessages, }; use crate::testing::assert_ok; @@ -52,77 +56,41 @@ pub type MultisigClientNoDB = MultisigClient; use super::{ACCOUNT_IDS, KEYGEN_CEREMONY_ID, MESSAGE_HASH, SIGNER_IDS, SIGN_CEREMONY_ID}; -macro_rules! recv_data_keygen { - ($rx:expr, $variant: path) => {{ - let (_, m) = expect_next_with_timeout($rx).await; - - match m { - MultisigMessage { - data: MultisigData::Keygen($variant(inner)), - .. - } => inner, - _ => { - panic!("Received message is not {}", stringify!($variant)); - } - } +macro_rules! recv_broadcast { + ($rxs:expr, $ceremony_variant: path, $variant: path) => {{ + futures::stream::iter($rxs.iter_mut()) + .then(|(id, rx)| async move { + ( + id.clone(), + match expect_next_with_timeout(rx).await { + OutgoingMultisigStageMessages::Broadcast( + _, + MultisigMessage { + data: $ceremony_variant($variant(inner)), + .. + }, + ) => inner, + _ => { + panic!("Received message is not {}", stringify!($variant)); + } + }, + ) + }) + .collect::>() + .await }}; } -macro_rules! recv_all_data_keygen { - ($rxs:expr, $variant: path) => {{ - let mut messages = HashMap::new(); - - let count = $rxs.len(); - - for (id, rx) in $rxs.iter_mut() { - let data = recv_data_keygen!(rx, $variant); - messages.insert(id.clone(), data); - - // ignore (count(other nodes) - 1) messages - for _ in 0..count - 2 { - let _ = recv_data_keygen!(rx, $variant); - } - } - - messages - }}; -} - -macro_rules! recv_data_signing { - ($rx:expr, $variant: path) => {{ - let (_, m) = expect_next_with_timeout($rx).await; - - match m { - MultisigMessage { - data: MultisigData::Signing($variant(inner)), - .. - } => inner, - _ => { - panic!("Received message is not {}", stringify!($variant)); - } - } - }}; +macro_rules! recv_keygen_broadcast { + ($rxs:expr, $variant: path) => { + recv_broadcast!($rxs, MultisigData::Keygen, $variant) + }; } -macro_rules! recv_all_data_signing { - ($rxs:expr, $variant: path) => {{ - let mut messages = HashMap::new(); - - let signer_count = $rxs.len(); - - for (id, rx) in $rxs.iter_mut() { - let data = recv_data_signing!(rx, $variant); - messages.insert(id.clone(), data); - - // ignore (count(other nodes) - 1) messages - for _ in 0..signer_count - 2 { - let _ = recv_data_signing!(rx, $variant); - } - assert_channel_empty(rx).await; - } - - messages - }}; +macro_rules! recv_siging_broadcast { + ($rxs:expr, $variant: path) => { + recv_broadcast!($rxs, MultisigData::Signing, $variant) + }; } macro_rules! distribute_data_keygen_custom { @@ -188,7 +156,7 @@ pub(super) type MultisigOutcomeReceiver = Pin>>>; pub(super) type P2PMessageReceiver = - Pin>>>; + Pin>>>; pub struct Stage0Data { pub clients: HashMap, @@ -758,7 +726,7 @@ impl KeygenContext { c.process_multisig_instruction(MultisigInstruction::Keygen(keygen_info.clone())); } - let comm1s = recv_all_data_keygen!(p2p_rxs, KeygenData::Comm1); + let comm1s = recv_keygen_broadcast!(p2p_rxs, KeygenData::Comm1); println!("Received all comm1"); @@ -787,7 +755,7 @@ impl KeygenContext { .values() .for_each(|c| assert_ok!(c.ensure_ceremony_at_keygen_stage(2, &ceremony_id))); - let ver2s = recv_all_data_keygen!(p2p_rxs, KeygenData::Verify2); + let ver2s = recv_keygen_broadcast!(p2p_rxs, KeygenData::Verify2); let ver_com_stage2 = CommVerStage2Data { clients: clients.clone(), @@ -863,18 +831,7 @@ impl KeygenContext { // *** Collect all Secret3 - let mut sec3s: HashMap = HashMap::new(); - - for (sender_id, rx) in p2p_rxs.iter_mut() { - let mut sec3_map = HashMap::new(); - for i in 0..self.account_ids.len() - 1 { - println!("recv secret3 keygen, i: {}", i); - let (dest, sec3) = recv_secret3_keygen(rx).await; - sec3_map.insert(dest, sec3); - } - - sec3s.insert(sender_id.clone(), sec3_map); - } + let sec3s = recv_keygen_secret3(p2p_rxs).await; println!("Received all sec3"); @@ -917,7 +874,7 @@ impl KeygenContext { println!("Distributed all sec3"); - let complaints = recv_all_data_keygen!(p2p_rxs, KeygenData::Complaints4); + let complaints = recv_keygen_broadcast!(p2p_rxs, KeygenData::Complaints4); let comp_stage4 = Some(CompStage4Data { clients: clients.clone(), @@ -938,7 +895,7 @@ impl KeygenContext { println!("Distributed all complaints"); - let ver_complaints = recv_all_data_keygen!(p2p_rxs, KeygenData::VerifyComplaints5); + let ver_complaints = recv_keygen_broadcast!(p2p_rxs, KeygenData::VerifyComplaints5); let ver_comp_stage5 = Some(VerCompStage5Data { clients: clients.clone(), @@ -970,7 +927,7 @@ impl KeygenContext { if nodes_entered_blaming { println!("All clients entered blaming phase!"); - let responses6 = recv_all_data_keygen!(p2p_rxs, KeygenData::BlameResponse6); + let responses6 = recv_keygen_broadcast!(p2p_rxs, KeygenData::BlameResponse6); blame_responses6 = Some(BlameResponses6Data { clients: clients.clone(), resp6: responses6.clone(), @@ -990,7 +947,7 @@ impl KeygenContext { println!("Distributed all blame responses"); - let ver7 = recv_all_data_keygen!(p2p_rxs, KeygenData::VerifyBlameResponses7); + let ver7 = recv_keygen_broadcast!(p2p_rxs, KeygenData::VerifyBlameResponses7); ver_blame_responses7 = Some(VerBlameResponses7Data { clients: clients.clone(), ver7: ver7.clone(), @@ -1164,7 +1121,7 @@ impl KeygenContext { assert_ok!(c.ensure_at_signing_stage(1)); } - let comm1s = recv_all_data_signing!(p2p_rxs, SigningData::CommStage1); + let comm1s = recv_siging_broadcast!(p2p_rxs, SigningData::CommStage1); let sign_phase1 = SigningPhase1Data { clients: clients.clone(), @@ -1191,7 +1148,7 @@ impl KeygenContext { .for_each(|c| assert_ok!(c.ensure_at_signing_stage(2))); // *** Collect Ver2 messages *** - let ver2s = recv_all_data_signing!(p2p_rxs, SigningData::BroadcastVerificationStage2); + let ver2s = recv_siging_broadcast!(p2p_rxs, SigningData::BroadcastVerificationStage2); let sign_phase2 = SigningPhase2Data { clients: clients.clone(), @@ -1242,7 +1199,7 @@ impl KeygenContext { // *** Collect local sigs *** - let local_sigs = recv_all_data_signing!(p2p_rxs, SigningData::LocalSigStage3); + let local_sigs = recv_siging_broadcast!(p2p_rxs, SigningData::LocalSigStage3); let sign_phase3 = SigningPhase3Data { clients: clients.clone(), @@ -1265,7 +1222,7 @@ impl KeygenContext { }); // *** Collect Ver4 messages *** - let ver4s = recv_all_data_signing!(p2p_rxs, SigningData::VerifyLocalSigsStage4); + let ver4s = recv_siging_broadcast!(p2p_rxs, SigningData::VerifyLocalSigsStage4); let sign_phase4 = SigningPhase4Data { clients: clients.clone(), @@ -1479,19 +1436,38 @@ pub async fn expect_next_with_timeout( } } -async fn recv_secret3_keygen(rx: &mut P2PMessageReceiver) -> (AccountId, keygen::SecretShare3) { - if let ( - dest, - MultisigMessage { - data: MultisigData::Keygen(KeygenData::SecretShares3(sec3)), - .. - }, - ) = expect_next_with_timeout(rx).await - { - return (dest, sec3); - } else { - panic!("Received message is not Secret3 (keygen)"); - } +async fn recv_keygen_secret3( + p2p_rxs: &mut HashMap, +) -> HashMap> { + stream::iter(p2p_rxs.iter_mut()) + .then(|(id, rx)| async move { + if let OutgoingMultisigStageMessages::Private(messages) = + expect_next_with_timeout(rx).await + { + ( + id.clone(), + messages + .into_iter() + .map(move |(dest, message)| { + ( + dest, + match message { + MultisigMessage { + data: MultisigData::Keygen(KeygenData::SecretShares3(sec3)), + .. + } => sec3, + _ => panic!(), + }, + ) + }) + .collect(), + ) + } else { + panic!("Received message is not Secret3 (keygen)"); + } + }) + .collect::>() + .await } pub fn sig_data_to_p2p(data: impl Into) -> MultisigMessage { diff --git a/engine/src/multisig/mod.rs b/engine/src/multisig/mod.rs index 624e13a5be8..ac0a171867c 100644 --- a/engine/src/multisig/mod.rs +++ b/engine/src/multisig/mod.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; -use crate::{common, logging::COMPONENT_KEY}; +use crate::{common, logging::COMPONENT_KEY, multisig_p2p::OutgoingMultisigStageMessages}; use futures::StreamExt; use slog::o; use state_chain_runtime::AccountId; @@ -65,7 +65,7 @@ pub fn start_client( mut multisig_instruction_receiver: UnboundedReceiver, multisig_outcome_sender: UnboundedSender, mut incoming_p2p_message_receiver: UnboundedReceiver<(AccountId, MultisigMessage)>, - outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, + outgoing_p2p_message_sender: UnboundedSender, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>, keygen_options: KeygenOptions, logger: &slog::Logger, diff --git a/engine/src/multisig_p2p.rs b/engine/src/multisig_p2p.rs index 910c18395d2..d10a4b2efe3 100644 --- a/engine/src/multisig_p2p.rs +++ b/engine/src/multisig_p2p.rs @@ -25,7 +25,9 @@ use zeroize::Zeroizing; use frame_support::StoragePrefixedMap; use crate::{ - common::{self, read_clean_and_decode_hex_str_file, rpc_error_into_anyhow_error}, + common::{ + self, format_iterator, read_clean_and_decode_hex_str_file, rpc_error_into_anyhow_error, + }, logging::COMPONENT_KEY, multisig::MultisigMessage, settings, @@ -38,6 +40,13 @@ pub enum AccountPeerMappingChange { Unregistered, } +// TODO: Consider if this should be removed, particularly once we no longer use Substrate for peering +#[derive(Debug)] +pub enum OutgoingMultisigStageMessages { + Broadcast(Vec, MultisigMessage), + Private(Vec<(AccountId, MultisigMessage)>), +} + /* TODO: This code should be merged into the multisig top-level function (start_client), primarily to avoid the problem where multisig sends messages before the mapping @@ -102,7 +111,7 @@ pub async fn start( state_chain_client: Arc>, latest_block_hash: H256, incoming_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>, - mut outgoing_p2p_message_receiver: UnboundedReceiver<(AccountId, MultisigMessage)>, + mut outgoing_p2p_message_receiver: UnboundedReceiver, mut account_mapping_change_receiver: UnboundedReceiver<( AccountId, sp_core::ed25519::Public, @@ -220,17 +229,33 @@ pub async fn start( Err(error) => slog::error!(logger, "Failed to receive P2P message: {}", error) } } - Some((account_id, message)) = outgoing_p2p_message_receiver.recv() => { - match async { - account_to_peer.get(&account_id).ok_or_else(|| anyhow::Error::msg(format!("Missing Peer Id mapping for Account Id: {}", account_id))) - }.and_then(|(peer_id, _, _)| { - client.send_message( - vec![peer_id.into()], - bincode::serialize(&message).unwrap() - ).map_err(rpc_error_into_anyhow_error) - }).await { - Ok(_) => slog::info!(logger, "Sent P2P message to: {}", account_id), - Err(error) => slog::error!(logger, "Failed to send P2P message to: {}. {}", account_id, error) + Some(messages) = outgoing_p2p_message_receiver.recv() => { + async fn send_messages<'a, AccountIds: 'a + IntoIterator + Clone>(client: &P2PRpcClient, account_to_peer: &BTreeMap, account_ids: AccountIds, message: MultisigMessage, logger: &slog::Logger) { + match async { + account_ids.clone().into_iter().map(|account_id| match account_to_peer.get(&account_id) { + Some((peer_id, _, _)) => Ok(peer_id.into()), + None => Err(anyhow::Error::msg(format!("Missing Peer Id mapping for Account Id: {}", account_id))), + }).collect::, _>>() + }.and_then(|peer_ids| { + client.send_message( + peer_ids, + bincode::serialize(&message).unwrap() + ).map_err(rpc_error_into_anyhow_error) + }).await { + Ok(_) => slog::info!(logger, "Sent P2P message to: {}", format_iterator(account_ids)), + Err(error) => slog::error!(logger, "Failed to send P2P message to: {}. {}", format_iterator(account_ids), error) + } + } + + match messages { + OutgoingMultisigStageMessages::Broadcast(account_ids, message) => { + send_messages(&client, &account_to_peer, account_ids.iter(), message, &logger).await; + }, + OutgoingMultisigStageMessages::Private(messages) => { + for (account_id, message) in messages { + send_messages(&client, &account_to_peer, std::iter::once(&account_id), message, &logger).await; + } + } } } Some((account_id, peer_public_key, account_peer_mapping_change)) = account_mapping_change_receiver.recv() => {