diff --git a/.cargo/config.toml b/.cargo/config.toml index bda566bd..e08b7963 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,6 @@ [target.aarch64-unknown-linux-gnu] linker = "aarch64-linux-gnu-gcc" runner = ["qemu-aarch64-static"] # use qemu user emulation for cargo run and test + +[build] +rustflags = ["-C", "target-cpu=native"] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 37072e91..94c389b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,6 +591,20 @@ dependencies = [ "syn 2.0.93", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "delegate" version = "0.12.0" @@ -959,6 +973,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1266,6 +1286,7 @@ dependencies = [ "more-asserts", "num_enum", "once_cell", + "parking_lot", "pnet", "rand", "rand_core", @@ -1297,6 +1318,7 @@ dependencies = [ "bytesize", "clap", "ctrlc", + "dashmap", "delegate", "educe", "ipnet", @@ -1307,6 +1329,7 @@ dependencies = [ "metrics", "metrics-util", "more-asserts", + "parking_lot", "pnet", "ppp", "pwhash", @@ -1332,6 +1355,16 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.22" @@ -1569,6 +1602,29 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "paste" version = "1.0.15" @@ -1857,6 +1913,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -1925,6 +1990,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.217" diff --git a/lightway-core/Cargo.toml b/lightway-core/Cargo.toml index 6f80f359..5d02d7a3 100644 --- a/lightway-core/Cargo.toml +++ b/lightway-core/Cargo.toml @@ -29,6 +29,7 @@ metrics.workspace = true more-asserts.workspace = true num_enum = "0.7.0" once_cell = "1.19.0" +parking_lot = "0.12" pnet.workspace = true rand.workspace = true rand_core = "0.6.4" diff --git a/lightway-core/src/connection.rs b/lightway-core/src/connection.rs index b1cd608f..1ec7f08c 100644 --- a/lightway-core/src/connection.rs +++ b/lightway-core/src/connection.rs @@ -6,13 +6,14 @@ mod io_adapter; mod key_update; use bytes::{Bytes, BytesMut}; +use parking_lot::Mutex; use rand::Rng; use std::borrow::Cow; use std::net::AddrParseError; use std::num::{NonZeroU16, Wrapping}; use std::{ net::SocketAddr, - sync::{Arc, Mutex}, + sync::Arc, time::{Duration, Instant}, }; use thiserror::Error; @@ -936,7 +937,7 @@ impl Connection { ref mut pending_session_id, .. } => { - let new_session_id = rng.lock().unwrap().gen(); + let new_session_id = rng.lock().gen(); self.session.io_cb_mut().set_session_id(new_session_id); diff --git a/lightway-core/src/connection/builders.rs b/lightway-core/src/connection/builders.rs index 64952efe..cb38dd70 100644 --- a/lightway-core/src/connection/builders.rs +++ b/lightway-core/src/connection/builders.rs @@ -278,7 +278,7 @@ impl<'a, AppState: Send + 'static> ServerConnectionBuilder<'a, AppState> { let auth = ctx.auth.clone(); let ip_pool = ctx.ip_pool.clone(); - let session_id = ctx.rng.lock().unwrap().gen(); + let session_id = ctx.rng.lock().gen(); let outside_mtu = MAX_OUTSIDE_MTU; diff --git a/lightway-core/src/context.rs b/lightway-core/src/context.rs index 24466aea..6368e8a8 100644 --- a/lightway-core/src/context.rs +++ b/lightway-core/src/context.rs @@ -1,8 +1,9 @@ pub mod ip_pool; mod server_auth; +use parking_lot::Mutex; use rand::SeedableRng; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use thiserror::Error; use crate::{ diff --git a/lightway-core/src/utils.rs b/lightway-core/src/utils.rs index 2bb1b57c..3f635a0d 100644 --- a/lightway-core/src/utils.rs +++ b/lightway-core/src/utils.rs @@ -9,17 +9,35 @@ use std::net::Ipv4Addr; use std::ops; use tracing::warn; +#[cfg(target_arch = "x86_64")] +use std::arch::x86_64::*; + +/// Check if AVX2 is available on the current CPU +#[inline(always)] +fn has_avx2() -> bool { + #[cfg(target_arch = "x86_64")] + { + is_x86_feature_detected!("avx2") + } + #[cfg(not(target_arch = "x86_64"))] + { + false + } +} + +/// Validate if a buffer contains a valid IPv4 packet pub(crate) fn ipv4_is_valid_packet(buf: &[u8]) -> bool { - if buf.is_empty() { + if buf.len() < 20 { + // IPv4 header is at least 20 bytes return false; } let first_byte = buf[0]; let ip_version = first_byte >> 4; - ip_version == 4 } -// Structure to calculate incremental checksum +/// Structure to calculate incremental checksum +#[derive(Clone, Copy)] struct Checksum(u16); impl ops::Deref for Checksum { @@ -33,156 +51,290 @@ impl ops::Sub for Checksum { type Output = Checksum; fn sub(self, rhs: u16) -> Checksum { let (n, of) = self.0.overflowing_sub(rhs); - Checksum(match of { - true => n - 1, - false => n, - }) + Checksum(if of { n.wrapping_sub(1) } else { n }) } } +/// Structure to handle checksum updates when modifying IP addresses +struct ChecksumUpdate(Vec<(u16, u16)>); + impl Checksum { - // Based on RFC-1624 [Eqn. 4] + /// Update checksum when replacing one word with another + /// Based on RFC-1624 [Eqn. 4] fn update_word(self, old_word: u16, new_word: u16) -> Self { self - !old_word - new_word } + /// Apply multiple checksum updates + #[allow(unsafe_code)] fn update(self, updates: &ChecksumUpdate) -> Self { - updates.0.iter().fold(self, |c, x| c.update_word(x.0, x.1)) + if has_avx2() { + // SAFETY: AVX2 operations are isolated in this function + unsafe { self.update_avx2(updates) } + } else { + self.update_generic(updates) + } } -} -struct ChecksumUpdate(Vec<(u16, u16)>); + fn update_generic(self, updates: &ChecksumUpdate) -> Self { + updates + .0 + .iter() + .fold(self, |c, &(old, new)| c.update_word(old, new)) + } -impl ChecksumUpdate { - fn from_ipv4_address(old: Ipv4Addr, new: Ipv4Addr) -> Self { - let mut result = vec![]; - let old: [u8; 4] = old.octets(); - let new: [u8; 4] = new.octets(); - for i in 0..2 { - let old_word = u16::from_be_bytes([old[i * 2], old[i * 2 + 1]]); - let new_word = u16::from_be_bytes([new[i * 2], new[i * 2 + 1]]); - result.push((old_word, new_word)); + /// AVX2-accelerated checksum update + #[allow(unsafe_code)] + #[cfg(target_arch = "x86_64")] + #[target_feature(enable = "avx2")] + unsafe fn update_avx2(self, updates: &ChecksumUpdate) -> Self { + let mut sum = u32::from(self.0); + + // Process 8 words at a time using AVX2 + for chunk in updates.0.chunks(8) { + // Pre-allocate with known size + let mut old_words = Vec::with_capacity(8); + let mut new_words = Vec::with_capacity(8); + + // Fill vectors with data or zeros + for i in 0..8 { + if let Some(&(old, new)) = chunk.get(i) { + old_words.push(i32::from(old)); + new_words.push(i32::from(new)); + } else { + old_words.push(0); + new_words.push(0); + } + } + + // SAFETY: Vectors are guaranteed to have exactly 8 elements + unsafe { + // Load data into AVX2 registers + let old_vec = _mm256_set_epi32( + old_words[7], + old_words[6], + old_words[5], + old_words[4], + old_words[3], + old_words[2], + old_words[1], + old_words[0], + ); + let new_vec = _mm256_set_epi32( + new_words[7], + new_words[6], + new_words[5], + new_words[4], + new_words[3], + new_words[2], + new_words[1], + new_words[0], + ); + + // Compute NOT(old) + new using AVX2 + let not_old = _mm256_xor_si256(old_vec, _mm256_set1_epi32(-1)); + let sum_vec = _mm256_add_epi32(not_old, new_vec); + + // Horizontal sum + let hadd = _mm256_hadd_epi32(sum_vec, sum_vec); + let hadd = _mm256_hadd_epi32(hadd, hadd); + + sum = sum.wrapping_add(_mm256_extract_epi32(hadd, 0) as u32); + } } - Self(result) + + // Fold 32-bit sum to 16 bits + while sum > 0xFFFF { + sum = (sum & 0xFFFF) + (sum >> 16); + } + + Checksum(sum as u16) } } -fn tcp_adjust_packet_checksum(mut packet: MutableIpv4Packet, updates: ChecksumUpdate) { - let packet = MutableTcpPacket::new(packet.payload_mut()); - let Some(mut packet) = packet else { - warn!("Invalid packet size (less than Tcp header)!"); - return; - }; +impl ChecksumUpdate { + /// Create checksum update data from IP address change + fn from_ipv4_address(old: Ipv4Addr, new: Ipv4Addr) -> Self { + // The generic version is simpler and safe + Self::from_ipv4_address_generic(old, new) + } - let checksum = Checksum(packet.get_checksum()); - let checksum = checksum.update(&updates); - packet.set_checksum(*checksum); + #[inline] + fn from_ipv4_address_generic(old: Ipv4Addr, new: Ipv4Addr) -> Self { + let old_bytes = old.octets(); + let new_bytes = new.octets(); + + // Convert to u16 pairs for checksum calculation + let old_words = [ + u16::from_be_bytes([old_bytes[0], old_bytes[1]]), + u16::from_be_bytes([old_bytes[2], old_bytes[3]]), + ]; + let new_words = [ + u16::from_be_bytes([new_bytes[0], new_bytes[1]]), + u16::from_be_bytes([new_bytes[2], new_bytes[3]]), + ]; + + Self(vec![ + (old_words[0], new_words[0]), + (old_words[1], new_words[1]), + ]) + } } -fn udp_adjust_packet_checksum(mut packet: MutableIpv4Packet, updates: ChecksumUpdate) { - let packet = MutableUdpPacket::new(packet.payload_mut()); - let Some(mut packet) = packet else { - warn!("Invalid packet size (less than Udp header)!"); +/// Update transport protocol checksums after IP address changes +fn update_transport_checksums(packet: &mut MutableIpv4Packet, updates: ChecksumUpdate) { + // Skip if this is not the first fragment + if packet.get_fragment_offset() != 0 { return; - }; - - let checksum = Checksum(packet.get_checksum()); + } - // UDP checksums are optional, and we should respect that when doing NAT - if *checksum != 0 { - let checksum = checksum.update(&updates); - packet.set_checksum(checksum.0); + match packet.get_next_level_protocol() { + IpNextHeaderProtocols::Tcp => update_tcp_checksum(packet, updates), + IpNextHeaderProtocols::Udp => update_udp_checksum(packet, updates), + IpNextHeaderProtocols::Icmp => {} // ICMP doesn't need checksum update for IP changes + protocol => warn!(protocol = ?protocol, "Unknown protocol, skipping checksum update"), } } -fn ipv4_adjust_packet_checksum(mut packet: MutableIpv4Packet, updates: ChecksumUpdate) { - let checksum = Checksum(packet.get_checksum()); - let checksum = checksum.update(&updates); - packet.set_checksum(*checksum); - - // In case of fragmented packets, TCP/UDP header will be present only in the first fragment. - // So skip updating the checksum, if it is not the first fragment (i.e frag_offset != 0) - if 0 != packet.get_fragment_offset() { - return; +fn update_tcp_checksum(packet: &mut MutableIpv4Packet, updates: ChecksumUpdate) { + if let Some(mut tcp_packet) = MutableTcpPacket::new(packet.payload_mut()) { + let checksum = Checksum(tcp_packet.get_checksum()); + let checksum = checksum.update(&updates); + tcp_packet.set_checksum(*checksum); + } else { + warn!("Invalid packet size (less than TCP header)!"); } +} - let transport_protocol = packet.get_next_level_protocol(); - match transport_protocol { - IpNextHeaderProtocols::Tcp => tcp_adjust_packet_checksum(packet, updates), - IpNextHeaderProtocols::Udp => udp_adjust_packet_checksum(packet, updates), - IpNextHeaderProtocols::Icmp => {} - protocol => { - warn!(protocol = ?protocol, "Unknown protocol, skipping checksum adjust") +fn update_udp_checksum(packet: &mut MutableIpv4Packet, updates: ChecksumUpdate) { + if let Some(mut udp_packet) = MutableUdpPacket::new(packet.payload_mut()) { + let checksum = udp_packet.get_checksum(); + // Only update if checksum is present (not 0) + if checksum != 0 { + let checksum = Checksum(checksum).update(&updates); + udp_packet.set_checksum(*checksum); } + } else { + warn!("Invalid packet size (less than UDP header)!"); } } -/// Utility function to update source ip address in ipv4 packet buffer -/// Nop if buf is not a valid IPv4 packet -pub fn ipv4_update_source(buf: &mut [u8], ip: Ipv4Addr) { - let packet = MutableIpv4Packet::new(buf); - let Some(mut packet) = packet else { - warn!("Invalid packet size (less than Ipv4 header)!"); +/// Update source IP address in an IPv4 packet +pub fn ipv4_update_source(buf: &mut [u8], new_ip: Ipv4Addr) { + if buf.len() < 20 { + warn!("Invalid packet size (less than IPv4 header)!"); return; + } + + // Create packet structure + let mut packet = match MutableIpv4Packet::new(buf) { + Some(p) => p, + None => { + warn!("Failed to create IPv4 packet!"); + return; + } }; - let old = packet.get_source(); - // Set new source only after getting old source ip address - packet.set_source(ip); + // Get old IP before updating + let old_ip = packet.get_source(); + + // Update source IP + packet.set_source(new_ip); + + // Update checksums + let checksum = Checksum(packet.get_checksum()); + let updates = ChecksumUpdate::from_ipv4_address(old_ip, new_ip); + let checksum = checksum.update(&updates); + packet.set_checksum(*checksum); - ipv4_adjust_packet_checksum(packet, ChecksumUpdate::from_ipv4_address(old, ip)); + // Update transport protocol checksums + update_transport_checksums(&mut packet, updates); } -/// Utility function to update destination ip address in ipv4 packet buffer -/// Nop if buf is not a valid IPv4 packet -pub fn ipv4_update_destination(buf: &mut [u8], ip: Ipv4Addr) { - let packet = MutableIpv4Packet::new(buf); - let Some(mut packet) = packet else { - warn!("Invalid packet size (less than Ipv4 header)!"); +/// Update destination IP address in an IPv4 packet +pub fn ipv4_update_destination(buf: &mut [u8], new_ip: Ipv4Addr) { + if buf.len() < 20 { + warn!("Invalid packet size (less than IPv4 header)!"); return; + } + + // Create packet structure + let mut packet = match MutableIpv4Packet::new(buf) { + Some(p) => p, + None => { + warn!("Failed to create IPv4 packet!"); + return; + } }; - let old = packet.get_destination(); - // Set new destination only after getting old destination ip address - packet.set_destination(ip); + // Get old IP before updating + let old_ip = packet.get_destination(); + + // Update destination IP + packet.set_destination(new_ip); + + // Update checksums + let checksum = Checksum(packet.get_checksum()); + let updates = ChecksumUpdate::from_ipv4_address(old_ip, new_ip); + let checksum = checksum.update(&updates); + packet.set_checksum(*checksum); - ipv4_adjust_packet_checksum(packet, ChecksumUpdate::from_ipv4_address(old, ip)); + // Update transport protocol checksums + update_transport_checksums(&mut packet, updates); } -pub fn tcp_clamp_mss(pkt: &mut [u8], mss: u16) -> Option { +/// Clamp TCP MSS option if present in a TCP SYN packet +pub fn tcp_clamp_mss(pkt: &mut [u8], max_mss: u16) -> Option { + // Get IPv4 packet let mut ipv4_packet = MutableIpv4Packet::new(pkt)?; - let transport_protocol = ipv4_packet.get_next_level_protocol(); - if !matches!(transport_protocol, IpNextHeaderProtocols::Tcp) { + // Check if it's TCP + if !matches!( + ipv4_packet.get_next_level_protocol(), + IpNextHeaderProtocols::Tcp + ) { return None; } + // Get TCP packet let mut tcp_packet = MutableTcpPacket::new(ipv4_packet.payload_mut())?; - // Skip if the packet is not TCP SYN packet + // Check if it's a SYN packet if tcp_packet.get_flags() & TcpFlags::SYN == 0 { return None; } let mut option_raw = tcp_packet.get_options_raw_mut(); - // TCP MSS option len is 4, so options lesser than 4 does not have MSS option + + // Process TCP options while option_raw.len() >= 4 { + // Minimum size for MSS option let mut option = MutableTcpOptionPacket::new(option_raw)?; + if option.get_number() == TcpOptionNumbers::MSS { let bytes = option.payload_mut(); let existing_mss = u16::from_be_bytes([bytes[0], bytes[1]]); - // If existing MSS is lesser than clamped value, skip updating - if existing_mss <= mss { + + // Only update if current MSS is higher than maximum + if existing_mss <= max_mss { return None; } - [bytes[0], bytes[1]] = mss.to_be_bytes(); - tcp_adjust_packet_checksum(ipv4_packet, ChecksumUpdate(vec![(existing_mss, mss)])); + // Update MSS + bytes.copy_from_slice(&max_mss.to_be_bytes()); + + // Update TCP checksum + let updates = ChecksumUpdate(vec![(existing_mss, max_mss)]); + update_tcp_checksum(&mut ipv4_packet, updates); + return Some(existing_mss); } - let start = std::cmp::min(option.packet_size(), option_raw.len()); - option_raw = &mut option_raw[start..]; + + // Move to next option + let option_size = std::cmp::min(option.packet_size(), option_raw.len()); + option_raw = &mut option_raw[option_size..]; } + None } diff --git a/lightway-server/Cargo.toml b/lightway-server/Cargo.toml index 02274171..1b98e8a1 100644 --- a/lightway-server/Cargo.toml +++ b/lightway-server/Cargo.toml @@ -24,6 +24,7 @@ bytes.workspace = true bytesize.workspace = true clap.workspace = true ctrlc.workspace = true +dashmap = "6.1.0" delegate.workspace = true educe.workspace = true ipnet.workspace = true @@ -33,6 +34,7 @@ lightway-app-utils.workspace = true lightway-core = { workspace = true, features = ["postquantum"] } metrics.workspace = true metrics-util = "0.18.0" +parking_lot = "0.12.3" pnet.workspace = true ppp = "2.2.0" pwhash = "1.0.0" diff --git a/lightway-server/src/connection.rs b/lightway-server/src/connection.rs index d60f1f74..0683fa34 100644 --- a/lightway-server/src/connection.rs +++ b/lightway-server/src/connection.rs @@ -1,8 +1,9 @@ use bytes::BytesMut; use delegate::delegate; +use parking_lot::Mutex; use std::{ net::{Ipv4Addr, SocketAddr}, - sync::{Arc, Mutex, Weak}, + sync::{Arc, Weak}, }; use tracing::{trace, warn}; @@ -80,11 +81,9 @@ impl Connection { conn.lw_conn .lock() - .unwrap() .app_state_mut() .conn .set(Arc::downgrade(&conn)) - .unwrap(); let mut join_set = tokio::task::JoinSet::new(); ticker_task.spawn(Arc::downgrade(&conn), &mut join_set); @@ -143,7 +142,7 @@ impl Connection { } pub fn begin_session_id_rotation(self: &Arc) { - let mut conn = self.lw_conn.lock().unwrap(); + let mut conn = self.lw_conn.lock(); // A rotation is already in flight, nothing to be done this // time. @@ -169,12 +168,12 @@ impl Connection { // Use this only during shutdown, after clearing all connections from // connection_manager pub fn lw_disconnect(self: Arc) -> ConnectionResult<()> { - self.lw_conn.lock().unwrap().disconnect() + self.lw_conn.lock().disconnect() } pub fn disconnect(&self) -> ConnectionResult<()> { self.manager.remove_connection(self); - self.lw_conn.lock().unwrap().disconnect() + self.lw_conn.lock().disconnect() } } diff --git a/lightway-server/src/connection_manager.rs b/lightway-server/src/connection_manager.rs index a0758c0d..c9534d43 100644 --- a/lightway-server/src/connection_manager.rs +++ b/lightway-server/src/connection_manager.rs @@ -1,12 +1,14 @@ mod connection_map; +use dashmap::DashMap; use delegate::delegate; +use parking_lot::Mutex; use std::{ collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Mutex, Weak, + Arc, Weak, }, }; use thiserror::Error; @@ -103,7 +105,7 @@ async fn evict_expired_connections(manager: Weak) { pub(crate) struct ConnectionManager { ctx: ServerContext, connections: Mutex>, - pending_session_id_rotations: Mutex>>, + pending_session_id_rotations: DashMap>, /// Total number of sessions there have ever been total_sessions: AtomicUsize, } @@ -214,7 +216,7 @@ impl ConnectionManager { let conn_manager = Arc::new(Self { ctx, connections: Mutex::new(Default::default()), - pending_session_id_rotations: Mutex::new(Default::default()), + pending_session_id_rotations: Default::default(), total_sessions: Default::default(), }); @@ -236,7 +238,7 @@ impl ConnectionManager { } pub(crate) fn pending_session_id_rotations_count(&self) -> usize { - self.pending_session_id_rotations.lock().unwrap().len() + self.pending_session_id_rotations.len() } pub(crate) fn create_streaming_connection( @@ -314,8 +316,6 @@ impl ConnectionManager { // Maybe this is a pending session rotation if let Some(c) = self .pending_session_id_rotations - .lock() - .unwrap() .get(&session_id) { let update_peer_address = addr != c.peer_addr(); @@ -354,8 +354,6 @@ impl ConnectionManager { new_session_id: SessionId, ) { self.pending_session_id_rotations - .lock() - .unwrap() .insert(new_session_id, conn.clone()); metrics::udp_session_rotation_begin(); @@ -368,8 +366,6 @@ impl ConnectionManager { new: SessionId, ) { self.pending_session_id_rotations - .lock() - .unwrap() .remove(&new); self.connections .lock()