Skip to content

Commit

Permalink
Update.
Browse files Browse the repository at this point in the history
  • Loading branch information
xOS committed Apr 19, 2022
1 parent f21156b commit 313397c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "realm"
version = "2.1.2"
version = "2.1.3"
authors = ["zhboner <zhboner@gmail.com>"]
edition = "2021"

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ pub mod conf;
pub mod utils;
pub mod relay;

pub const VERSION: &str = "2.1.2";
pub const VERSION: &str = "2.1.3";
pub const ENV_CONFIG: &str = "REALM_CONF";
40 changes: 18 additions & 22 deletions src/relay/tcp/haproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use crate::utils;
use crate::utils::HaproxyOpts;
use crate::utils::timeoutfut;

// client -> relay -> server
// TODO: replace the "proxy-protocol" crate, and then avoid heap allocation.

// client -> relay -> server
pub async fn handle_proxy_protocol(
src: &mut TcpStream,
dst: &mut TcpStream,
Expand All @@ -43,34 +44,39 @@ pub async fn handle_proxy_protocol(
// may not get src and dst addr
if accept_proxy {
let buf = buf.write(BytesMut::with_capacity(256));
buf.resize(256, 0);

// FIXME: may not read the entire header

// The receiver may apply a short timeout and decide to
// abort the connection if the protocol header is not seen
// within a few seconds (at least 3 seconds to cover a TCP retransmit).
let n = timeoutfut(src.read_buf(buf), accept_proxy_timeout).await??;
let peek_n = timeoutfut(src.peek(buf), accept_proxy_timeout).await??;

buf.truncate(peek_n);
debug!("[tcp]peek initial {} bytes: {:#x}", peek_n, buf);

let _ = buf.split_off(n);
debug!("[tcp]recv initial {} bytes: {:#x}", n, buf);
let mut slice = buf.as_ref();

let header = parse(buf).map_err(|e| Error::new(ErrorKind::Other, e))?;
debug!("[tcp]proxy-protocol parsed, {} bytes left", buf.remaining());
// slice is advanced
let header =
parse(&mut slice).map_err(|e| Error::new(ErrorKind::Other, e))?;
let parsed_n = peek_n - slice.remaining();
debug!("[tcp]proxy-protocol parsed, {} bytes", parsed_n);

// handle parsed header, and print log
if let Some((src, dst)) = handle_header(header) {
client_addr.write(src);
server_addr.write(dst);
fwd_hdr = true;
}

// header has been parsed
// header has been parsed, remove these bytes from sock buffer.
buf.truncate(parsed_n);
src.read_exact(buf).await?;

// do not send header to server
if !send_proxy {
// write left bytes
if !buf.is_empty() {
debug!("[tcp]send left {} bytes: {:#x}", buf.len(), buf);
dst.write_all(buf).await?;
}
return Ok(());
}
}
Expand Down Expand Up @@ -99,16 +105,6 @@ pub async fn handle_proxy_protocol(
debug!("[tcp]send initial {} bytes: {:#x}", header.len(), &header);
dst.write_all(&header).await?;

// write left bytes
// Safety: buf is initialized, filled with PROXY header
if accept_proxy {
let buf = unsafe { buf.assume_init() };
if !buf.is_empty() {
debug!("[tcp]send left {} bytes: {:?}", buf.len(), &buf);
dst.write_all(&buf).await?;
}
}

Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions src/relay/tcp/tfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ impl TcpStream {
) -> Result<R> {
inner!(self).try_io(interest, f)
}

#[allow(unused)]
pub async fn peek(&self, buf: &mut [u8]) -> Result<usize> {
inner!(self).peek(buf).await
}
}

impl From<TfoStream> for TcpStream {
Expand Down
2 changes: 1 addition & 1 deletion tests/proxy_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use realm::utils::{Endpoint, ConnectOpts, HaproxyOpts};
use realm::utils::timeoutfut;

#[tokio::test]
async fn proxy_v1() {
async fn proxy_v2() {
env_logger::init();

let endpoint1 = Endpoint {
Expand Down

0 comments on commit 313397c

Please sign in to comment.