Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(dht): split dht to dht and udp structs #448

Merged
merged 1 commit into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions examples/dht_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use std::net::SocketAddr;

use tox_crypto::*;
use tox_packet::dht::packed_node::PackedNode;
use tox_core::dht::server::*;
use tox_core::dht::server::Server as DhtServer;
use tox_core::dht::server_ext::dht_run_socket;
use tox_core::dht::lan_discovery::*;
use tox_core::udp::Server as UdpServer;
use tox_core::stats::Stats;

mod common;
Expand Down Expand Up @@ -49,24 +50,26 @@ async fn main() -> Result<(), Error> {
let mut lan_discovery_sender =
LanDiscoverySender::new(tx.clone(), server_pk, local_addr.is_ipv6());

let mut server = Server::new(tx, server_pk, server_sk);
server.set_bootstrap_info(3_000_000_000, Box::new(|_| b"This is tox-rs".to_vec()));
server.enable_lan_discovery(true);
server.enable_ipv6_mode(local_addr.is_ipv6());
let mut dht_server = DhtServer::new(tx, server_pk, server_sk);
dht_server.set_bootstrap_info(3_000_000_000, Box::new(|_| b"This is tox-rs".to_vec()));
dht_server.enable_lan_discovery(true);
dht_server.enable_ipv6_mode(local_addr.is_ipv6());

// Bootstrap from nodes
for &(pk, saddr) in &common::BOOTSTRAP_NODES {
let bootstrap_pn = as_packed_node(pk, saddr);

server.add_initial_bootstrap(bootstrap_pn);
dht_server.add_initial_bootstrap(bootstrap_pn);
}

let udp_server = UdpServer::new(dht_server);

let socket = common::bind_socket(local_addr).await;

info!("Running DHT server on {}", local_addr);

futures::select! {
res = dht_run_socket(&server, socket, rx, stats).fuse() => res.map_err(Error::from),
res = dht_run_socket(&udp_server, socket, rx, stats).fuse() => res.map_err(Error::from),
res = lan_discovery_sender.run().fuse() => res.map_err(Error::from),
}
}
14 changes: 8 additions & 6 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use tox_packet::friend_connection::*;
use tox_packet::onion::InnerOnionResponse;
use tox_packet::relay::DataPayload;
use tox_packet::toxid::ToxId;
use tox_core::dht::server::Server;
use tox_core::dht::server::Server as DhtServer;
use tox_core::dht::server_ext::dht_run_socket;
use tox_core::dht::lan_discovery::LanDiscoverySender;
use tox_core::udp::Server as UdpServer;
use tox_core::friend_connection::FriendConnections;
use tox_core::net_crypto::{NetCrypto, NetCryptoNewArgs};
use tox_core::onion::client::OnionClient;
Expand Down Expand Up @@ -70,7 +71,7 @@ async fn main() -> Result<(), Error> {

let (tcp_incoming_tx, mut tcp_incoming_rx) = mpsc::unbounded();

let mut dht_server = Server::new(tx.clone(), dht_pk, dht_sk.clone());
let mut dht_server = DhtServer::new(tx.clone(), dht_pk, dht_sk.clone());
dht_server.enable_lan_discovery(true);
dht_server.enable_ipv6_mode(local_addr.is_ipv6());

Expand All @@ -97,9 +98,6 @@ async fn main() -> Result<(), Error> {
let (net_crypto_tcp_tx, mut net_crypto_tcp_rx) = mpsc::channel(32);
net_crypto.set_tcp_sink(net_crypto_tcp_tx).await;

dht_server.set_net_crypto(net_crypto.clone());
dht_server.set_onion_client(onion_client.clone());

let friend_connections = FriendConnections::new(
real_sk,
real_pk,
Expand All @@ -122,6 +120,10 @@ async fn main() -> Result<(), Error> {
onion_client.add_path_node(node).await;
}

let mut udp_server = UdpServer::new(dht_server);
udp_server.set_net_crypto(net_crypto.clone());
udp_server.set_onion_client(onion_client.clone());

let net_crypto_tcp_future = async {
while let Some((packet, pk)) = net_crypto_tcp_rx.next().await {
tcp_connections.send_data(pk, packet).await?;
Expand Down Expand Up @@ -211,7 +213,7 @@ async fn main() -> Result<(), Error> {
}

futures::select!(
res = dht_run_socket(&dht_server, socket, rx, stats).fuse() => res.map_err(Error::from),
res = dht_run_socket(&udp_server, socket, rx, stats).fuse() => res.map_err(Error::from),
res = lan_discovery_sender.run().fuse() => res.map_err(Error::from),
res = tcp_connections.run().fuse() => res.map_err(Error::from),
res = onion_client.run().fuse() => res.map_err(Error::from),
Expand Down
622 changes: 161 additions & 461 deletions tox_core/src/dht/server/mod.rs

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions tox_core/src/dht/server_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use failure::Fail;

use crate::dht::codec::*;
use tox_packet::dht::Packet;
use crate::dht::server::Server;
use crate::udp::Server;
use crate::stats::Stats;

/// Run DHT server on `UdpSocket`.
pub async fn dht_run_socket(
dht: &Server,
udp: &Server,
socket: UdpSocket,
mut rx: Receiver<(Packet, SocketAddr)>,
stats: Stats
Expand All @@ -32,7 +32,7 @@ pub async fn dht_run_socket(
match event {
Ok((packet, addr)) => {
trace!("Received packet {:?}", packet);
let res = dht.handle_packet(packet, addr).await;
let res = udp.handle_packet(packet, addr).await;

if let Err(ref err) = res {
error!("Failed to handle packet: {:?}", err);
Expand Down Expand Up @@ -74,7 +74,7 @@ pub async fn dht_run_socket(
futures::select! {
read = network_reader.fuse() => read,
write = network_writer.fuse() => write,
run = dht.run().fuse() => {
run = udp.dht.run().fuse() => { // TODO: should we run it here?
let res: Result<_, _> = run;
res.map_err(|e| Error::new(ErrorKind::Other, e.compat()))
}
Expand All @@ -90,6 +90,7 @@ mod tests {

use tox_crypto::*;
use tox_packet::dht::*;
use crate::dht::server::Server as DhtServer;

#[tokio::test]
async fn run_socket() {
Expand All @@ -100,7 +101,7 @@ mod tests {

let (tx, rx) = mpsc::channel(32);

let server = Server::new(tx, server_pk, server_sk);
let server = Server::new(DhtServer::new(tx, server_pk, server_sk));

// Bind server socket
let server_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
Expand Down
9 changes: 3 additions & 6 deletions tox_core/src/friend_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {
let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
let (lossless_tx, lossless_rx) = mpsc::unbounded();
let (lossy_tx, _lossy_rx) = mpsc::unbounded();
let mut dht = DhtServer::new(udp_tx.clone(), dht_pk, dht_sk.clone());
let dht = DhtServer::new(udp_tx.clone(), dht_pk, dht_sk.clone());
let tcp_connections = TcpConnections::new(dht_pk, dht_sk.clone(), tcp_incoming_tx);
let onion_client = OnionClient::new(dht.clone(), tcp_connections.clone(), real_sk.clone(), real_pk);
let precomputed_keys = PrecomputedCache::new(dht_sk.clone(), 1);
Expand All @@ -450,8 +450,6 @@ mod tests {
real_sk: real_sk.clone(),
precomputed_keys,
});
dht.set_onion_client(onion_client.clone());
dht.set_net_crypto(net_crypto.clone());
let friend_connections = FriendConnections::new(
real_sk,
real_pk,
Expand Down Expand Up @@ -962,9 +960,8 @@ mod tests {
let handshake = CryptoHandshake::new(&precomputed_key, &handshake_payload, cookie);

let net_crypto = friend_connections.net_crypto.clone();
let dht = friend_connections.dht.clone();
let packets_future = async {
dht.handle_packet(DhtPacket::CryptoHandshake(handshake), friend_saddr).await.unwrap();
net_crypto.handle_udp_crypto_handshake(&handshake, friend_saddr).await.unwrap();

let session_pk = net_crypto.get_session_pk(&friend_pk).await.unwrap();
let session_precomputed_key = precompute(&session_pk, &friend_session_sk);
Expand All @@ -976,7 +973,7 @@ mod tests {
};
let crypto_data = CryptoData::new(&session_precomputed_key, sent_nonce, &crypto_data_payload);

dht.handle_packet(DhtPacket::CryptoData(crypto_data), friend_saddr).await.unwrap();
net_crypto.handle_udp_crypto_data(&crypto_data, friend_saddr).await.unwrap();

let (received, _lossless_rx) = lossless_rx.into_future().await;
let (received_pk, received_data) = received.unwrap();
Expand Down
1 change: 1 addition & 0 deletions tox_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ pub mod net_crypto;
pub mod utils;
pub mod friend_connection;
pub mod stats;
pub mod udp;
2 changes: 1 addition & 1 deletion tox_core/src/onion/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2323,7 +2323,7 @@ mod tests {
id: request_payload.id,
};
let response_packet = NodesResponse::new(&shared_secret, &node_pk, &response_payload);
onion_client.dht.handle_packet(Packet::NodesResponse(response_packet), node.saddr).await.unwrap();
onion_client.dht.handle_nodes_resp(response_packet, node.saddr).await.unwrap();

let mut state = onion_client.state.lock().await;

Expand Down
163 changes: 163 additions & 0 deletions tox_core/src/udp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use crate::dht::ip_port::IsGlobal;
use crate::dht::server::Server as DhtServer;
use crate::dht::server::errors::*;
use crate::net_crypto::NetCrypto;
use crate::onion::client::OnionClient;
use failure::Fail;
use tox_packet::dht::*;
use tox_packet::onion::*;
use std::net::SocketAddr;

/// UDP server that handles DHT, onion and net_crypto packets. Onion and
/// net_crypro handlers are optional since appropriate packets are not handled
/// when operating in DHT server mode.
pub struct Server {
/// DHT server.
pub dht: DhtServer,
/// Onion client that handles `OnionDataResponse` and
/// `OnionAnnounceResponse` packets. It can be `None` in case of pure
/// bootstrap server.
onion_client: Option<OnionClient>,
/// Net crypto module that handles `CookieRequest`, `CookieResponse`,
/// `CryptoHandshake` and `CryptoData` packets. It can be `None` in case of
/// pure bootstrap server when we don't have friends and therefore don't
/// have to handle related packets.
net_crypto: Option<NetCrypto>,
}

impl Server {
/// Create new `Server` instance.
pub fn new(dht: DhtServer) -> Self {
Self {
dht,
onion_client: None,
net_crypto: None,
}
}

/// Function to handle incoming packets and send responses if necessary.
pub async fn handle_packet(&self, packet: Packet, addr: SocketAddr) -> Result<(), HandlePacketError> {
match packet {
Packet::PingRequest(packet) =>
self.dht.handle_ping_req(packet, addr).await,
Packet::PingResponse(packet) =>
self.dht.handle_ping_resp(packet, addr).await,
Packet::NodesRequest(packet) =>
self.dht.handle_nodes_req(packet, addr).await,
Packet::NodesResponse(packet) =>
self.dht.handle_nodes_resp(packet, addr).await,
Packet::CookieRequest(packet) =>
self.handle_cookie_request(&packet, addr).await,
Packet::CookieResponse(packet) =>
self.handle_cookie_response(&packet, addr).await,
Packet::CryptoHandshake(packet) =>
self.handle_crypto_handshake(&packet, addr).await,
Packet::DhtRequest(packet) =>
self.dht.handle_dht_req(packet, addr).await,
Packet::LanDiscovery(packet) =>
self.dht.handle_lan_discovery(&packet, addr).await,
Packet::OnionRequest0(packet) =>
self.dht.handle_onion_request_0(packet, addr).await,
Packet::OnionRequest1(packet) =>
self.dht.handle_onion_request_1(packet, addr).await,
Packet::OnionRequest2(packet) =>
self.dht.handle_onion_request_2(packet, addr).await,
Packet::OnionAnnounceRequest(packet) =>
self.dht.handle_onion_announce_request(packet, addr).await,
Packet::OnionDataRequest(packet) =>
self.dht.handle_onion_data_request(packet).await,
Packet::OnionResponse3(packet) =>
self.dht.handle_onion_response_3(packet).await,
Packet::OnionResponse2(packet) =>
self.dht.handle_onion_response_2(packet).await,
Packet::OnionResponse1(packet) =>
self.dht.handle_onion_response_1(packet).await,
Packet::BootstrapInfo(packet) =>
self.dht.handle_bootstrap_info(&packet, addr).await,
Packet::CryptoData(packet) =>
self.handle_crypto_data(&packet, addr).await,
Packet::OnionDataResponse(packet) =>
self.handle_onion_data_response(&packet).await,
Packet::OnionAnnounceResponse(packet) =>
self.handle_onion_announce_response(&packet, addr).await,
}
}

/// Handle received `OnionDataResponse` packet and pass it to `onion_client` module.
async fn handle_onion_data_response(&self, packet: &OnionDataResponse) -> Result<(), HandlePacketError> {
if let Some(ref onion_client) = self.onion_client {
onion_client.handle_data_response(packet).await
.map_err(|e| e.context(HandlePacketErrorKind::HandleOnionClient).into())
} else {
Err(HandlePacketError::from(HandlePacketErrorKind::OnionClient))
}
}

/// Handle received `OnionAnnounceResponse` packet and pass it to `onion_client` module.
async fn handle_onion_announce_response(&self, packet: &OnionAnnounceResponse, addr: SocketAddr) -> Result<(), HandlePacketError> {
if let Some(ref onion_client) = self.onion_client {
onion_client.handle_announce_response(packet, IsGlobal::is_global(&addr.ip())).await
.map_err(|e| e.context(HandlePacketErrorKind::HandleOnionClient).into())
} else {
Err(HandlePacketError::from(HandlePacketErrorKind::OnionClient))
}
}

/// Set `onion_client` module.
pub fn set_onion_client(&mut self, onion_client: OnionClient) {
self.onion_client = Some(onion_client);
}

/// Handle received `CookieRequest` packet and pass it to `net_crypto`
/// module.
pub async fn handle_cookie_request(&self, packet: &CookieRequest, addr: SocketAddr)
-> Result<(), HandlePacketError> {
if let Some(ref net_crypto) = self.net_crypto {
net_crypto.handle_udp_cookie_request(packet, addr).await
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
} else {
Err(
HandlePacketError::from(HandlePacketErrorKind::NetCrypto)
)
}
}

/// Handle received `CookieResponse` packet and pass it to `net_crypto`
/// module.
pub async fn handle_cookie_response(&self, packet: &CookieResponse, addr: SocketAddr)
-> Result<(), HandlePacketError> {
if let Some(ref net_crypto) = self.net_crypto {
net_crypto.handle_udp_cookie_response(packet, addr).await
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
} else {
Err(HandlePacketError::from(HandlePacketErrorKind::NetCrypto))
}
}

/// Handle received `CryptoHandshake` packet and pass it to `net_crypto`
/// module.
pub async fn handle_crypto_handshake(&self, packet: &CryptoHandshake, addr: SocketAddr)
-> Result<(), HandlePacketError> {
if let Some(ref net_crypto) = self.net_crypto {
net_crypto.handle_udp_crypto_handshake(packet, addr).await
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
} else {
Err(HandlePacketError::from(HandlePacketErrorKind::NetCrypto))
}
}

/// Handle received `CryptoData` packet and pass it to `net_crypto` module.
pub async fn handle_crypto_data(&self, packet: &CryptoData, addr: SocketAddr) -> Result<(), HandlePacketError> {
if let Some(ref net_crypto) = self.net_crypto {
net_crypto.handle_udp_crypto_data(packet, addr).await
.map_err(|e| e.context(HandlePacketErrorKind::HandleNetCrypto).into())
} else {
Err(HandlePacketError::from(HandlePacketErrorKind::NetCrypto))
}
}

/// Set `net_crypto` module.
pub fn set_net_crypto(&mut self, net_crypto: NetCrypto) {
self.net_crypto = Some(net_crypto);
}
}