Skip to content

Commit

Permalink
A0-1413: remove legacy network (#719)
Browse files Browse the repository at this point in the history
* remove legacy network

* fix tests

* remove unnecessary comment

* fix test names
  • Loading branch information
maciejnems authored Nov 10, 2022
1 parent 8228b54 commit 83b21d9
Show file tree
Hide file tree
Showing 14 changed files with 335 additions and 1,210 deletions.
10 changes: 1 addition & 9 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,6 @@ fn setup(
),
ServiceError,
> {
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Generic));
config
.network
.extra_sets
Expand Down Expand Up @@ -246,7 +242,7 @@ fn setup(

/// Builds a new service for a full client.
pub fn new_authority(
mut config: Configuration,
config: Configuration,
aleph_config: AlephCli,
) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
Expand All @@ -259,10 +255,6 @@ pub fn new_authority(
transaction_pool,
other: (block_import, justification_tx, justification_rx, mut telemetry, metrics),
} = new_partial(&config)?;
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Validator));

let backup_path = get_backup_path(
&aleph_config,
Expand Down
6 changes: 6 additions & 0 deletions docker/docker-compose.bridged.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ services:
networks:
- main
- Node0
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node0:30343

Node1:
extends:
Expand All @@ -15,6 +17,7 @@ services:
- main
- Node1
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node1:30344
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

Node2:
Expand All @@ -25,6 +28,7 @@ services:
- main
- Node2
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node2:30345
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

Node3:
Expand All @@ -35,6 +39,7 @@ services:
- main
- Node3
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node3:30346
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

Node4:
Expand All @@ -45,6 +50,7 @@ services:
- main
- Node4
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node4:30347
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

networks:
Expand Down
8 changes: 0 additions & 8 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ pub fn peers_set_config(protocol: Protocol) -> sc_network::config::NonDefaultSet
);

config.set_config = match protocol {
// No spontaneous connections, only reserved nodes added by the network logic.
Protocol::Validator => sc_network::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
},
Protocol::Generic => sc_network::config::SetConfig::default(),
Protocol::Authentication => sc_network::config::SetConfig::default(),
};
config
Expand Down
63 changes: 7 additions & 56 deletions finality-aleph/src/network/manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{
};

use codec::{Decode, Encode};
use log::{debug, info, trace, warn};
use log::{debug, info, trace};

use crate::{
network::{
manager::{Authentication, SessionHandler},
DataCommand, Multiaddress, Protocol,
DataCommand, Multiaddress,
},
NodeIndex, SessionId,
};
Expand All @@ -33,7 +33,7 @@ impl<M: Multiaddress> DiscoveryMessage<M> {
}
}

/// Handles creating and responding to discovery messages.
/// Handles creating and rebroadcasting discovery messages.
pub struct Discovery<M: Multiaddress> {
cooldown: Duration,
last_broadcast: HashMap<NodeIndex, Instant>,
Expand All @@ -54,16 +54,6 @@ fn authentication_broadcast<M: Multiaddress>(
)
}

fn response<M: Multiaddress>(
authentication: Authentication<M>,
peer_id: M::PeerId,
) -> DiscoveryCommand<M> {
(
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(peer_id, Protocol::Generic),
)
}

