diff --git a/Cargo.lock b/Cargo.lock index fb18016..b6bb22f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,6 +591,20 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "delegate" version = "0.12.0" @@ -959,6 +973,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1275,6 +1295,7 @@ dependencies = [ "more-asserts", "num_enum", "once_cell", + "parking_lot", "pnet", "rand", "rand_core", @@ -1306,6 +1327,7 @@ dependencies = [ "bytesize", "clap", "ctrlc", + "dashmap", "delegate", "educe", "ipnet", @@ -1316,6 +1338,7 @@ dependencies = [ "metrics", "metrics-util", "more-asserts", + "parking_lot", "pnet", "ppp", "pwhash", @@ -1341,6 +1364,16 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.22" @@ -1580,6 +1613,29 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "paste" version = "1.0.15" @@ -1877,6 +1933,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -1945,6 +2010,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.217" diff --git a/lightway-core/Cargo.toml b/lightway-core/Cargo.toml index 910d68c..df4c695 100644 --- a/lightway-core/Cargo.toml +++ b/lightway-core/Cargo.toml @@ -29,6 +29,7 @@ metrics.workspace = true more-asserts.workspace = true num_enum = "0.7.0" once_cell = "1.19.0" +parking_lot = "0.12" pnet.workspace = true rand.workspace = true rand_core = "0.6.4" diff --git a/lightway-core/src/connection.rs b/lightway-core/src/connection.rs index b1cd608..1ec7f08 100644 --- a/lightway-core/src/connection.rs +++ b/lightway-core/src/connection.rs @@ -6,13 +6,14 @@ mod io_adapter; mod key_update; use bytes::{Bytes, BytesMut}; +use parking_lot::Mutex; use rand::Rng; use std::borrow::Cow; use std::net::AddrParseError; use std::num::{NonZeroU16, Wrapping}; use std::{ net::SocketAddr, - sync::{Arc, Mutex}, + sync::Arc, time::{Duration, Instant}, }; use thiserror::Error; @@ -936,7 +937,7 @@ impl Connection { ref mut pending_session_id, .. } => { - let new_session_id = rng.lock().unwrap().gen(); + let new_session_id = rng.lock().gen(); self.session.io_cb_mut().set_session_id(new_session_id); diff --git a/lightway-core/src/connection/builders.rs b/lightway-core/src/connection/builders.rs index 64952ef..cb38dd7 100644 --- a/lightway-core/src/connection/builders.rs +++ b/lightway-core/src/connection/builders.rs @@ -278,7 +278,7 @@ impl<'a, AppState: Send + 'static> ServerConnectionBuilder<'a, AppState> { let auth = ctx.auth.clone(); let ip_pool = ctx.ip_pool.clone(); - let session_id = ctx.rng.lock().unwrap().gen(); + let session_id = ctx.rng.lock().gen(); let outside_mtu = MAX_OUTSIDE_MTU; diff --git a/lightway-core/src/context.rs b/lightway-core/src/context.rs index 24466ae..6368e8a 100644 --- a/lightway-core/src/context.rs +++ b/lightway-core/src/context.rs @@ -1,8 +1,9 @@ pub mod ip_pool; mod server_auth; +use parking_lot::Mutex; use rand::SeedableRng; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use thiserror::Error; use crate::{ diff --git a/lightway-server/Cargo.toml b/lightway-server/Cargo.toml index d5b2d8a..7aec204 100644 --- a/lightway-server/Cargo.toml +++ b/lightway-server/Cargo.toml @@ -24,6 +24,7 @@ bytes.workspace = true bytesize.workspace = true clap.workspace = true ctrlc.workspace = true +dashmap = "6.1.0" delegate.workspace = true educe.workspace = true ipnet.workspace = true @@ -33,6 +34,7 @@ lightway-app-utils.workspace = true lightway-core = { workspace = true, features = ["postquantum"] } metrics.workspace = true metrics-util = "0.19.0" +parking_lot = "0.12.3" pnet.workspace = true ppp = "2.2.0" pwhash = "1.0.0" diff --git a/lightway-server/src/connection.rs b/lightway-server/src/connection.rs index d60f1f7..007caf2 100644 --- a/lightway-server/src/connection.rs +++ b/lightway-server/src/connection.rs @@ -1,8 +1,9 @@ use bytes::BytesMut; use delegate::delegate; +use parking_lot::Mutex; use std::{ net::{Ipv4Addr, SocketAddr}, - sync::{Arc, Mutex, Weak}, + sync::{Arc, Weak}, }; use tracing::{trace, warn}; @@ -80,11 +81,9 @@ impl Connection { conn.lw_conn .lock() - .unwrap() .app_state_mut() .conn .set(Arc::downgrade(&conn)) - .unwrap(); let mut join_set = tokio::task::JoinSet::new(); ticker_task.spawn(Arc::downgrade(&conn), &mut join_set); @@ -93,7 +92,7 @@ impl Connection { } delegate! { - to self.lw_conn.lock().unwrap() { + to self.lw_conn.lock() { pub fn tls_protocol_version(&self) -> ProtocolVersion; pub fn connection_type(&self) -> ConnectionType; pub fn session_id(&self) -> SessionId; @@ -143,7 +142,7 @@ impl Connection { } pub fn begin_session_id_rotation(self: &Arc) { - let mut conn = self.lw_conn.lock().unwrap(); + let mut conn = self.lw_conn.lock(); // A rotation is already in flight, nothing to be done this // time. @@ -169,12 +168,12 @@ impl Connection { // Use this only during shutdown, after clearing all connections from // connection_manager pub fn lw_disconnect(self: Arc) -> ConnectionResult<()> { - self.lw_conn.lock().unwrap().disconnect() + self.lw_conn.lock().disconnect() } pub fn disconnect(&self) -> ConnectionResult<()> { self.manager.remove_connection(self); - self.lw_conn.lock().unwrap().disconnect() + self.lw_conn.lock().disconnect() } } diff --git a/lightway-server/src/connection_manager.rs b/lightway-server/src/connection_manager.rs index a0758c0..84eca93 100644 --- a/lightway-server/src/connection_manager.rs +++ b/lightway-server/src/connection_manager.rs @@ -1,12 +1,13 @@ mod connection_map; +use dashmap::DashMap; use delegate::delegate; +use parking_lot::Mutex; use std::{ - collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Mutex, Weak, + Arc, Weak, }, }; use thiserror::Error; @@ -103,7 +104,7 @@ async fn evict_expired_connections(manager: Weak) { pub(crate) struct ConnectionManager { ctx: ServerContext, connections: Mutex>, - pending_session_id_rotations: Mutex>>, + pending_session_id_rotations: DashMap>, /// Total number of sessions there have ever been total_sessions: AtomicUsize, } @@ -214,7 +215,7 @@ impl ConnectionManager { let conn_manager = Arc::new(Self { ctx, connections: Mutex::new(Default::default()), - pending_session_id_rotations: Mutex::new(Default::default()), + pending_session_id_rotations: Default::default(), total_sessions: Default::default(), }); @@ -236,7 +237,7 @@ impl ConnectionManager { } pub(crate) fn pending_session_id_rotations_count(&self) -> usize { - self.pending_session_id_rotations.lock().unwrap().len() + self.pending_session_id_rotations.len() } pub(crate) fn create_streaming_connection( @@ -253,7 +254,7 @@ impl ConnectionManager { outside_io, )?; // TODO: what if addr was already present? - self.connections.lock().unwrap().insert(&conn)?; + self.connections.lock().insert(&conn)?; Ok(conn) } @@ -286,7 +287,7 @@ impl ConnectionManager { where F: FnOnce() -> OutsideIOSendCallbackArg, { - match self.connections.lock().unwrap().lookup(addr, session_id) { + match self.connections.lock().lookup(addr, session_id) { connection_map::Entry::Occupied(c) => { if session_id == SessionId::EMPTY || c.session_id() == session_id { let update_peer_address = addr != c.peer_addr(); @@ -314,8 +315,6 @@ impl ConnectionManager { // Maybe this is a pending session rotation if let Some(c) = self .pending_session_id_rotations - .lock() - .unwrap() .get(&session_id) { let update_peer_address = addr != c.peer_addr(); @@ -333,19 +332,18 @@ impl ConnectionManager { self: &Arc, addr: SocketAddr, ) -> Option> { - self.connections.lock().unwrap().find_by(addr) + self.connections.lock().find_by(addr) } pub(crate) fn set_peer_addr(&self, conn: &Arc, new_addr: SocketAddr) { let old_addr = conn.set_peer_addr(new_addr); self.connections .lock() - .unwrap() .update_socketaddr_for_connection(old_addr, new_addr); } pub(crate) fn remove_connection(&self, conn: &Connection) { - self.connections.lock().unwrap().remove(conn) + self.connections.lock().remove(conn) } pub(crate) fn begin_session_id_rotation( @@ -354,8 +352,6 @@ impl ConnectionManager { new_session_id: SessionId, ) { self.pending_session_id_rotations - .lock() - .unwrap() .insert(new_session_id, conn.clone()); metrics::udp_session_rotation_begin(); @@ -368,12 +364,9 @@ impl ConnectionManager { new: SessionId, ) { self.pending_session_id_rotations - .lock() - .unwrap() .remove(&new); self.connections .lock() - .unwrap() .update_session_id_for_connection(old, new); metrics::udp_session_rotation_finalized(); @@ -382,7 +375,6 @@ impl ConnectionManager { pub(crate) fn online_connection_activity(&self) -> Vec { self.connections .lock() - .unwrap() .iter_connections() .filter_map(|c| match c.state() { State::Online => Some(c.activity()), @@ -395,7 +387,7 @@ impl ConnectionManager { fn evict_idle_connections(&self) { tracing::trace!("Aging connections"); - for conn in self.connections.lock().unwrap().iter_connections() { + for conn in self.connections.lock().iter_connections() { let age = conn.activity().last_outside_data_received.elapsed(); if age > CONNECTION_MAX_IDLE_AGE { tracing::info!(session = ?conn.session_id(), age = ?age, "Disconnecting idle connection"); @@ -415,7 +407,7 @@ impl ConnectionManager { fn evict_expired_connections(&self) { tracing::trace!("Expiring connections"); - for conn in self.connections.lock().unwrap().iter_connections() { + for conn in self.connections.lock().iter_connections() { let Ok(expired) = conn.authentication_expired() else { continue; }; @@ -433,7 +425,7 @@ impl ConnectionManager { } pub(crate) fn close_all_connections(&self) { - let connections = self.connections.lock().unwrap().remove_connections(); + let connections = self.connections.lock().remove_connections(); for conn in connections { let _ = conn.lw_disconnect(); }