Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outbound: Extract HTTP and server modules #727

Merged
merged 4 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions linkerd/app/gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use linkerd2_app_core::{
discovery_rejected, profiles, proxy::http, svc, transport::tls, Error, NameAddr, NameMatch,
};
use linkerd2_app_inbound::endpoint as inbound;
use linkerd2_app_outbound::target as outbound;
use linkerd2_app_outbound as outbound;
use tracing::info_span;

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -35,7 +35,7 @@ impl Config {
P: profiles::GetProfile<NameAddr> + Clone + Send + 'static,
P::Future: Send + 'static,
P::Error: Send,
O: svc::NewService<outbound::HttpLogical, Service = S> + Clone + Send + 'static,
O: svc::NewService<outbound::http::Logical, Service = S> + Clone + Send + 'static,
S: tower::Service<
http::Request<http::boxed::Payload>,
Response = http::Response<http::boxed::Payload>,
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/gateway/src/make.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::gateway::Gateway;
use linkerd2_app_core::{profiles, svc, transport::tls, NameAddr};
use linkerd2_app_inbound::endpoint as inbound;
use linkerd2_app_outbound::target as outbound;
use linkerd2_app_outbound as outbound;
use tracing::debug;

#[derive(Clone, Debug)]
Expand All @@ -20,7 +20,7 @@ pub(crate) type Target = (Option<profiles::Receiver>, inbound::Target);

impl<O> svc::NewService<Target> for MakeGateway<O>
where
O: svc::NewService<outbound::HttpLogical> + Send + Clone + 'static,
O: svc::NewService<outbound::http::Logical> + Send + Clone + 'static,
{
type Service = Gateway<O::Service>;

Expand Down Expand Up @@ -48,7 +48,7 @@ where
// Create an outbound target using the resolved name and an address
// including the original port. We don't know the IP of the target, so
// we use an unroutable one.
let target = outbound::HttpLogical {
let target = outbound::http::Logical {
profile,
protocol: http_version,
orig_dst: ([0, 0, 0, 0], dst.port()).into(),
Expand Down
74 changes: 74 additions & 0 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use super::require_identity_on_endpoint::NewRequireIdentity;
use super::Endpoint;
use crate::tcp;
use linkerd2_app_core::{
classify,
config::ConnectConfig,
metrics,
opencensus::proto::trace::v1 as oc,
proxy::{http, tap},
reconnect,
spans::SpanConverter,
svc, Error, TraceContext, CANONICAL_DST_HEADER, L5D_REQUIRE_ID,
};
use tokio::{io, sync::mpsc};
use tracing::info_span;

pub fn stack<B, C>(
config: &ConnectConfig,
tcp_connect: C,
tap_layer: tap::Layer,
metrics: metrics::Proxy,
span_sink: Option<mpsc::Sender<oc::Span>>,
) -> impl svc::NewService<
Endpoint,
Service = impl tower::Service<
http::Request<B>,
Response = http::Response<http::boxed::Payload>,
Error = Error,
Future = impl Send,
> + Send,
> + Clone
+ Send
where
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
C: tower::Service<Endpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
C::Future: Unpin + Send,
{
svc::stack(tcp_connect)
// Initiates an HTTP client on the underlying transport. Prior-knowledge HTTP/2
// is typically used (i.e. when communicating with other proxies); though
// HTTP/1.x fallback is supported as needed.
.push(http::client::layer(config.h2_settings))
// Re-establishes a connection when the client fails.
.push(reconnect::layer({
let backoff = config.backoff.clone();
move |e: Error| {
if tcp::connect::is_loop(&*e) {
Err(e)
} else {
Ok(backoff.stream())
}
}
}))
.check_new::<Endpoint>()
.push(tap_layer.clone())
.push(metrics.http_endpoint.into_layer::<classify::Response>())
.push_on_response(TraceContext::layer(
span_sink
.clone()
.map(|sink| SpanConverter::client(sink, crate::trace_labels())),
))
.push_on_response(http::strip_header::request::layer(L5D_REQUIRE_ID))
.push(svc::layer::mk(NewRequireIdentity::new))
.push(http::override_authority::Layer::new(vec![
::http::header::HOST.as_str(),
CANONICAL_DST_HEADER,
]))
.push_on_response(svc::layers().box_http_response())
.check_new::<Endpoint>()
.instrument(|e: &Endpoint| info_span!("endpoint", peer.addr = %e.addr))
.into_inner()
}
122 changes: 122 additions & 0 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use super::{Concrete, Endpoint, Logical};
use crate::{resolve, stack_labels};
use linkerd2_app_core::{
classify,
config::ProxyConfig,
metrics, profiles,
proxy::{api_resolve::Metadata, core::Resolve, http},
retry, svc, Addr, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
};
use tracing::info_span;

pub fn stack<B, E, S, R>(
config: &ProxyConfig,
endpoint: E,
resolve: R,
metrics: metrics::Proxy,
) -> impl svc::NewService<
Logical,
Service = impl tower::Service<
http::Request<B>,
Response = http::Response<http::boxed::Payload>,
Error = Error,
Future = impl Send,
> + Send,
> + Unpin
+ Clone
+ Send
where
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
E: svc::NewService<Endpoint, Service = S> + Clone + Send + Sync + Unpin + 'static,
S: tower::Service<
http::Request<http::boxed::Payload>,
Response = http::Response<http::boxed::Payload>,
Error = Error,
> + Send
+ 'static,
S::Future: Send,
R: Resolve<Addr, Endpoint = Metadata, Error = Error> + Unpin + Clone + Send + 'static,
R::Future: Unpin + Send,
R::Resolution: Unpin + Send,
{
let ProxyConfig {
buffer_capacity,
cache_max_idle_age,
dispatch_timeout,
..
} = config.clone();
let watchdog = cache_max_idle_age * 2;

svc::stack(endpoint)
.check_new_service::<Endpoint, http::Request<http::boxed::Payload>>()
.push_on_response(
svc::layers()
.push(svc::layer::mk(svc::SpawnReady::new))
.push(metrics.stack.layer(stack_labels("balance.endpoint")))
.box_http_request(),
)
.check_new_service::<Endpoint, http::Request<_>>()
.push(resolve::layer(resolve, watchdog))
.check_service::<Concrete>()
.push_on_response(
svc::layers()
.push(http::balance::layer(
crate::EWMA_DEFAULT_RTT,
crate::EWMA_DECAY,
))
.push(svc::layer::mk(svc::SpawnReady::new))
// If the balancer has been empty/unavailable for 10s, eagerly fail
// requests.
.push_failfast(dispatch_timeout)
.push(metrics.stack.layer(stack_labels("concrete"))),
)
.into_new_service()
.check_new_service::<Concrete, http::Request<_>>()
.instrument(|c: &Concrete| match c.resolve.as_ref() {
None => info_span!("concrete"),
Some(addr) => info_span!("concrete", %addr),
})
.check_new_service::<Concrete, http::Request<_>>()
// The concrete address is only set when the profile could be
// resolved. Endpoint resolution is skipped when there is no
// concrete address.
.push_map_target(Concrete::from)
.check_new_service::<(Option<Addr>, Logical), http::Request<_>>()
.push(profiles::split::layer())
.check_new_service::<Logical, http::Request<_>>()
// Drives concrete stacks to readiness and makes the split
// cloneable, as required by the retry middleware.
.push_on_response(
svc::layers()
.push_failfast(dispatch_timeout)
.push_spawn_buffer(buffer_capacity),
)
.check_new_service::<Logical, http::Request<_>>()
.push(profiles::http::route_request::layer(
svc::proxies()
.push(metrics.http_route_actual.into_layer::<classify::Response>())
// Sets an optional retry policy.
.push(retry::layer(metrics.http_route_retry))
// Sets an optional request timeout.
.push(http::MakeTimeoutLayer::default())
// Records per-route metrics.
.push(metrics.http_route.into_layer::<classify::Response>())
// Sets the per-route response classifier as a request
// extension.
.push(classify::Layer::new())
.push_map_target(Logical::into_route)
.into_inner(),
))
.check_new_service::<Logical, http::Request<_>>()
.push(http::header_from_target::layer(CANONICAL_DST_HEADER))
.push_on_response(
svc::layers()
// Strips headers that may be set by this proxy.
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
.push(svc::layers().box_http_response()),
)
.instrument(|l: &Logical| info_span!("logical", dst = %l.addr()))
.check_new_service::<Logical, http::Request<_>>()
.into_inner()
}
110 changes: 110 additions & 0 deletions linkerd/app/outbound/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
pub mod endpoint;
pub mod logical;
mod require_identity_on_endpoint;

use crate::tcp;
use indexmap::IndexMap;
pub use linkerd2_app_core::proxy::http::*;
use linkerd2_app_core::{
dst, profiles,
proxy::{
api_resolve::ProtocolHint,
http::{self, CanOverrideAuthority, ClientAddr},
identity, tap,
},
transport::tls,
Conditional,
};
use std::{net::SocketAddr, sync::Arc};

pub type Accept = crate::target::Accept<http::Version>;
pub type Logical = crate::target::Logical<http::Version>;
pub type Concrete = crate::target::Concrete<http::Version>;
pub type Endpoint = crate::target::Endpoint<http::Version>;

impl From<(http::Version, tcp::Logical)> for Logical {
fn from((protocol, logical): (http::Version, tcp::Logical)) -> Self {
Self {
protocol,
orig_dst: logical.orig_dst,
profile: logical.profile,
}
}
}

impl Logical {
pub fn into_route((route, logical): (profiles::http::Route, Self)) -> dst::Route {
use linkerd2_app_core::metrics::Direction;
dst::Route {
route,
target: logical.addr(),
direction: Direction::Out,
}
}
}

impl Into<http::client::Settings> for &'_ Endpoint {
fn into(self) -> http::client::Settings {
match self.concrete.logical.protocol {
http::Version::H2 => http::client::Settings::H2,
http::Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Unknown => http::client::Settings::Http1,
ProtocolHint::Http2 => http::client::Settings::OrigProtoUpgrade,
},
}
}
}

// Used to set the l5d-canonical-dst header.
impl From<&'_ Logical> for http::header::HeaderValue {
fn from(target: &'_ Logical) -> Self {
http::header::HeaderValue::from_str(&target.addr().to_string())
.expect("addr must be a valid header")
}
}

impl CanOverrideAuthority for Endpoint {
fn override_authority(&self) -> Option<http::uri::Authority> {
self.metadata.authority_override().cloned()
}
}

impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
req.extensions()
.get::<ClientAddr>()
.map(|c| c.as_ref().clone())
}

fn src_tls<'a, B>(
&self,
_: &'a http::Request<B>,
) -> Conditional<&'a identity::Name, tls::ReasonForNoPeerName> {
Conditional::None(tls::ReasonForNoPeerName::Loopback.into())
}

fn dst_addr<B>(&self, _: &http::Request<B>) -> Option<SocketAddr> {
Some(self.addr)
}

fn dst_labels<B>(&self, _: &http::Request<B>) -> Option<&IndexMap<String, String>> {
Some(self.metadata.labels())
}

fn dst_tls<B>(
&self,
_: &http::Request<B>,
) -> Conditional<&identity::Name, tls::ReasonForNoPeerName> {
self.identity.as_ref()
}

fn route_labels<B>(&self, req: &http::Request<B>) -> Option<Arc<IndexMap<String, String>>> {
req.extensions()
.get::<dst::Route>()
.map(|r| r.route.labels().clone())
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
true
}
}
Loading