From e440137dcc88d0d548abb45aa2c6c81300e8067a Mon Sep 17 00:00:00 2001 From: Riccardo Zaglia Date: Thu, 13 Jul 2023 17:32:49 +0800 Subject: [PATCH] Progress on sync sockets (26) --- alvr/sockets/src/stream_socket/mod.rs | 47 ++++++++++++--------------- alvr/sockets/src/stream_socket/tcp.rs | 46 +++++++++++++++++--------- alvr/sockets/src/stream_socket/udp.rs | 13 +++++--- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/alvr/sockets/src/stream_socket/mod.rs b/alvr/sockets/src/stream_socket/mod.rs index 01b12c1282..f862b47220 100644 --- a/alvr/sockets/src/stream_socket/mod.rs +++ b/alvr/sockets/src/stream_socket/mod.rs @@ -24,7 +24,6 @@ use std::{ time::Duration, }; use tcp::{TcpStreamReceiveSocket, TcpStreamSendSocket}; -use tokio::time; use tokio::{net, runtime::Runtime}; use udp::{UdpStreamReceiveSocket, UdpStreamSendSocket}; @@ -323,16 +322,18 @@ impl StreamSocketBuilder { recv_buffer_bytes: SocketBufferSize, ) -> StrResult { Ok(match stream_socket_config { - SocketProtocol::Udp => StreamSocketBuilder::Udp(runtime.block_on(udp::bind( + SocketProtocol::Udp => StreamSocketBuilder::Udp(udp::bind( + runtime, port, send_buffer_bytes, recv_buffer_bytes, - ))?), - SocketProtocol::Tcp => StreamSocketBuilder::Tcp(runtime.block_on(tcp::bind( + )?), + SocketProtocol::Tcp => StreamSocketBuilder::Tcp(tcp::bind( + runtime, port, send_buffer_bytes, recv_buffer_bytes, - ))?), + )?), }) } @@ -346,8 +347,7 @@ impl StreamSocketBuilder { ) -> ConResult { let (send_socket, receive_socket) = match self { StreamSocketBuilder::Udp(socket) => { - let (send_socket, receive_socket) = - udp::connect(socket, server_ip, port).map_err(to_con_e!())?; + let (send_socket, receive_socket) = udp::connect(socket, server_ip, port); ( StreamSendSocket::Udp(send_socket), @@ -355,14 +355,8 @@ impl StreamSocketBuilder { ) } StreamSocketBuilder::Tcp(listener) => { - let (send_socket, receive_socket) = runtime.block_on(async { - tokio::select! { - res = tcp::accept_from_server(listener, server_ip) => { - res.map_err(to_con_e!()) - }, - _ = time::sleep(timeout) => alvr_common::timeout(), - } - })?; + let (send_socket, receive_socket) = + tcp::accept_from_server(runtime, timeout, listener, server_ip)?; ( StreamSendSocket::Tcp(send_socket), @@ -392,25 +386,24 @@ impl StreamSocketBuilder { ) -> ConResult { let (send_socket, receive_socket) = match protocol { SocketProtocol::Udp => { - let socket = runtime - .block_on(udp::bind(port, send_buffer_bytes, recv_buffer_bytes)) + let socket = udp::bind(runtime, port, send_buffer_bytes, recv_buffer_bytes) .map_err(to_con_e!())?; - let (send_socket, receive_socket) = - udp::connect(socket, client_ip, port).map_err(to_con_e!())?; + let (send_socket, receive_socket) = udp::connect(socket, client_ip, port); + ( StreamSendSocket::Udp(send_socket), StreamReceiveSocket::Udp(receive_socket), ) } SocketProtocol::Tcp => { - let (send_socket, receive_socket) = runtime.block_on(async { - tokio::select! { - res = tcp::connect_to_client(client_ip, port, send_buffer_bytes, recv_buffer_bytes) => { - res.map_err(to_con_e!()) - }, - _ = time::sleep(timeout) => alvr_common::timeout(), - } - })?; + let (send_socket, receive_socket) = tcp::connect_to_client( + runtime, + timeout, + client_ip, + port, + send_buffer_bytes, + recv_buffer_bytes, + )?; ( StreamSendSocket::Tcp(send_socket), diff --git a/alvr/sockets/src/stream_socket/tcp.rs b/alvr/sockets/src/stream_socket/tcp.rs index 2537e1cb73..a3f840b0d0 100644 --- a/alvr/sockets/src/stream_socket/tcp.rs +++ b/alvr/sockets/src/stream_socket/tcp.rs @@ -22,12 +22,15 @@ use tokio_util::codec::Framed; pub type TcpStreamSendSocket = Arc, Bytes>>>; pub type TcpStreamReceiveSocket = SplitStream>; -pub async fn bind( +pub fn bind( + runtime: &Runtime, port: u16, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, ) -> StrResult { - let socket = TcpListener::bind((LOCAL_IP, port)).await.map_err(err!())?; + let socket = runtime + .block_on(TcpListener::bind((LOCAL_IP, port))) + .map_err(err!())?; let socket = socket2::Socket::from(socket.into_std().map_err(err!())?); super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok(); @@ -35,38 +38,51 @@ pub async fn bind( TcpListener::from_std(socket.into()).map_err(err!()) } -pub async fn accept_from_server( +pub fn accept_from_server( + runtime: &Runtime, + timeout: Duration, listener: TcpListener, server_ip: IpAddr, -) -> StrResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> { - let (socket, server_address) = listener.accept().await.map_err(err!())?; +) -> ConResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> { + let (socket, server_address) = runtime.block_on(async { + tokio::select! { + res = listener.accept() => res.map_err(to_con_e!()), + _ = time::sleep(timeout) => alvr_common::timeout(), + } + })?; if server_address.ip() != server_ip { - return fmt_e!("Connected to wrong client: {server_address} != {server_ip}"); + return con_fmt_e!("Connected to wrong client: {server_address} != {server_ip}"); } - socket.set_nodelay(true).map_err(err!())?; + socket.set_nodelay(true).map_err(to_con_e!())?; let socket = Framed::new(socket, Ldc::new()); let (send_socket, receive_socket) = socket.split(); Ok((Arc::new(Mutex::new(send_socket)), receive_socket)) } -pub async fn connect_to_client( +pub fn connect_to_client( + runtime: &Runtime, + timeout: Duration, client_ip: IpAddr, port: u16, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, -) -> StrResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> { - let socket = TcpStream::connect((client_ip, port)) - .await - .map_err(err!())?; - let socket = socket2::Socket::from(socket.into_std().map_err(err!())?); +) -> ConResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> { + let socket = runtime.block_on(async { + tokio::select! { + res = TcpStream::connect((client_ip, port)) => res.map_err(to_con_e!()), + _ = time::sleep(timeout) => alvr_common::timeout(), + } + })?; + + let socket = socket2::Socket::from(socket.into_std().map_err(to_con_e!())?); super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok(); - let socket = TcpStream::from_std(socket.into()).map_err(err!())?; - socket.set_nodelay(true).map_err(err!())?; + let socket = TcpStream::from_std(socket.into()).map_err(to_con_e!())?; + socket.set_nodelay(true).map_err(to_con_e!())?; let socket = Framed::new(socket, Ldc::new()); let (send_socket, receive_socket) = socket.split(); diff --git a/alvr/sockets/src/stream_socket/udp.rs b/alvr/sockets/src/stream_socket/udp.rs index 2b8ac3aca7..bb39c9e3a0 100644 --- a/alvr/sockets/src/stream_socket/udp.rs +++ b/alvr/sockets/src/stream_socket/udp.rs @@ -31,12 +31,15 @@ pub struct UdpStreamReceiveSocket { // Create tokio socket, convert to socket2, apply settings, convert back to tokio. This is done to // let tokio set all the internal parameters it needs from the start. -pub async fn bind( +pub fn bind( + runtime: &Runtime, port: u16, send_buffer_bytes: SocketBufferSize, recv_buffer_bytes: SocketBufferSize, ) -> StrResult { - let socket = UdpSocket::bind((LOCAL_IP, port)).await.map_err(err!())?; + let socket = runtime + .block_on(UdpSocket::bind((LOCAL_IP, port))) + .map_err(err!())?; let socket = socket2::Socket::from(socket.into_std().map_err(err!())?); super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok(); @@ -48,12 +51,12 @@ pub fn connect( socket: UdpSocket, peer_ip: IpAddr, port: u16, -) -> StrResult<(UdpStreamSendSocket, UdpStreamReceiveSocket)> { +) -> (UdpStreamSendSocket, UdpStreamReceiveSocket) { let peer_addr = (peer_ip, port).into(); let socket = UdpFramed::new(socket, Ldc::new()); let (send_socket, receive_socket) = socket.split(); - Ok(( + ( UdpStreamSendSocket { peer_addr, inner: Arc::new(Mutex::new(send_socket)), @@ -62,7 +65,7 @@ pub fn connect( peer_addr, inner: receive_socket, }, - )) + ) } pub fn recv(