diff --git a/russh/src/channels/mod.rs b/russh/src/channels/mod.rs index 30655fb0..e08cceb6 100644 --- a/russh/src/channels/mod.rs +++ b/russh/src/channels/mod.rs @@ -14,8 +14,6 @@ pub use channel_ref::ChannelRef; mod channel_stream; pub use channel_stream::ChannelStream; -pub static CHANNEL_BUFFER_SIZE: usize = 100; - #[derive(Debug)] #[non_exhaustive] /// Possible messages that [Channel::wait] can receive. @@ -162,8 +160,9 @@ impl + Send + Sync + 'static> Channel { sender: Sender, max_packet_size: u32, window_size: u32, + channel_buffer_size: usize, ) -> (Self, ChannelRef) { - let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE); + let (tx, rx) = tokio::sync::mpsc::channel(channel_buffer_size); let window_size = WindowSizeRef::new(window_size); ( diff --git a/russh/src/client/encrypted.rs b/russh/src/client/encrypted.rs index 29ec242c..a4d50ae8 100644 --- a/russh/src/client/encrypted.rs +++ b/russh/src/client/encrypted.rs @@ -891,6 +891,7 @@ impl Session { self.inbound_channel_sender.clone(), msg.recipient_maximum_packet_size, msg.recipient_window_size, + self.common.config.channel_buffer_size, ); self.channels.insert(id, channel_ref); diff --git a/russh/src/client/mod.rs b/russh/src/client/mod.rs index 568a1823..f831f89e 100644 --- a/russh/src/client/mod.rs +++ b/russh/src/client/mod.rs @@ -56,7 +56,7 @@ use tokio::sync::mpsc::{ }; use tokio::sync::oneshot; -use crate::channels::{Channel, ChannelMsg, ChannelRef, WindowSizeRef, CHANNEL_BUFFER_SIZE}; +use crate::channels::{Channel, ChannelMsg, ChannelRef, WindowSizeRef}; use crate::cipher::{self, clear, CipherPair, OpeningKey}; use crate::keys::key::parse_public_key; use crate::session::{ @@ -224,6 +224,7 @@ pub struct Handle { sender: Sender, receiver: UnboundedReceiver, join: russh_util::runtime::JoinHandle>, + channel_buffer_size: usize, } impl Drop for Handle { @@ -466,7 +467,7 @@ impl Handle { /// usable when it's confirmed by the server, as indicated by the /// `confirmed` field of the corresponding `Channel`. pub async fn channel_open_session(&self) -> Result, crate::Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -484,7 +485,7 @@ impl Handle { originator_address: A, originator_port: u32, ) -> Result, crate::Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -515,7 +516,7 @@ impl Handle { originator_address: B, originator_port: u32, ) -> Result, crate::Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -537,7 +538,7 @@ impl Handle { &self, socket_path: S, ) -> Result, crate::Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -749,6 +750,7 @@ where config.maximum_packet_size ); } + let channel_buffer_size = config.channel_buffer_size; let mut session = Session::new( config.window_size, CommonSession { @@ -789,6 +791,7 @@ where sender: handle_sender, receiver: handle_receiver, join, + channel_buffer_size, }) } @@ -1472,6 +1475,8 @@ pub struct Config { pub window_size: u32, /// The maximal size of a single packet. pub maximum_packet_size: u32, + /// Buffer size for created channels. + pub channel_buffer_size: usize, /// Lists of preferred algorithms. pub preferred: negotiation::Preferred, /// Time after which the connection is garbage-collected. @@ -1495,6 +1500,7 @@ impl Default for Config { limits: Limits::default(), window_size: 2097152, maximum_packet_size: 32768, + channel_buffer_size: 100, preferred: Default::default(), inactivity_timeout: None, keepalive_interval: None, diff --git a/russh/src/server/encrypted.rs b/russh/src/server/encrypted.rs index 8d495d6e..b496c85b 100644 --- a/russh/src/server/encrypted.rs +++ b/russh/src/server/encrypted.rs @@ -1226,6 +1226,7 @@ impl Session { self.sender.sender.clone(), channel_params.recipient_maximum_packet_size, channel_params.recipient_window_size, + self.common.config.channel_buffer_size, ); match &msg.typ { diff --git a/russh/src/server/mod.rs b/russh/src/server/mod.rs index 9c41078e..39c43597 100644 --- a/russh/src/server/mod.rs +++ b/russh/src/server/mod.rs @@ -79,6 +79,8 @@ pub struct Config { pub window_size: u32, /// The maximal size of a single packet. pub maximum_packet_size: u32, + /// Buffer size for created channels. + pub channel_buffer_size: usize, /// Internal event buffer size pub event_buffer_size: usize, /// Lists of preferred algorithms. @@ -108,6 +110,7 @@ impl Default for Config { keys: Vec::new(), window_size: 2097152, maximum_packet_size: 32768, + channel_buffer_size: 100, event_buffer_size: 10, limits: Limits::default(), preferred: Default::default(), @@ -134,6 +137,7 @@ impl Debug for Config { .field("keys", &"***") .field("window_size", &self.window_size) .field("maximum_packet_size", &self.maximum_packet_size) + .field("channel_buffer_size", &self.channel_buffer_size) .field("event_buffer_size", &self.event_buffer_size) .field("limits", &self.limits) .field("preferred", &self.preferred) @@ -751,7 +755,7 @@ pub trait Server { let handler = self.new_client(socket.peer_addr().ok()); let error_tx = error_tx.clone(); russh_util::runtime::spawn(async move { - let session = match run_stream(config, socket, handler).await { + let session = match run_stream(config, socket, handler).await { Ok(s) => s, Err(e) => { debug!("Connection setup failed"); @@ -856,8 +860,11 @@ where // Reading SSH id and allocating a session. let mut stream = SshRead::new(stream); let (sender, receiver) = tokio::sync::mpsc::channel(config.event_buffer_size); + let handle = server::session::Handle { + sender, + channel_buffer_size: config.channel_buffer_size, + }; let common = read_ssh_id(config, &mut stream).await?; - let handle = server::session::Handle { sender }; let session = Session { target_window_size: common.config.window_size, common, diff --git a/russh/src/server/session.rs b/russh/src/server/session.rs index 8322966f..30968120 100644 --- a/russh/src/server/session.rs +++ b/russh/src/server/session.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::oneshot; use super::*; -use crate::channels::{Channel, ChannelMsg, ChannelRef, CHANNEL_BUFFER_SIZE}; +use crate::channels::{Channel, ChannelMsg, ChannelRef}; use crate::kex::EXTENSION_SUPPORT_AS_CLIENT; use crate::msg; @@ -90,6 +90,7 @@ impl From<(ChannelId, ChannelMsg)> for Msg { /// the request/response cycle. pub struct Handle { pub(crate) sender: Sender, + pub(crate) channel_buffer_size: usize, } impl Handle { @@ -217,7 +218,7 @@ impl Handle { /// confirmed that it allows agent forwarding. See /// [PROTOCOL.agent](https://datatracker.ietf.org/doc/html/draft-miller-ssh-agent). pub async fn channel_open_agent(&self) -> Result, Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -236,7 +237,7 @@ impl Handle { /// usable when it's confirmed by the server, as indicated by the /// `confirmed` field of the corresponding `Channel`. pub async fn channel_open_session(&self) -> Result, Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -261,7 +262,7 @@ impl Handle { originator_address: B, originator_port: u32, ) -> Result, Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -286,7 +287,7 @@ impl Handle { originator_address: B, originator_port: u32, ) -> Result, Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -308,7 +309,7 @@ impl Handle { &self, server_socket_path: A, ) -> Result, Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone(); @@ -328,7 +329,7 @@ impl Handle { originator_address: A, originator_port: u32, ) -> Result, Error> { - let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE); + let (sender, receiver) = channel(self.channel_buffer_size); let channel_ref = ChannelRef::new(sender); let window_size_ref = channel_ref.window_size().clone();