diff --git a/Cargo.lock b/Cargo.lock index 52b7766ab5..d3e5dcdbc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -827,7 +827,6 @@ dependencies = [ "linkerd2-proxy-transport", "linkerd2-reconnect", "linkerd2-retry", - "linkerd2-router", "linkerd2-service-profiles", "linkerd2-stack", "linkerd2-stack-metrics", @@ -1426,18 +1425,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "linkerd2-router" -version = "0.1.0" -dependencies = [ - "futures 0.3.5", - "linkerd2-error", - "linkerd2-stack", - "pin-project", - "tower", - "tracing", -] - [[package]] name = "linkerd2-service-profiles" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index fad3b93f94..f00b07d025 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,6 @@ members = [ "linkerd/proxy/transport", "linkerd/reconnect", "linkerd/retry", - "linkerd/router", "linkerd/service-profiles", "linkerd/signal", "linkerd/stack", diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 3411662619..0f48584cfd 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -52,7 +52,6 @@ linkerd2-proxy-tcp = { path = "../../proxy/tcp" } linkerd2-proxy-transport = { path = "../../proxy/transport" } linkerd2-reconnect = { path = "../../reconnect" } linkerd2-retry = { path = "../../retry" } -linkerd2-router = { path = "../../router" } linkerd2-timeout = { path = "../../timeout" } linkerd2-tracing = { path = "../../tracing" } linkerd2-service-profiles = { path = "../../service-profiles" } diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index e56346e9c3..0f9e0481c6 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -19,7 +19,6 @@ pub use linkerd2_exp_backoff as exp_backoff; pub use linkerd2_http_metrics as http_metrics; pub use linkerd2_opencensus as opencensus; pub use linkerd2_reconnect as reconnect; -pub use linkerd2_router as router; pub use linkerd2_service_profiles as profiles; pub use linkerd2_stack_metrics as stack_metrics; pub use linkerd2_stack_tracing as stack_tracing; diff --git a/linkerd/app/inbound/src/endpoint.rs b/linkerd/app/inbound/src/endpoint.rs index c5e9d57d49..372fd83fcb 100644 --- a/linkerd/app/inbound/src/endpoint.rs +++ b/linkerd/app/inbound/src/endpoint.rs @@ -3,7 +3,7 @@ use linkerd2_app_core::{ classify, dst, http_request_authority_addr, http_request_host_addr, http_request_l5d_override_dst_addr, metrics, profiles, proxy::{http, identity, tap}, - router, stack_tracing, + stack_tracing, svc, transport::{self, listen, tls}, Addr, Conditional, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, }; @@ -296,7 +296,7 @@ impl From for RequestTarget { } } -impl router::Recognize> for RequestTarget { +impl svc::stack::RecognizeRoute> for RequestTarget { type Key = Target; fn recognize(&self, req: &http::Request) -> Self::Key { diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index ccb0b50ae9..a939cccbf9 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -22,7 +22,7 @@ use linkerd2_app_core::{ http::{self, orig_proto, strip_header}, identity, tap, tcp, }, - reconnect, router, + reconnect, spans::SpanConverter, svc::{self}, transport::{self, io, listen, tls}, @@ -351,8 +351,9 @@ impl Config { // Routes each request to a target, obtains a service for that // target, and dispatches the request. .instrument_from_target() - .into_make_service() - .push(router::Layer::new(RequestTarget::from)) + .push(svc::layer::mk(|inner| { + svc::stack::NewRouter::new(RequestTarget::from, inner) + })) // Used by tap. .push_http_insert_target() .check_new_service::>() diff --git a/linkerd/router/Cargo.toml b/linkerd/router/Cargo.toml deleted file mode 100644 index 7c7fe16188..0000000000 --- a/linkerd/router/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "linkerd2-router" -version = "0.1.0" -authors = ["Linkerd Developers "] -edition = "2018" -publish = false - -[dependencies] -futures = "0.3" -linkerd2-error = { path = "../error" } -linkerd2-stack = { path = "../stack" } -tower = { version = "0.3", default-features = false, features = ["util"] } -tracing = "0.1.2" -pin-project = "0.4" diff --git a/linkerd/router/src/lib.rs b/linkerd/router/src/lib.rs deleted file mode 100644 index b34e6b7bd1..0000000000 --- a/linkerd/router/src/lib.rs +++ /dev/null @@ -1,164 +0,0 @@ -#![deny(warnings, rust_2018_idioms)] - -use futures::{ready, TryFuture}; -use linkerd2_error::Error; -use linkerd2_stack::NewService; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tower::util::{Oneshot, ServiceExt}; -use tracing::trace; - -pub trait Recognize { - type Key: Clone; - - fn recognize(&self, t: &T) -> Self::Key; -} - -pub fn recognize(f: F) -> RecognizeFn { - RecognizeFn(f) -} - -#[derive(Clone, Debug)] -pub struct Layer { - new_recgonize: T, -} - -#[derive(Clone, Debug)] -pub struct NewRouter { - new_recgonize: T, - make_route: M, -} - -#[derive(Clone, Debug)] -pub struct Router { - recognize: T, - make: M, -} - -#[derive(Clone, Debug)] -pub struct RecognizeFn(F); - -impl Layer { - pub fn new(new_recgonize: K) -> Self { - Self { new_recgonize } - } -} - -impl tower::layer::Layer for Layer { - type Service = NewRouter; - - fn layer(&self, make_route: M) -> Self::Service { - NewRouter { - make_route, - new_recgonize: self.new_recgonize.clone(), - } - } -} - -impl NewService for NewRouter -where - K: NewService, - M: Clone, -{ - type Service = Router; - - fn new_service(&mut self, t: T) -> Self::Service { - Router { - recognize: self.new_recgonize.new_service(t), - make: self.make_route.clone(), - } - } -} - -impl tower::Service for Router -where - U: std::fmt::Debug, - K: Recognize, - K::Key: std::fmt::Debug, - M: tower::Service, - M::Error: Into, - S: tower::Service, - S::Error: Into, -{ - type Response = S::Response; - type Error = Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.make.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, request: U) -> Self::Future { - let key = self.recognize.recognize(&request); - trace!(?key, ?request, "Routing"); - ResponseFuture { - state: State::Make(self.make.call(key), Some(request)), - } - } -} - -#[pin_project] -pub struct ResponseFuture -where - M: TryFuture, - M::Error: Into, - S: tower::Service, - S::Error: Into, -{ - #[pin] - state: State, -} - -#[pin_project(project = StateProj)] -enum State -where - M: TryFuture, - M::Error: Into, - S: tower::Service, - S::Error: Into, -{ - Make(#[pin] M, Option), - Respond(#[pin] Oneshot), -} - -impl Future for ResponseFuture -where - M: TryFuture, - M::Error: Into, - S: tower::Service, - S::Error: Into, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - match this.state.as_mut().project() { - StateProj::Make(fut, req) => { - trace!("Making"); - let service = ready!(fut.try_poll(cx)).map_err(Into::into)?; - let req = req.take().expect("polled after ready"); - this.state.set(State::Respond(service.oneshot(req))) - } - StateProj::Respond(future) => { - trace!("Responding"); - return future.poll(cx).map_err(Into::into); - } - } - } - } -} - -impl Recognize for RecognizeFn -where - K: Clone, - F: Fn(&T) -> K, -{ - type Key = K; - - fn recognize(&self, t: &T) -> Self::Key { - (self.0)(t) - } -} diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index 1939f30f4b..15b0cdcb6f 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -15,6 +15,7 @@ mod oneshot; mod proxy; mod request_filter; mod result; +pub mod router; mod switch; mod switch_ready; @@ -30,6 +31,7 @@ pub use self::oneshot::{Oneshot, OneshotLayer}; pub use self::proxy::{Proxy, ProxyService}; pub use self::request_filter::{FilterRequest, RequestFilter}; pub use self::result::ResultService; +pub use self::router::{NewRouter, RecognizeRoute}; pub use self::switch::{MakeSwitch, Switch}; pub use self::switch_ready::{NewSwitchReady, SwitchReady}; pub use tower::util::Either; diff --git a/linkerd/stack/src/router.rs b/linkerd/stack/src/router.rs new file mode 100644 index 0000000000..5b9b08d840 --- /dev/null +++ b/linkerd/stack/src/router.rs @@ -0,0 +1,77 @@ +use crate::NewService; +use std::task::{Context, Poll}; +use tower::util::{Oneshot, ServiceExt}; + +pub trait RecognizeRoute { + type Key: Clone; + + fn recognize(&self, t: &T) -> Self::Key; +} + +#[derive(Clone, Debug)] +pub struct NewRouter { + new_recgonize: T, + inner: N, +} + +#[derive(Clone, Debug)] +pub struct Router { + recognize: T, + inner: N, +} + +impl NewRouter { + pub fn new(new_recgonize: K, inner: N) -> Self { + Self { + new_recgonize, + inner, + } + } +} + +impl NewService for NewRouter +where + K: NewService, + N: Clone, +{ + type Service = Router; + + fn new_service(&mut self, t: T) -> Self::Service { + Router { + recognize: self.new_recgonize.new_service(t), + inner: self.inner.clone(), + } + } +} + +impl tower::Service for Router +where + K: RecognizeRoute, + N: NewService, + S: tower::Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Oneshot; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Req) -> Self::Future { + let key = self.recognize.recognize(&req); + self.inner.new_service(key).oneshot(req) + } +} + +impl RecognizeRoute for F +where + K: Clone, + F: Fn(&T) -> K, +{ + type Key = K; + + fn recognize(&self, t: &T) -> Self::Key { + (self)(t) + } +}