Skip to content

Commit

Permalink
Progress on sync sockets (26)
Browse files Browse the repository at this point in the history
  • Loading branch information
zmerp committed Jul 13, 2023
1 parent 9f3308f commit e440137
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 47 deletions.
47 changes: 20 additions & 27 deletions alvr/sockets/src/stream_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{
time::Duration,
};
use tcp::{TcpStreamReceiveSocket, TcpStreamSendSocket};
use tokio::time;
use tokio::{net, runtime::Runtime};
use udp::{UdpStreamReceiveSocket, UdpStreamSendSocket};

Expand Down Expand Up @@ -323,16 +322,18 @@ impl StreamSocketBuilder {
recv_buffer_bytes: SocketBufferSize,
) -> StrResult<Self> {
Ok(match stream_socket_config {
SocketProtocol::Udp => StreamSocketBuilder::Udp(runtime.block_on(udp::bind(
SocketProtocol::Udp => StreamSocketBuilder::Udp(udp::bind(
runtime,
port,
send_buffer_bytes,
recv_buffer_bytes,
))?),
SocketProtocol::Tcp => StreamSocketBuilder::Tcp(runtime.block_on(tcp::bind(
)?),
SocketProtocol::Tcp => StreamSocketBuilder::Tcp(tcp::bind(
runtime,
port,
send_buffer_bytes,
recv_buffer_bytes,
))?),
)?),
})
}