impl<M: Multiaddress> Discovery<M> {
/// Create a new discovery handler with the given response/broadcast cooldown.
pub fn new(cooldown: Duration) -> Self {
Expand Down Expand Up @@ -122,16 +112,6 @@ impl<M: Multiaddress> Discovery<M> {
}
let node_id = authentication.0.creator();
let mut messages = Vec::new();
match handler.peer_id(&node_id) {
Some(peer_id) => {
if let Some(handler_authentication) = handler.authentication() {
messages.push(response(handler_authentication, peer_id));
}
}
None => {
warn!(target: "aleph-network", "Id of correctly authenticated peer not present.")
}
}
if self.should_rebroadcast(&node_id) {
trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication);
self.last_broadcast.insert(node_id, Instant::now());
Expand Down Expand Up @@ -256,7 +236,7 @@ mod tests {
}

#[tokio::test]
async fn rebroadcasts_responds_and_accepts_addresses() {
async fn rebroadcasts_and_accepts_addresses() {
let (mut discovery, mut handlers, _) = build().await;
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
Expand All @@ -265,19 +245,15 @@ mod tests {
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert_eq!(commands.len(), 2);
assert_eq!(commands.len(), 1);
assert!(commands.iter().any(|command| matches!(command, (
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
) if rebroadcast_authentication == &authentication)));
assert!(commands.iter().any(|command| matches!(command, (
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(_, _),
) if *authentication == handler.authentication().unwrap())));
}

#[tokio::test]
async fn non_validators_rebroadcasts_responds() {
async fn non_validators_rebroadcasts() {
let (mut discovery, handlers, mut non_validator) = build().await;
let authentication = handlers[1].authentication().unwrap();
let (addresses, commands) = discovery.handle_message(
Expand All @@ -293,7 +269,7 @@ mod tests {
}

#[tokio::test]
async fn does_not_rebroadcast_nor_respond_to_wrong_authentications() {
async fn does_not_rebroadcast_wrong_authentications() {
let (mut discovery, mut handlers, _) = build().await;
let (auth_data, _) = handlers[1].authentication().unwrap();
let (_, signature) = handlers[2].authentication().unwrap();
Expand All @@ -307,31 +283,6 @@ mod tests {
assert!(commands.is_empty());
}

#[tokio::test]
async fn does_not_rebroadcast_quickly_but_still_responds() {
let (mut discovery, mut handlers, _) = build().await;
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
let (addresses, commands) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
assert_eq!(addresses.len(), authentication.0.addresses().len());
assert_eq!(
addresses[0].encode(),
authentication.0.addresses()[0].encode()
);
assert_eq!(commands.len(), 1);
assert!(matches!(&commands[0], (
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(_, _),
) if *authentication == handler.authentication().unwrap()));
}

#[tokio::test]
async fn rebroadcasts_after_cooldown() {
let (mut discovery, mut handlers, _) = build().await;
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub type Authentication<M> = (AuthData<M>, Signature);
/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id.
/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way,
/// this will allow us to retrofit versioning here if we ever need to change this structure.
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DataInSession<D: Data> {
pub data: D,
pub session_id: SessionId,
Expand Down
28 changes: 6 additions & 22 deletions finality-aleph/src/network/manager/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
Connections, Discovery, DiscoveryMessage, NetworkData, SessionHandler,
SessionHandlerError,
},
ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId, Protocol,
ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId,
},
MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL,
};
Expand Down Expand Up @@ -448,22 +448,12 @@ impl<NI: NetworkIdentity, D: Data> Service<NI, D> {
Recipient::Everyone => (0..handler.node_count().0)
.map(NodeIndex)
.flat_map(|node_id| handler.peer_id(&node_id))
.map(|peer_id| {
(
to_send.clone(),
DataCommand::SendTo(peer_id, Protocol::Validator),
)
})
.map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id)))
.collect(),
Recipient::Node(node_id) => handler
.peer_id(&node_id)
.into_iter()
.map(|peer_id| {
(
to_send.clone(),
DataCommand::SendTo(peer_id, Protocol::Validator),
)
})
.map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id)))
.collect(),
}
} else {
Expand Down Expand Up @@ -782,7 +772,7 @@ mod tests {
network::{
manager::{DiscoveryMessage, NetworkData},
mock::{crypto_basics, MockNetworkIdentity},
ConnectionCommand, DataCommand, Protocol,
ConnectionCommand, DataCommand,
},
Recipient, SessionId,
};
Expand Down Expand Up @@ -934,13 +924,10 @@ mod tests {
addresses.into_iter().collect()
))
);
assert_eq!(data.len(), 2);
assert_eq!(data.len(), 1);
assert!(data
.iter()
.any(|(_, command)| command == &DataCommand::Broadcast));
assert!(data
.iter()
.any(|(_, command)| matches!(command, &DataCommand::SendTo(_, _))));
}

