Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outbound: Skip endpoint resolution on profile hint #736

Merged
merged 10 commits into from
Nov 14, 2020
5 changes: 3 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1168,8 +1168,8 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.1.14"
source = "git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.14#76470482fb53c359e6f823d0279d1c7b26030a6a"
version = "0.1.15"
source = "git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.15#d260ea21a0e490d863a9ed02bcc051035be9d238"
dependencies = [
"h2 0.2.6",
"http 0.2.1",
Expand Down Expand Up @@ -1407,6 +1407,7 @@ dependencies = [
"linkerd2-dns-name",
"linkerd2-error",
"linkerd2-proxy-api",
"linkerd2-proxy-api-resolve",
"linkerd2-stack",
"pin-project",
"prost-types",
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ linkerd2-http-metrics = { path = "../../http-metrics" }
linkerd2-metrics = { path = "../../metrics" }
linkerd2-opencensus = { path = "../../opencensus" }
linkerd2-proxy-core = { path = "../../proxy/core" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.14" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15" }
linkerd2-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd2-proxy-discover = { path = "../../proxy/discover" }
linkerd2-proxy-identity = { path = "../../proxy/identity" }
Expand Down Expand Up @@ -86,5 +86,5 @@ libc = "0.2"
procinfo = "0.4.2"

[dev-dependencies]
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.14", features = ["arbitrary"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15", features = ["arbitrary"] }
prost-types = "0.6.0"
10 changes: 10 additions & 0 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ impl<S> Stack<S> {
}))
}

pub fn push_switch<T: Clone, U: Clone>(
self,
switch: T,
other: U,
) -> Stack<stack::MakeSwitch<T, S, U>> {
self.push(layer::mk(|inner: S| {
stack::MakeSwitch::new(switch.clone(), inner, other.clone())
}))
}

// pub fn box_http_request<B>(self) -> Stack<http::boxed::BoxRequest<S, B>>
// where
// B: hyper::body::HttpBody<Data = http::boxed::Data, Error = Error> + 'static,
Expand Down
37 changes: 18 additions & 19 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl Config {

// If the traffic is targeted at the inbound port, send it through
// the loopback service (i.e. as a gateway).
let switch_loopback = svc::stack::MakeSwitch::new(prevent_loop, loopback, profile);
let switch_loopback = svc::stack(loopback).push_switch(prevent_loop, profile);

// Attempts to resolve the target as a service profile or, if that
// fails, skips that stack to forward to the local endpoint.
Expand Down Expand Up @@ -423,25 +423,24 @@ impl Config {
..
} = self.proxy;
let require_identity = self.require_identity_for_inbound_ports;
let skip_detect = self.disable_protocol_detection_for_ports;

svc::stack::MakeSwitch::new(
skip_detect,
svc::stack(detect)
.push_request_filter(require_identity)
.push(metrics.transport.layer_accept())
.push_map_target(TcpAccept::from)
.push(tls::DetectTls::layer(
identity.clone(),
detect_protocol_timeout,
))
.into_inner(),
svc::stack(tcp_forward)
.push_map_target(TcpEndpoint::from)
.push(metrics.transport.layer_accept())
.push_map_target(TcpAccept::from)
.into_inner(),
)
svc::stack(detect)
.push_request_filter(require_identity)
.push(metrics.transport.layer_accept())
.push_map_target(TcpAccept::from)
.push(tls::DetectTls::layer(
identity.clone(),
detect_protocol_timeout,
))
.push_switch(
self.disable_protocol_detection_for_ports,
svc::stack(tcp_forward)
.push_map_target(TcpEndpoint::from)
.push(metrics.transport.layer_accept())
.push_map_target(TcpAccept::from)
.into_inner(),
)
.into_inner()
}
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ hyper = "0.13.7"
linkerd2-app = { path = "..", features = ["mock-orig-dst"] }
linkerd2-app-core = { path = "../core", features = ["mock-orig-dst"] }
linkerd2-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.14", features = ["arbitrary"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15", features = ["arbitrary"] }
linkerd2-app-test = { path = "../test" }
regex = "0.1"
socket2 = "0.3.12"
Expand Down
21 changes: 19 additions & 2 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use linkerd2_app_core::{
config::ProxyConfig,
metrics, profiles,
proxy::{api_resolve::Metadata, core::Resolve, http},
retry, svc, Addr, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
retry, svc,
transport::tls::ReasonForNoPeerName,
Addr, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
};
use tracing::debug_span;

