Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outbound: Introduce 'ingress mode' #728

Merged
merged 11 commits into from
Oct 26, 2020
42 changes: 40 additions & 2 deletions linkerd/app/core/src/addr_match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,48 @@ impl AddrMatch {
&self.nets
}

#[inline]
pub fn matches(&self, addr: &Addr) -> bool {
match addr {
Addr::Name(ref name) => self.names.matches(name.name()),
Addr::Socket(sa) => self.nets.matches(sa.ip()),
Addr::Name(name) => self.names.matches(name.name()),
Addr::Socket(sa) => self.matches_ip(sa.ip()),
}
}

#[inline]
pub fn matches_ip(&self, ip: IpAddr) -> bool {
self.nets.matches(ip)
}
}

impl From<IpMatch> for AddrMatch {
fn from(nets: IpMatch) -> Self {
Self {
nets,
names: NameMatch::new(None),
}
}
}

impl From<NameMatch> for AddrMatch {
fn from(names: NameMatch) -> Self {
Self {
names,
nets: IpMatch::new(None),
}
}
}

impl Into<IpMatch> for AddrMatch {
fn into(self) -> IpMatch {
self.nets
}
}

impl Into<NameMatch> for AddrMatch {
fn into(self) -> NameMatch {
self.names
}
}

// === impl NameMatch ===
Expand All @@ -51,6 +87,7 @@ impl NameMatch {
Self(Arc::new(suffixes.into_iter().collect()))
}

#[inline]
pub fn matches(&self, name: &Name) -> bool {
self.0.iter().any(|sfx| sfx.contains(name))
}
Expand All @@ -63,6 +100,7 @@ impl IpMatch {
Self(Arc::new(nets.into_iter().collect()))
}

