Skip to content

Commit

Permalink
Add channel_buffer_size to config
Browse files Browse the repository at this point in the history
  • Loading branch information
EpicEric committed Dec 8, 2024
1 parent f164e96 commit 5df22e9
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 17 deletions.
5 changes: 2 additions & 3 deletions russh/src/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub use channel_ref::ChannelRef;
mod channel_stream;
pub use channel_stream::ChannelStream;

pub static CHANNEL_BUFFER_SIZE: usize = 100;

#[derive(Debug)]
#[non_exhaustive]
/// Possible messages that [Channel::wait] can receive.
Expand Down Expand Up @@ -162,8 +160,9 @@ impl<S: From<(ChannelId, ChannelMsg)> + Send + Sync + 'static> Channel<S> {
sender: Sender<S>,
max_packet_size: u32,
window_size: u32,
channel_buffer_size: usize,
) -> (Self, ChannelRef) {
let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE);
let (tx, rx) = tokio::sync::mpsc::channel(channel_buffer_size);
let window_size = WindowSizeRef::new(window_size);

(
Expand Down
1 change: 1 addition & 0 deletions russh/src/client/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ impl Session {
self.inbound_channel_sender.clone(),
msg.recipient_maximum_packet_size,
msg.recipient_window_size,
self.common.config.channel_buffer_size,
);

self.channels.insert(id, channel_ref);
Expand Down
16 changes: 11 additions & 5 deletions russh/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use tokio::sync::mpsc::{
};
use tokio::sync::oneshot;

use crate::channels::{Channel, ChannelMsg, ChannelRef, WindowSizeRef, CHANNEL_BUFFER_SIZE};
use crate::channels::{Channel, ChannelMsg, ChannelRef, WindowSizeRef};
use crate::cipher::{self, clear, CipherPair, OpeningKey};
use crate::keys::key::parse_public_key;
use crate::session::{
Expand Down Expand Up @@ -224,6 +224,7 @@ pub struct Handle<H: Handler> {
sender: Sender<Msg>,
receiver: UnboundedReceiver<Reply>,
join: russh_util::runtime::JoinHandle<Result<(), H::Error>>,
channel_buffer_size: usize,
}

impl<H: Handler> Drop for Handle<H> {
Expand Down Expand Up @@ -466,7 +467,7 @@ impl<H: Handler> Handle<H> {
/// usable when it's confirmed by the server, as indicated by the
/// `confirmed` field of the corresponding `Channel`.
pub async fn channel_open_session(&self) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -484,7 +485,7 @@ impl<H: Handler> Handle<H> {
originator_address: A,
originator_port: u32,
) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand Down Expand Up @@ -515,7 +516,7 @@ impl<H: Handler> Handle<H> {
originator_address: B,
originator_port: u32,
) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -537,7 +538,7 @@ impl<H: Handler> Handle<H> {
&self,
socket_path: S,
) -> Result<Channel<Msg>, crate::Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand Down Expand Up @@ -749,6 +750,7 @@ where
config.maximum_packet_size
);
}
let channel_buffer_size = config.channel_buffer_size;
let mut session = Session::new(
config.window_size,
CommonSession {
Expand Down Expand Up @@ -789,6 +791,7 @@ where
sender: handle_sender,
receiver: handle_receiver,
join,
channel_buffer_size,
})
}

Expand Down Expand Up @@ -1472,6 +1475,8 @@ pub struct Config {
pub window_size: u32,
/// The maximal size of a single packet.
pub maximum_packet_size: u32,
/// Buffer size for created channels.
pub channel_buffer_size: usize,
/// Lists of preferred algorithms.
pub preferred: negotiation::Preferred,
/// Time after which the connection is garbage-collected.
Expand All @@ -1495,6 +1500,7 @@ impl Default for Config {
limits: Limits::default(),
window_size: 2097152,
maximum_packet_size: 32768,
channel_buffer_size: 100,
preferred: Default::default(),
inactivity_timeout: None,
keepalive_interval: None,
Expand Down
1 change: 1 addition & 0 deletions russh/src/server/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,7 @@ impl Session {
self.sender.sender.clone(),
channel_params.recipient_maximum_packet_size,
channel_params.recipient_window_size,
self.common.config.channel_buffer_size,
);

match &msg.typ {
Expand Down
11 changes: 9 additions & 2 deletions russh/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub struct Config {
pub window_size: u32,
/// The maximal size of a single packet.
pub maximum_packet_size: u32,
/// Buffer size for created channels.
pub channel_buffer_size: usize,
/// Internal event buffer size
pub event_buffer_size: usize,
/// Lists of preferred algorithms.
Expand Down Expand Up @@ -108,6 +110,7 @@ impl Default for Config {
keys: Vec::new(),
window_size: 2097152,
maximum_packet_size: 32768,
channel_buffer_size: 100,
event_buffer_size: 10,
limits: Limits::default(),
preferred: Default::default(),
Expand All @@ -134,6 +137,7 @@ impl Debug for Config {
.field("keys", &"***")
.field("window_size", &self.window_size)
.field("maximum_packet_size", &self.maximum_packet_size)
.field("channel_buffer_size", &self.channel_buffer_size)
.field("event_buffer_size", &self.event_buffer_size)
.field("limits", &self.limits)
.field("preferred", &self.preferred)
Expand Down Expand Up @@ -751,7 +755,7 @@ pub trait Server {
let handler = self.new_client(socket.peer_addr().ok());
let error_tx = error_tx.clone();
russh_util::runtime::spawn(async move {
let session = match run_stream(config, socket, handler).await {
let session = match run_stream(config, socket, handler).await {
Ok(s) => s,
Err(e) => {
debug!("Connection setup failed");
Expand Down Expand Up @@ -856,8 +860,11 @@ where
// Reading SSH id and allocating a session.
let mut stream = SshRead::new(stream);
let (sender, receiver) = tokio::sync::mpsc::channel(config.event_buffer_size);
let handle = server::session::Handle {
sender,
channel_buffer_size: config.channel_buffer_size,
};
let common = read_ssh_id(config, &mut stream).await?;
let handle = server::session::Handle { sender };
let session = Session {
target_window_size: common.config.window_size,
common,
Expand Down
15 changes: 8 additions & 7 deletions russh/src/server/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;

use super::*;
use crate::channels::{Channel, ChannelMsg, ChannelRef, CHANNEL_BUFFER_SIZE};
use crate::channels::{Channel, ChannelMsg, ChannelRef};
use crate::kex::EXTENSION_SUPPORT_AS_CLIENT;
use crate::msg;

Expand Down Expand Up @@ -90,6 +90,7 @@ impl From<(ChannelId, ChannelMsg)> for Msg {
/// the request/response cycle.
pub struct Handle {
pub(crate) sender: Sender<Msg>,
pub(crate) channel_buffer_size: usize,
}

impl Handle {
Expand Down Expand Up @@ -217,7 +218,7 @@ impl Handle {
/// confirmed that it allows agent forwarding. See
/// [PROTOCOL.agent](https://datatracker.ietf.org/doc/html/draft-miller-ssh-agent).
pub async fn channel_open_agent(&self) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -236,7 +237,7 @@ impl Handle {
/// usable when it's confirmed by the server, as indicated by the
/// `confirmed` field of the corresponding `Channel`.
pub async fn channel_open_session(&self) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -261,7 +262,7 @@ impl Handle {
originator_address: B,
originator_port: u32,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -286,7 +287,7 @@ impl Handle {
originator_address: B,
originator_port: u32,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -308,7 +309,7 @@ impl Handle {
&self,
server_socket_path: A,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand All @@ -328,7 +329,7 @@ impl Handle {
originator_address: A,
originator_port: u32,
) -> Result<Channel<Msg>, Error> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
let (sender, receiver) = channel(self.channel_buffer_size);
let channel_ref = ChannelRef::new(sender);
let window_size_ref = channel_ref.window_size().clone();

Expand Down

0 comments on commit 5df22e9

Please sign in to comment.