Skip to content

Commit

Permalink
Batch P2P Messages
Browse files Browse the repository at this point in the history
  • Loading branch information
AlastairHolmes committed Jan 11, 2022
1 parent 9b54117 commit 79de04c
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 175 deletions.
6 changes: 3 additions & 3 deletions engine/src/multisig/client/ceremony_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use crate::common::format_iterator;
use crate::multisig::client::{self, MultisigOutcome};
use crate::multisig_p2p::OutgoingMultisigStageMessages;
use state_chain_runtime::AccountId;

use client::{
Expand All @@ -23,7 +24,6 @@ use crate::multisig::{KeygenInfo, KeygenOutcome, MessageHash, SigningOutcome};

use super::ceremony_id_tracker::CeremonyIdTracker;
use super::keygen::{AwaitCommitments1, HashContext, KeygenData, KeygenOptions};
use super::MultisigMessage;

type SigningStateRunner = StateRunner<SigningData, SchnorrSignature>;
type KeygenStateRunner = StateRunner<KeygenData, KeygenResultInfo>;
Expand All @@ -34,7 +34,7 @@ type KeygenStateRunner = StateRunner<KeygenData, KeygenResultInfo>;
pub struct CeremonyManager {
my_account_id: AccountId,
outcome_sender: MultisigOutcomeSender,
outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>,
outgoing_p2p_message_sender: UnboundedSender<OutgoingMultisigStageMessages>,
signing_states: HashMap<CeremonyId, SigningStateRunner>,
keygen_states: HashMap<CeremonyId, KeygenStateRunner>,
logger: slog::Logger,
Expand All @@ -45,7 +45,7 @@ impl CeremonyManager {
pub fn new(
my_account_id: AccountId,
outcome_sender: MultisigOutcomeSender,
outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>,
outgoing_p2p_message_sender: UnboundedSender<OutgoingMultisigStageMessages>,
logger: &slog::Logger,
) -> Self {
CeremonyManager {
Expand Down
102 changes: 54 additions & 48 deletions engine/src/multisig/client/common/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
fmt::Display,
};

use crate::multisig::client::{MultisigData, MultisigMessage};
use crate::{
multisig::client::{MultisigData, MultisigMessage},
multisig_p2p::OutgoingMultisigStageMessages,
};

use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult};

Expand Down Expand Up @@ -88,56 +91,59 @@ where
type Result = Result;

fn init(&mut self) {
// TODO Clean and remove dup code (Alastair Holmes 18.11.2021)
match self.processor.init() {
DataToSend::Broadcast(data) => {
for destination_idx in &self.common.all_idxs {
if *destination_idx == self.common.own_idx {
// Save our own share
self.messages.insert(self.common.own_idx, data.clone());
} else {
let data: D = data.clone().into();
self.common
.outgoing_p2p_message_sender
.send((
self.common
.validator_mapping
.get_id(*destination_idx)
.expect("Unknown account index")
.clone(),
MultisigMessage {
ceremony_id: self.common.ceremony_id,
data: data.into(),
},
))
.expect("Could not send p2p message.");
}
}
let common = &self.common;

let idx_to_id = |idx: &usize| {
common
.validator_mapping
.get_id(*idx)
.expect("Unknown account index")
.clone()
};

let (own_message, outgoing_messages) = match self.processor.init() {
DataToSend::Broadcast(stage_data) => {
let ceremony_data: D = stage_data.clone().into();
(
stage_data.clone(),
OutgoingMultisigStageMessages::Broadcast(
common.all_idxs.iter().map(idx_to_id).collect(),
MultisigMessage {
ceremony_id: common.ceremony_id,
data: ceremony_data.into(),
},
),
)
}
DataToSend::Private(messages) => {
for (destination_idx, data) in messages {
if destination_idx == self.common.own_idx {
self.messages.insert(self.common.own_idx, data);
} else {
let data: D = data.clone().into();
self.common
.outgoing_p2p_message_sender
.send((
self.common
.validator_mapping
.get_id(destination_idx)
.expect("Unknown account index")
.clone(),
DataToSend::Private(mut messages) => (
messages
.remove(&common.own_idx)
.expect("Must include message to self"),
OutgoingMultisigStageMessages::Private(
messages
.into_iter()
.map(|(idx, stage_data)| {
let ceremony_data: D = stage_data.into();
(
idx_to_id(&idx),
MultisigMessage {
ceremony_id: self.common.ceremony_id,
data: data.into(),
ceremony_id: common.ceremony_id,
data: ceremony_data.into(),
},
))
.expect("Could not send p2p message.");
}
}
}
}
)
})
.collect(),
),
),
};

// Save our own share
self.messages.insert(common.own_idx, own_message);

self.common
.outgoing_p2p_message_sender
.send(outgoing_messages)
.expect("Could not send p2p message.");
}

fn process_message(&mut self, signer_idx: usize, m: D) -> ProcessMessageResult {
Expand Down
8 changes: 4 additions & 4 deletions engine/src/multisig/client/common/ceremony_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use dyn_clone::DynClone;
use pallet_cf_vaults::CeremonyId;
use tokio::sync::mpsc::UnboundedSender;

use crate::multisig::client::{utils::PartyIdxMapping, MultisigMessage};

use state_chain_runtime::AccountId;
use crate::{
multisig::client::utils::PartyIdxMapping, multisig_p2p::OutgoingMultisigStageMessages,
};

/// Outcome of a given ceremony stage
pub enum StageResult<M, Result> {
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct CeremonyCommon {
pub own_idx: usize,
/// Indexes of parties participating in the ceremony
pub all_idxs: BTreeSet<usize>,
pub outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>,
pub outgoing_p2p_message_sender: UnboundedSender<OutgoingMultisigStageMessages>,
pub validator_mapping: Arc<PartyIdxMapping>,
pub logger: slog::Logger,
}
Expand Down
5 changes: 3 additions & 2 deletions engine/src/multisig/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
eth::utils::pubkey_to_eth_addr,
logging::{CEREMONY_ID_KEY, REQUEST_TO_SIGN_EXPIRED},
multisig::{KeyDB, KeyId, MultisigInstruction},
multisig_p2p::OutgoingMultisigStageMessages,
};

use state_chain_runtime::AccountId;
Expand Down Expand Up @@ -173,7 +174,7 @@ where
key_store: KeyStore<S>,
pub ceremony_manager: CeremonyManager,
multisig_outcome_sender: MultisigOutcomeSender,
outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>,
outgoing_p2p_message_sender: UnboundedSender<OutgoingMultisigStageMessages>,
/// Requests awaiting a key
pending_requests_to_sign: HashMap<KeyId, Vec<PendingSigningInfo>>,
keygen_options: KeygenOptions,
Expand All @@ -188,7 +189,7 @@ where
my_account_id: AccountId,
db: S,
multisig_outcome_sender: MultisigOutcomeSender,
outgoing_p2p_message_sender: UnboundedSender<(AccountId, MultisigMessage)>,
outgoing_p2p_message_sender: UnboundedSender<OutgoingMultisigStageMessages>,
keygen_options: KeygenOptions,
logger: &slog::Logger,
) -> Self {
Expand Down
Loading

0 comments on commit 79de04c

Please sign in to comment.