#[inline]
pub fn matches(&self, addr: IpAddr) -> bool {
self.0.iter().any(|net| match (net, addr) {
(IpNet::V4(net), IpAddr::V4(ip)) => net.contains(&ip),
Expand Down
204 changes: 204 additions & 0 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
use crate::{http, stack_labels, tcp, trace_labels, Config};
use linkerd2_app_core::{
config::{ProxyConfig, ServerConfig},
discovery_rejected, drain, errors, http_request_l5d_override_dst_addr, metrics,
opencensus::proto::trace::v1 as oc,
profiles,
spans::SpanConverter,
svc::{self},
transport::{self, io, listen, tls},
Addr, AddrMatch, Error, TraceContext,
};
use tokio::sync::mpsc;
use tracing::info_span;

/// Routes HTTP requests according to the l5d-dst-override header.
///
/// Forwards TCP connections without discovery/routing (or mTLS).
///
/// This is only intended for Ingress configurations, where we assume all
/// outbound traffic is either HTTP or TLS'd by the ingress proxy.
pub fn stack<P, T, TSvc, H, HSvc, I>(
config: &Config,
profiles: P,
tcp: T,
http: H,
metrics: &metrics::Proxy,
span_sink: Option<mpsc::Sender<oc::Span>>,
drain: drain::Watch,
) -> impl svc::NewService<
listen::Addrs,
Service = impl tower::Service<
I,
Response = (),
Error = impl Into<Error>,
Future = impl Send + 'static,
> + Send
+ 'static,
> + Send
+ 'static
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Unpin + Send + 'static,
T: svc::NewService<tcp::Endpoint, Service = TSvc> + Unpin + Clone + Send + Sync + 'static,
TSvc: tower::Service<
io::PrefixedIo<transport::metrics::SensorIo<I>>,
Response = (),
Error = Error,
> + Clone
+ Send
+ 'static,
TSvc::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Unpin + Send + Clone + 'static,
HSvc: tower::Service<
http::Request<http::boxed::Payload>,
Response = http::Response<http::boxed::Payload>,
Error = Error,
> + Send
+ 'static,
HSvc::Future: Send,
P: profiles::GetProfile<Addr> + Unpin + Clone + Send + 'static,
P::Future: Unpin + Send,
P::Error: Send,
{
let Config {
allow_discovery,
proxy:
ProxyConfig {
server: ServerConfig { h2_settings, .. },
dispatch_timeout,
max_in_flight_requests,
detect_protocol_timeout,
buffer_capacity,
cache_max_idle_age,
..
},
} = config.clone();

let http = svc::stack(http)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked closely yet but it feels like there's probably some common code between the ingress-mode and normal mode http stacks that we might want to share. nbd if not, but adding more places that future changes will have to update always worries me a little bit...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but deduping things is easier said than done. If you can get it working, I'll merge it ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure if it makes sense here or not, but i can take a look.

.check_new_service::<http::Logical, http::Request<_>>()
.push_map_target(http::Logical::from)
.push(profiles::discover::layer(
profiles,
AllowHttpProfile(allow_discovery),
))
.check_new_service::<Target, http::Request<_>>()
.cache(
svc::layers().push_on_response(
svc::layers()
.push_failfast(dispatch_timeout)
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age),
),
)
.into_make_service()
.spawn_buffer(buffer_capacity)
.into_new_service()
.check_new_service::<Target, http::Request<_>>()
.push(svc::layer::mk(|inner| {
svc::stack::NewRouter::new(TargetPerRequest::accept, inner)
}))
.check_new_service::<http::Accept, http::Request<_>>()
.push_on_response(
svc::layers()
.box_http_request()
// Limits the number of in-flight requests.
.push_concurrency_limit(max_in_flight_requests)
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push_failfast(dispatch_timeout)
.push(metrics.http_errors.clone())
// Synthesizes responses for proxy errors.
.push(errors::layer())
// Initiates OpenCensus tracing.
.push(TraceContext::layer(span_sink.clone().map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
.push(metrics.stack.layer(stack_labels("source")))
.push_failfast(dispatch_timeout)
.push_spawn_buffer(buffer_capacity)
.box_http_response(),
)
.check_new_service::<http::Accept, http::Request<_>>()
.push(svc::layer::mk(http::normalize_uri::MakeNormalizeUri::new))
.check_new_service::<http::Accept, http::Request<_>>()
.instrument(|a: &http::Accept| info_span!("http", v = %a.protocol))
.push_map_target(http::Accept::from)
.check_new_service::<(http::Version, tcp::Accept), http::Request<_>>()
.into_inner();

let tcp = svc::stack(tcp)
.push_map_target(tcp::Endpoint::from_accept(
tls::ReasonForNoPeerName::IngressNonHttp,
))
.into_inner();

svc::stack(http::DetectHttp::new(h2_settings, http, tcp, drain))
.check_new_service::<tcp::Accept, io::PrefixedIo<transport::metrics::SensorIo<I>>>()
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
transport::Prefix::layer(
http::Version::DETECT_BUFFER_CAPACITY,
detect_protocol_timeout,
),
))
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()
.push(metrics.transport.layer_accept())
.push_map_target(tcp::Accept::from)
.check_new_service::<listen::Addrs, I>()
.into_inner()
}

#[derive(Clone)]
struct AllowHttpProfile(AddrMatch);

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Target {
dst: Addr,
accept: http::Accept,
}

#[derive(Clone)]
struct TargetPerRequest(http::Accept);

// === AllowHttpProfile ===

impl svc::stack::FilterRequest<Target> for AllowHttpProfile {
type Request = Addr;

fn filter(&self, Target { dst, .. }: Target) -> Result<Addr, Error> {
if self.0.matches(&dst) {
Ok(dst)
} else {
Err(discovery_rejected().into())
}
}
}

// === impl Target ===

impl From<(Option<profiles::Receiver>, Target)> for http::Logical {
fn from((p, Target { accept, .. }): (Option<profiles::Receiver>, Target)) -> Self {
Self {
profile: p,
orig_dst: accept.orig_dst,
protocol: accept.protocol,
}
}
}

// === TargetPerRequest ===

impl TargetPerRequest {
fn accept(a: http::Accept) -> Self {
Self(a)
}
}

impl<B> svc::stack::RecognizeRoute<http::Request<B>> for TargetPerRequest {
type Key = Target;

fn recognize(&self, req: &http::Request<B>) -> Self::Key {
Target {
accept: self.0,
dst: http_request_l5d_override_dst_addr(req).unwrap_or_else(|_| self.0.orig_dst.into()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/TIOLI: it might be worth having a debug event indicating whether or not we saw the override header, or something, since this is kinda load-bearing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be pretty obvious based on the resulting target, which is ends up in span metadata

}
}
}
5 changes: 3 additions & 2 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
#![deny(warnings, rust_2018_idioms)]

pub mod http;
pub mod ingress;
mod resolve;
pub mod server;
pub mod target;
pub mod tcp;
#[cfg(test)]
mod test_util;

use linkerd2_app_core::{config::ProxyConfig, metrics, IpMatch};
use linkerd2_app_core::{config::ProxyConfig, metrics, AddrMatch};
use std::{collections::HashMap, time::Duration};

const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30);
Expand All @@ -22,7 +23,7 @@ const EWMA_DECAY: Duration = Duration::from_secs(10);
#[derive(Clone, Debug)]
pub struct Config {
pub proxy: ProxyConfig,
pub allow_discovery: IpMatch,
pub allow_discovery: AddrMatch,
}

fn stack_labels(name: &'static str) -> metrics::StackLabels {
Expand Down
16 changes: 7 additions & 9 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use linkerd2_app_core::{
proxy::{api_resolve::Metadata, core::resolve::Resolve},
spans::SpanConverter,
svc,
transport::{self, io, listen},
transport::{self, io, listen, tls},
Addr, Error, IpMatch, TraceContext,
};
use std::net::SocketAddr;
use tokio::sync::mpsc;
use tracing::{debug_span, info_span};
use tracing::info_span;

pub fn stack<R, P, C, H, S, I>(
config: &Config,
Expand Down Expand Up @@ -126,12 +126,10 @@ where
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()
.into_inner();

let tcp = svc::stack(tcp_connect)
.push_make_thunk()
.push_on_response(svc::layer::mk(tcp::Forward::new))
.instrument(|_: &tcp::Endpoint| debug_span!("tcp.forward"))
.check_new_service::<tcp::Endpoint, transport::metrics::SensorIo<I>>()
.push_map_target(tcp::Endpoint::from)
let tcp = svc::stack(tcp::connect::forward(tcp_connect))
.push_map_target(tcp::Endpoint::from_logical(
tls::ReasonForNoPeerName::PortSkipped,
))
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()
.into_inner();

Expand All @@ -140,7 +138,7 @@ where
.push_map_target(tcp::Logical::from)
.push(profiles::discover::layer(
profiles,
AllowProfile(config.allow_discovery.clone()),
AllowProfile(config.allow_discovery.clone().into()),
))
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()
.cache(
Expand Down
14 changes: 6 additions & 8 deletions linkerd/app/outbound/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,21 @@ impl<P> Into<SocketAddr> for &'_ Concrete<P> {

// === impl Endpoint ===

impl<P> From<Logical<P>> for Endpoint<P> {
fn from(logical: Logical<P>) -> Self {
Self {
impl<P> Endpoint<P> {
pub fn from_logical(reason: tls::ReasonForNoPeerName) -> impl (Fn(Logical<P>) -> Self) + Clone {
move |logical| Self {
Comment on lines +162 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

addr: (&logical).into(),
metadata: Metadata::default(),
identity: tls::PeerIdentity::None(tls::ReasonForNoPeerName::PortSkipped.into()),
identity: tls::PeerIdentity::None(reason),
concrete: Concrete {
logical,
resolve: None,
},
}
}
}

impl<P> From<Accept<P>> for Endpoint<P> {
fn from(accept: Accept<P>) -> Self {
Logical::from((None, accept)).into()
pub fn from_accept(reason: tls::ReasonForNoPeerName) -> impl (Fn(Accept<P>) -> Self) + Clone {
move |accept| Self::from_logical(reason)(Logical::from((None, accept)))
}
}

Expand Down
Loading