#[tokio::test]
Expand Down Expand Up @@ -975,10 +962,7 @@ mod tests {
let messages = service.on_user_message(2137, session_id, Recipient::Everyone);
assert_eq!(messages.len(), 1);
let (network_data, data_command) = &messages[0];
assert!(matches!(
data_command,
DataCommand::SendTo(_, Protocol::Validator)
));
assert!(matches!(data_command, DataCommand::SendTo(_)));
assert_eq!(network_data, &NetworkData::Data(2137, session_id));
}
}
38 changes: 20 additions & 18 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::{HashSet, VecDeque},
fmt,
sync::Arc,
time::Duration,
};

use aleph_primitives::KEY_TYPE;
Expand All @@ -14,6 +15,7 @@ use futures::{
use parking_lot::Mutex;
use rand::random;
use sp_keystore::{testing::KeyStore, CryptoStore};
use tokio::time::timeout;

use crate::{
crypto::{AuthorityPen, AuthorityVerifier},
Expand Down Expand Up @@ -106,6 +108,8 @@ pub struct Channel<T>(
pub Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<T>>>,
);

const TIMEOUT_FAIL: Duration = Duration::from_secs(10);

impl<T> Channel<T> {
pub fn new() -> Self {
let (tx, rx) = mpsc::unbounded();
Expand All @@ -117,7 +121,19 @@ impl<T> Channel<T> {
}

pub async fn next(&mut self) -> Option<T> {
self.1.lock().await.next().await
timeout(TIMEOUT_FAIL, self.1.lock().await.next())
.await
.ok()
.flatten()
}

pub async fn take(&mut self, n: usize) -> Vec<T> {
timeout(
TIMEOUT_FAIL,
self.1.lock().await.by_ref().take(n).collect::<Vec<_>>(),
)
.await
.unwrap_or(Vec::new())
}

pub async fn try_next(&self) -> Option<T> {
Expand All @@ -142,38 +158,24 @@ pub type MockData = Vec<u8>;
type MessageForUser<D, M> = (NetworkData<D, M>, DataCommand<<M as Multiaddress>::PeerId>);
type NetworkServiceIO<M> = NetworkIO<NetworkData<MockData, M>, M>;

pub struct MockIO<M: Multiaddress, LM: Multiaddress> {
pub struct MockIO<M: Multiaddress> {
pub messages_for_user: mpsc::UnboundedSender<MessageForUser<MockData, M>>,
pub messages_from_user: mpsc::UnboundedReceiver<NetworkData<MockData, M>>,
pub commands_for_manager: mpsc::UnboundedSender<ConnectionCommand<M>>,
pub legacy_messages_for_user: mpsc::UnboundedSender<MessageForUser<MockData, LM>>,
pub legacy_messages_from_user: mpsc::UnboundedReceiver<NetworkData<MockData, LM>>,
pub legacy_commands_for_manager: mpsc::UnboundedSender<ConnectionCommand<LM>>,
}

impl<M: Multiaddress + 'static, LM: Multiaddress + 'static> MockIO<M, LM> {
pub fn new() -> (MockIO<M, LM>, NetworkServiceIO<M>, NetworkServiceIO<LM>) {
impl<M: Multiaddress + 'static> MockIO<M> {
pub fn new() -> (MockIO<M>, NetworkServiceIO<M>) {
let (mock_messages_for_user, messages_from_user) = mpsc::unbounded();
let (messages_for_user, mock_messages_from_user) = mpsc::unbounded();
let (mock_commands_for_manager, commands_from_manager) = mpsc::unbounded();
let (legacy_mock_messages_for_user, legacy_messages_from_user) = mpsc::unbounded();
let (legacy_messages_for_user, legacy_mock_messages_from_user) = mpsc::unbounded();
let (legacy_mock_commands_for_manager, legacy_commands_from_manager) = mpsc::unbounded();
(
MockIO {
messages_for_user: mock_messages_for_user,
messages_from_user: mock_messages_from_user,
commands_for_manager: mock_commands_for_manager,
legacy_messages_for_user: legacy_mock_messages_for_user,
legacy_messages_from_user: legacy_mock_messages_from_user,
legacy_commands_for_manager: legacy_mock_commands_for_manager,
},
NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_manager),
NetworkServiceIO::new(
legacy_messages_from_user,
legacy_messages_for_user,
legacy_commands_from_manager,
),
)
}
}
Expand Down
Loading

0 comments on commit 83b21d9

Please sign in to comment.