Expand Down Expand Up @@ -48,7 +50,7 @@ where
} = config.clone();
let watchdog = cache_max_idle_age * 2;

svc::stack(endpoint)
svc::stack(endpoint.clone())
.check_new_service::<Endpoint, http::Request<http::boxed::Payload>>()
.push_on_response(
svc::layers()
Expand Down Expand Up @@ -118,5 +120,20 @@ where
)
.instrument(|l: &Logical| debug_span!("logical", dst = %l.addr()))
.check_new_service::<Logical, http::Request<_>>()
.push_switch(
Logical::should_resolve,
svc::stack(endpoint)
.push_on_response(
svc::layers()
.push(svc::layer::mk(
svc::stack::FailOnError::<std::io::Error, S>::new,
))
.box_http_request(),
)
.push_map_target(Endpoint::from_logical(
ReasonForNoPeerName::NotProvidedByServiceDiscovery,
))
.into_inner(),
)
.into_inner()
}
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ where
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()
.into_inner();

svc::stack(svc::stack::MakeSwitch::new(SkipByProfile, http, tcp))
svc::stack(http)
.push_switch(SkipByProfile, tcp)
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()
.push_map_target(tcp::Logical::from)
.push(profiles::discover::layer(
Expand Down
44 changes: 37 additions & 7 deletions linkerd/app/outbound/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ impl<P> Logical<P> {
.map(|n| Addr::from((n, self.orig_dst.port())))
.unwrap_or_else(|| self.orig_dst.into())
}

pub fn should_resolve(&self) -> bool {
if let Some(p) = self.profile.as_ref() {
let p = p.borrow();
p.endpoint.is_none() && (p.name.is_some() || p.targets.len() > 0)
} else {
false
}
}
}

impl<P: std::fmt::Debug> std::fmt::Debug for Logical<P> {
Expand Down Expand Up @@ -160,13 +169,34 @@ impl<P> Into<SocketAddr> for &'_ Concrete<P> {

impl<P> Endpoint<P> {
pub fn from_logical(reason: tls::ReasonForNoPeerName) -> impl (Fn(Logical<P>) -> Self) + Clone {
move |logical| Self {
addr: (&logical).into(),
metadata: Metadata::default(),
identity: tls::PeerIdentity::None(reason),
concrete: Concrete {
logical,
resolve: None,
move |logical| match logical
.profile
.as_ref()
.and_then(|p| p.borrow().endpoint.clone())
{
None => Self {
addr: (&logical).into(),
metadata: Metadata::default(),
identity: tls::PeerIdentity::None(reason),
concrete: Concrete {
logical,
resolve: None,
},
},
Some((addr, metadata)) => Self {
addr: addr.into(),
identity: metadata
.identity()
.cloned()
.map(tls::Conditional::Some)
.unwrap_or(tls::Conditional::None(
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery,
)),
metadata,
concrete: Concrete {
logical,
resolve: None,
},
},
}
}
Expand Down
125 changes: 124 additions & 1 deletion linkerd/app/outbound/src/tcp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{Concrete, Endpoint, Logical};
use crate::test_util::{
support::{
connect::{Connect, ConnectFuture},
resolver,
profile, resolver,
},
*,
};
Expand Down Expand Up @@ -611,6 +611,129 @@ async fn no_profiles_when_outside_search_nets() {
);
}

#[tokio::test(core_threads = 1)]
async fn no_discovery_when_profile_has_an_endpoint() {
let _trace = support::trace_init();

let ep = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let cfg = default_config(ep);
let id_name = linkerd2_identity::Name::from_hostname(
b"foo.ns1.serviceaccount.identity.linkerd.cluster.local",
)
.expect("hostname is invalid");
let meta = support::resolver::Metadata::new(
Default::default(),
support::resolver::ProtocolHint::Unknown,
Some(id_name.clone()),
10_000,
None,
);

// Build a mock "connector" that returns the upstream "server" IO.
let connect = support::connect().endpoint(
ep,
Connection {
identity: tls::Conditional::Some(id_name.clone()),
..Connection::default()
},
);

let resolver = support::resolver::<Addr, support::resolver::Metadata>();
let resolve_state = resolver.handle();

let profiles = profile::resolver().profile(
ep,
profile::Profile {
opaque_protocol: true,
endpoint: Some((ep, meta.clone())),
..Default::default()
},
);

// Build the outbound server
let mut server = build_server(cfg, profiles, resolver, connect);

hello_world_client(ep, &mut server).await;

assert!(
resolve_state.is_empty(),
"proxy tried to resolve endpoints provided by profile discovery!"
);
}

#[tokio::test(core_threads = 1)]
async fn profile_endpoint_propagates_conn_errors() {
// This test asserts that when profile resolution returns an endpoint, and
// connecting to that endpoint fails, the proxy will resolve a new endpoint
// for subsequent connections to the same original destination.
let _trace = support::trace_init();

let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let ep2 = SocketAddr::new([10, 0, 0, 42].into(), 5550);

let cfg = default_config(ep1);
let id_name = linkerd2_identity::Name::from_hostname(
b"foo.ns1.serviceaccount.identity.linkerd.cluster.local",
)
.expect("hostname is invalid");
let meta = support::resolver::Metadata::new(
Default::default(),
support::resolver::ProtocolHint::Unknown,
Some(id_name.clone()),
10_000,
None,
);

// Build a mock "connector" that returns the upstream "server" IO.
let connect = support::connect()
.endpoint_fn(ep1, |_| {
Err(Box::new(io::Error::new(
io::ErrorKind::ConnectionReset,
"i dont like you, go away",
)))
})
.endpoint(
ep2,
Connection {
identity: tls::Conditional::Some(id_name.clone()),
..Connection::default()
},
);

let profiles = profile::resolver();
let profile_tx = profiles.profile_tx(ep1);
profile_tx
.broadcast(profile::Profile {
opaque_protocol: true,
endpoint: Some((ep1, meta.clone())),
..Default::default()
})
.expect("still listening to profiles");

let resolver = support::resolver::<Addr, support::resolver::Metadata>();

// Build the outbound server
let mut server = build_server(cfg, profiles, resolver, connect);

let svc = server.new_service(listen::Addrs::new(
([127, 0, 0, 1], 4140).into(),
([127, 0, 0, 1], 666).into(),
Some(ep1),
));

let res = svc
.oneshot(support::io().read(b"hello\r\n").write(b"world").build())
.await
.map_err(Into::into);
tracing::info!(?res);
assert_eq!(
res.unwrap_err()
.downcast_ref::<io::Error>()
.map(io::Error::kind),
Some(io::ErrorKind::ConnectionReset)
);
}

struct Connection {
identity: tls::Conditional<linkerd2_identity::Name>,
count: Arc<AtomicUsize>,
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/test/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub type Profiles<T> = Resolver<T, Option<profiles::Receiver>>;
#[derive(Debug, Clone)]
pub struct DstSender<T>(mpsc::UnboundedSender<Result<Update<T>, Error>>);

pub struct ProfileSender(watch::Sender<Profile>);
pub type ProfileSender = watch::Sender<Profile>;

#[derive(Debug, Clone)]
pub struct Handle<T, E>(Arc<State<T, E>>);
Expand Down Expand Up @@ -144,7 +144,7 @@ where
pub fn profile_tx(&self, addr: T) -> ProfileSender {
let (tx, rx) = watch::channel(Profile::default());
self.state.endpoints.lock().unwrap().insert(addr, Some(rx));
ProfileSender(tx)
tx
}

pub fn profile(self, addr: T, profile: Profile) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use self::{
prefixed::PrefixedIo,
sensor::{Sensor, SensorIo},
};
pub use std::io::{Error, Read, Result, Write};
pub use std::io::{Error, ErrorKind, Read, Result, Write};
use std::net::SocketAddr;
pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Implements the Resolve trait using the proxy's gRPC API
async-stream = "0.2.1"
futures = "0.3"
linkerd2-identity = { path = "../../identity" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.14" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.15" }
linkerd2-proxy-core = { path = "../core" }
prost = "0.6"
http = "0.2"
Expand Down
Loading