Skip to content

Commit

Permalink
io: Make peer_addr fallible (#755)
Browse files Browse the repository at this point in the history
The `io::PeerAddr` trait assumes that the peer addr lookup is infallible,
panicking the process when the call fails. In practice, however, this
call can fail when a system is under load.

This change modifies the `io::PeerAddr` trait to allow this lookup to
fail. When it fails, we propagate this failure to the connection's task.
  • Loading branch information
olix0r authored Nov 30, 2020
1 parent 8cb51ec commit 1e9a001
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 40 deletions.
8 changes: 4 additions & 4 deletions linkerd/io/src/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{internal::Io, AsyncRead, AsyncWrite, PeerAddr, Poll};
use super::{internal::Io, AsyncRead, AsyncWrite, PeerAddr, Poll, Result};
use bytes::{Buf, BufMut};
use std::{mem::MaybeUninit, pin::Pin, task::Context};

Expand All @@ -15,7 +15,7 @@ impl BoxedIo {
}

impl PeerAddr for BoxedIo {
fn peer_addr(&self) -> std::net::SocketAddr {
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
self.0.peer_addr()
}
}
Expand Down Expand Up @@ -106,8 +106,8 @@ mod tests {
struct WriteBufDetector;

impl PeerAddr for WriteBufDetector {
fn peer_addr(&self) -> std::net::SocketAddr {
([0, 0, 0, 0], 0).into()
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
Ok(([0, 0, 0, 0], 0).into())
}
}

Expand Down
18 changes: 9 additions & 9 deletions linkerd/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,38 @@ pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub type Poll<T> = std::task::Poll<Result<T>>;

pub trait PeerAddr {
fn peer_addr(&self) -> SocketAddr;
fn peer_addr(&self) -> Result<SocketAddr>;
}

impl PeerAddr for tokio::net::TcpStream {
fn peer_addr(&self) -> SocketAddr {
tokio::net::TcpStream::peer_addr(self).expect("TcpStream must have a peer address")
fn peer_addr(&self) -> Result<SocketAddr> {
tokio::net::TcpStream::peer_addr(self)
}
}

impl<T: PeerAddr> PeerAddr for tokio_rustls::client::TlsStream<T> {
fn peer_addr(&self) -> SocketAddr {
fn peer_addr(&self) -> Result<SocketAddr> {
self.get_ref().0.peer_addr()
}
}

impl<T: PeerAddr> PeerAddr for tokio_rustls::server::TlsStream<T> {
fn peer_addr(&self) -> SocketAddr {
fn peer_addr(&self) -> Result<SocketAddr> {
self.get_ref().0.peer_addr()
}
}

#[cfg(feature = "tokio-test")]
impl PeerAddr for tokio_test::io::Mock {
fn peer_addr(&self) -> SocketAddr {
([0, 0, 0, 0], 0).into()
fn peer_addr(&self) -> Result<SocketAddr> {
Ok(([0, 0, 0, 0], 0).into())
}
}

#[cfg(feature = "tokio-test")]
impl PeerAddr for tokio::io::DuplexStream {
fn peer_addr(&self) -> SocketAddr {
([0, 0, 0, 0], 0).into()
fn peer_addr(&self) -> Result<SocketAddr> {
Ok(([0, 0, 0, 0], 0).into())
}
}
mod internal {
Expand Down
7 changes: 3 additions & 4 deletions linkerd/io/src/prefixed.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{internal::Io, PeerAddr, Poll};
use bytes::{Buf, BufMut, Bytes};
use pin_project::pin_project;
use std::{cmp, io};
use std::{mem::MaybeUninit, pin::Pin, task::Context};
use tokio::io::{AsyncRead, AsyncWrite};

use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, Result};

/// A TcpStream where the initial reads will be served from `prefix`.
#[pin_project]
Expand All @@ -28,7 +27,7 @@ impl<S: AsyncRead + AsyncWrite> PrefixedIo<S> {
}

impl<S: PeerAddr> PeerAddr for PrefixedIo<S> {
fn peer_addr(&self) -> std::net::SocketAddr {
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
self.io.peer_addr()
}
}
Expand Down
4 changes: 2 additions & 2 deletions linkerd/io/src/sensor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pin_project::pin_project;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::Context;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, Result};

pub trait Sensor {
fn record_read(&mut self, sz: usize);
Expand Down Expand Up @@ -106,7 +106,7 @@ impl<T: Io, S: Sensor + Send> Io for SensorIo<T, S> {
}

impl<T: PeerAddr, S> PeerAddr for SensorIo<T, S> {
fn peer_addr(&self) -> std::net::SocketAddr {
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
self.io.peer_addr()
}
}
40 changes: 19 additions & 21 deletions linkerd/proxy/http/src/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,20 @@ where
svc
};

let (svc, closed) = SetClientHandle::new(io.peer_addr(), http1);

let mut conn = self
.server
.clone()
.http1_only(true)
.serve_connection(
io,
// Enable support for HTTP upgrades (CONNECT and websockets).
HyperServerSvc::new(upgrade::Service::new(svc, self.drain.clone())),
)
.with_upgrades();

let mut server = self.server.clone();
let drain = self.drain.clone();
Box::pin(async move {
let (svc, closed) = SetClientHandle::new(io.peer_addr()?, http1);

let mut conn = server
.http1_only(true)
.serve_connection(
io,
// Enable support for HTTP upgrades (CONNECT and websockets).
HyperServerSvc::new(upgrade::Service::new(svc, drain.clone())),
)
.with_upgrades();

tokio::select! {
res = &mut conn => {
debug!(?res, "The client is shutting down the connection");
Expand Down Expand Up @@ -208,16 +207,15 @@ where
svc
};

let (svc, closed) = SetClientHandle::new(io.peer_addr(), h2);

let mut conn = self
.server
.clone()
.http2_only(true)
.serve_connection(io, HyperServerSvc::new(svc));

let mut server = self.server.clone();
let drain = self.drain.clone();
Box::pin(async move {
let (svc, closed) = SetClientHandle::new(io.peer_addr()?, h2);

let mut conn = server
.http2_only(true)
.serve_connection(io, HyperServerSvc::new(svc));

tokio::select! {
res = &mut conn => {
debug!(?res, "The client is shutting down the connection");
Expand Down

0 comments on commit 1e9a001

Please sign in to comment.