Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ws server: respect max limit for received messages #537

Merged
merged 21 commits into from
Nov 9, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::types::{
use futures_channel::mpsc;
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use soketto::connection::Error as SokettoError;
use soketto::handshake::{server::Response, Server as SokettoServer};
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
Expand Down Expand Up @@ -194,6 +195,7 @@ async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_>) -
Ok(())
}
HandshakeResponse::Accept { conn_id, methods, resources, cfg, stop_monitor } => {
tracing::info!("Accepting new connection: {}", conn_id);
let key = {
let req = server.receive_request().await?;
let host_check = cfg.allowed_hosts.verify("Host", Some(req.headers().host));
Expand Down Expand Up @@ -238,7 +240,7 @@ async fn background_task(
conn_id: ConnectionId,
methods: Methods,
resources: Resources,
max_request_body_size: u32,
_max_request_body_size: u32,
stop_server: StopMonitor,
) -> Result<(), Error> {
// And we can finally transition to a websocket background_task.
Expand All @@ -251,8 +253,9 @@ async fn background_task(
while !stop_server2.shutdown_requested() {
match rx.next().await {
Some(response) => {
// TODO: check length of response https://github.com/paritytech/jsonrpsee/issues/536
tracing::debug!("send: {}", response);
let _ = sender.send_text(response).await;
let _ = sender.send_text_owned(response).await;
Copy link
Contributor

@maciejhirsz maciejhirsz Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for skipping a clone :)

let _ = sender.flush().await;
}
None => break,
Expand All @@ -271,17 +274,25 @@ async fn background_task(
while !stop_server.shutdown_requested() {
data.clear();

if let Err(e) = method_executors.select_with(receiver.receive_data(&mut data)).await {
tracing::error!("Could not receive WS data: {:?}; closing connection", e);
tx.close_channel();
return Err(e.into());
}

if data.len() > max_request_body_size as usize {
tracing::warn!("Request is too big ({} bytes, max is {})", data.len(), max_request_body_size);
send_error(Id::Null, &tx, ErrorCode::OversizedRequest.into());
continue;
}
if let Err(err) = method_executors.select_with(receiver.receive_data(&mut data)).await {
match err {
SokettoError::Closed => {
tracing::info!("Remote peer terminated the connection: {}", conn_id);
tx.close_channel();
return Ok(());
}
SokettoError::MessageTooLarge { current, maximum } => {
tracing::warn!("Request is too big ({} bytes, max is {})", current, maximum);
send_error(Id::Null, &tx, ErrorCode::OversizedRequest.into());
}
// NOTE: io::Error might happen if the remove peer terminated connection.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
e @ _ => {
tracing::error!("WS recv error: {:?} => terminate connection {}", e, conn_id);
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
tx.close_channel();
return Err(e.into());
}
}
};

match data.get(0) {
Some(b'{') => {
Expand Down