Expand All @@ -346,23 +347,16 @@ impl StreamSocketBuilder {
) -> ConResult<StreamSocket> {
let (send_socket, receive_socket) = match self {
StreamSocketBuilder::Udp(socket) => {
let (send_socket, receive_socket) =
udp::connect(socket, server_ip, port).map_err(to_con_e!())?;
let (send_socket, receive_socket) = udp::connect(socket, server_ip, port);

(
StreamSendSocket::Udp(send_socket),
StreamReceiveSocket::Udp(receive_socket),
)
}
StreamSocketBuilder::Tcp(listener) => {
let (send_socket, receive_socket) = runtime.block_on(async {
tokio::select! {
res = tcp::accept_from_server(listener, server_ip) => {
res.map_err(to_con_e!())
},
_ = time::sleep(timeout) => alvr_common::timeout(),
}
})?;
let (send_socket, receive_socket) =
tcp::accept_from_server(runtime, timeout, listener, server_ip)?;

(
StreamSendSocket::Tcp(send_socket),
Expand Down Expand Up @@ -392,25 +386,24 @@ impl StreamSocketBuilder {
) -> ConResult<StreamSocket> {
let (send_socket, receive_socket) = match protocol {
SocketProtocol::Udp => {
let socket = runtime
.block_on(udp::bind(port, send_buffer_bytes, recv_buffer_bytes))
let socket = udp::bind(runtime, port, send_buffer_bytes, recv_buffer_bytes)
.map_err(to_con_e!())?;
let (send_socket, receive_socket) =
udp::connect(socket, client_ip, port).map_err(to_con_e!())?;
let (send_socket, receive_socket) = udp::connect(socket, client_ip, port);

(
StreamSendSocket::Udp(send_socket),
StreamReceiveSocket::Udp(receive_socket),
)
}
SocketProtocol::Tcp => {
let (send_socket, receive_socket) = runtime.block_on(async {
tokio::select! {
res = tcp::connect_to_client(client_ip, port, send_buffer_bytes, recv_buffer_bytes) => {
res.map_err(to_con_e!())
},
_ = time::sleep(timeout) => alvr_common::timeout(),
}
})?;
let (send_socket, receive_socket) = tcp::connect_to_client(
runtime,
timeout,
client_ip,
port,
send_buffer_bytes,
recv_buffer_bytes,
)?;

(
StreamSendSocket::Tcp(send_socket),
Expand Down
46 changes: 31 additions & 15 deletions alvr/sockets/src/stream_socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,67 @@ use tokio_util::codec::Framed;
pub type TcpStreamSendSocket = Arc<Mutex<SplitSink<Framed<TcpStream, Ldc>, Bytes>>>;
pub type TcpStreamReceiveSocket = SplitStream<Framed<TcpStream, Ldc>>;

pub async fn bind(
pub fn bind(
runtime: &Runtime,
port: u16,
send_buffer_bytes: SocketBufferSize,
recv_buffer_bytes: SocketBufferSize,
) -> StrResult<TcpListener> {
let socket = TcpListener::bind((LOCAL_IP, port)).await.map_err(err!())?;
let socket = runtime
.block_on(TcpListener::bind((LOCAL_IP, port)))
.map_err(err!())?;
let socket = socket2::Socket::from(socket.into_std().map_err(err!())?);

super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok();

TcpListener::from_std(socket.into()).map_err(err!())
}

pub async fn accept_from_server(
pub fn accept_from_server(
runtime: &Runtime,
timeout: Duration,
listener: TcpListener,
server_ip: IpAddr,
) -> StrResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> {
let (socket, server_address) = listener.accept().await.map_err(err!())?;
) -> ConResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> {
let (socket, server_address) = runtime.block_on(async {
tokio::select! {
res = listener.accept() => res.map_err(to_con_e!()),
_ = time::sleep(timeout) => alvr_common::timeout(),
}
})?;

if server_address.ip() != server_ip {
return fmt_e!("Connected to wrong client: {server_address} != {server_ip}");
return con_fmt_e!("Connected to wrong client: {server_address} != {server_ip}");
}

socket.set_nodelay(true).map_err(err!())?;
socket.set_nodelay(true).map_err(to_con_e!())?;
let socket = Framed::new(socket, Ldc::new());
let (send_socket, receive_socket) = socket.split();

Ok((Arc::new(Mutex::new(send_socket)), receive_socket))
}

pub async fn connect_to_client(
pub fn connect_to_client(
runtime: &Runtime,
timeout: Duration,
client_ip: IpAddr,
port: u16,
send_buffer_bytes: SocketBufferSize,
recv_buffer_bytes: SocketBufferSize,
) -> StrResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> {
let socket = TcpStream::connect((client_ip, port))
.await
.map_err(err!())?;
let socket = socket2::Socket::from(socket.into_std().map_err(err!())?);
) -> ConResult<(TcpStreamSendSocket, TcpStreamReceiveSocket)> {
let socket = runtime.block_on(async {
tokio::select! {
res = TcpStream::connect((client_ip, port)) => res.map_err(to_con_e!()),
_ = time::sleep(timeout) => alvr_common::timeout(),
}
})?;

let socket = socket2::Socket::from(socket.into_std().map_err(to_con_e!())?);

super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok();

let socket = TcpStream::from_std(socket.into()).map_err(err!())?;
socket.set_nodelay(true).map_err(err!())?;
let socket = TcpStream::from_std(socket.into()).map_err(to_con_e!())?;
socket.set_nodelay(true).map_err(to_con_e!())?;
let socket = Framed::new(socket, Ldc::new());
let (send_socket, receive_socket) = socket.split();

Expand Down
13 changes: 8 additions & 5 deletions alvr/sockets/src/stream_socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ pub struct UdpStreamReceiveSocket {

// Create tokio socket, convert to socket2, apply settings, convert back to tokio. This is done to
// let tokio set all the internal parameters it needs from the start.
pub async fn bind(
pub fn bind(
runtime: &Runtime,
port: u16,
send_buffer_bytes: SocketBufferSize,
recv_buffer_bytes: SocketBufferSize,
) -> StrResult<UdpSocket> {
let socket = UdpSocket::bind((LOCAL_IP, port)).await.map_err(err!())?;
let socket = runtime
.block_on(UdpSocket::bind((LOCAL_IP, port)))
.map_err(err!())?;
let socket = socket2::Socket::from(socket.into_std().map_err(err!())?);

super::set_socket_buffers(&socket, send_buffer_bytes, recv_buffer_bytes).ok();
Expand All @@ -48,12 +51,12 @@ pub fn connect(
socket: UdpSocket,
peer_ip: IpAddr,
port: u16,
) -> StrResult<(UdpStreamSendSocket, UdpStreamReceiveSocket)> {
) -> (UdpStreamSendSocket, UdpStreamReceiveSocket) {
let peer_addr = (peer_ip, port).into();
let socket = UdpFramed::new(socket, Ldc::new());
let (send_socket, receive_socket) = socket.split();

Ok((
(
UdpStreamSendSocket {
peer_addr,
inner: Arc::new(Mutex::new(send_socket)),
Expand All @@ -62,7 +65,7 @@ pub fn connect(
peer_addr,
inner: receive_socket,
},
))
)
}

pub fn recv(
Expand Down

0 comments on commit e440137

Please sign in to comment.