diff --git a/Cargo.lock b/Cargo.lock index f3eef9cdd7..a06a200c15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,7 +105,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0df2f85c8a2abbe3b7d7e748052fdd9b76a0458fdeb16ad4223f5eca78c7c130" dependencies = [ "addr2line", - "cfg-if", + "cfg-if 0.1.10", "libc", "object", "rustc-demangle", @@ -128,9 +128,9 @@ checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" [[package]] name = "bitflags" -version = "1.0.1" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3c30d3802dfb7281680d6285f2ccdaa8c2d8fee41f93805dba5c4cf50dc23cf" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "build_const" @@ -187,6 +187,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "chrono" version = "0.4.10" @@ -207,6 +213,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "cloudabi" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467" +dependencies = [ + "bitflags", +] + [[package]] name = "cmake" version = "0.1.41" @@ -232,7 +247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ "autocfg 1.0.0", - "cfg-if", + "cfg-if 0.1.10", "lazy_static", ] @@ -247,6 +262,12 @@ dependencies = [ "gzip-header", ] +[[package]] +name = "dyn-clone" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d55796afa1b20c2945ca8eabfc421839f2b766619209f1ede813cf2484f31804" + [[package]] name = "either" version = "1.5.1" @@ -439,7 +460,7 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7db7ca94ed4cd01190ceee0d8a8052f08a247aa1b469a7f68c6a3b71afcf407" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "wasi", ] @@ -633,6 +654,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" +[[package]] +name = "instant" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb1fc4429a33e1f80d41dc9fea4d108a88bec1de8053878898ae448a0b52f613" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "iovec" version = "0.1.4" @@ -949,6 +979,7 @@ dependencies = [ "futures 0.3.5", "linkerd2-error", "linkerd2-stack", + "parking_lot 0.11.0", "tokio", "tower", "tracing", @@ -1433,6 +1464,7 @@ dependencies = [ name = "linkerd2-stack" version = "0.1.0" dependencies = [ + "dyn-clone", "futures 0.3.5", "linkerd2-error", "pin-project", @@ -1525,13 +1557,22 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lock_api" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c84ec4b527950aa83a329754b01dbe3f58361d1c5efacd1f6d68c494d08a17c6" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] @@ -1540,7 +1581,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "generator", "scoped-tls", "serde", @@ -1629,7 +1670,7 @@ version = "0.6.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302dec22bcf6bae6dfb69c647187f4b4d0fb6f535521f7bc022430ce8e12008f" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "fuchsia-zircon", "fuchsia-zircon-sys", "iovec", @@ -1699,7 +1740,7 @@ version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9044faf1413a1057267be51b5afba8eb1090bd2231c693664aa1db716fe1eae0" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "winapi 0.3.8", ] @@ -1773,8 +1814,19 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + +[[package]] +name = "parking_lot" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733" +dependencies = [ + "instant", + "lock_api 0.4.1", + "parking_lot_core 0.8.0", ] [[package]] @@ -1783,8 +1835,23 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" dependencies = [ - "cfg-if", - "cloudabi", + "cfg-if 0.1.10", + "cloudabi 0.0.3", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.8", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi 0.1.0", + "instant", "libc", "redox_syscall", "smallvec", @@ -2075,7 +2142,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" dependencies = [ - "cloudabi", + "cloudabi 0.0.3", "fuchsia-cprng", "libc", "rand_core 0.4.0", @@ -2250,9 +2317,9 @@ checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" [[package]] name = "scopeguard" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "sct" @@ -2338,9 +2405,9 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "1.2.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc" +checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85" [[package]] name = "socket2" @@ -2348,7 +2415,7 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "redox_syscall", "winapi 0.3.8", @@ -2386,7 +2453,7 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e91405c14320e5c79b3d148e1c86f40749a36e490642202a31689cb1a3452b2" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "rand 0.6.5", "redox_syscall", @@ -2470,7 +2537,7 @@ dependencies = [ "mio-named-pipes", "mio-uds", "num_cpus", - "parking_lot", + "parking_lot 0.10.2", "pin-project-lite", "signal-hook-registry", "slab", @@ -2691,7 +2758,7 @@ version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "log", "tracing-attributes", "tracing-core", @@ -2758,7 +2825,7 @@ dependencies = [ "chrono", "lazy_static", "matchers", - "parking_lot", + "parking_lot 0.11.0", "regex 1.3.9", "serde", "serde_json", @@ -2798,7 +2865,7 @@ version = "0.20.0" source = "git+https://github.com/bluejekyll/trust-dns.git?rev=97d3bf10ecb0711aebf523e930f5de873808eb33#97d3bf10ecb0711aebf523e930f5de873808eb33" dependencies = [ "backtrace", - "cfg-if", + "cfg-if 0.1.10", "futures-util", "ipconfig", "lazy_static", @@ -2901,7 +2968,7 @@ version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "wasm-bindgen-macro", ] diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 99f2e39654..aa7fd131c8 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -284,6 +284,14 @@ impl Stack { self.push(http::boxed::response::Layer::new()) } + pub fn box_new_service(self) -> Stack> + where + S: NewService + Clone + Send + Sync + 'static, + S::Service: Send + 'static, + { + self.push(layer::mk(stack::BoxNewService::new)) + } + /// Validates that this stack serves T-typed targets. pub fn check_new(self) -> Self where diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index ea708a7eb2..30f1f42821 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -73,7 +73,8 @@ impl Config { Future = impl Send + 'static, > + Send + 'static, - > + Send + > + Clone + + Send + 'static where L: svc::NewService + Unpin + Clone + Send + Sync + 'static, @@ -85,7 +86,7 @@ impl Config { + 'static, S::Error: Into, S::Future: Unpin + Send, - P: profiles::GetProfile + Unpin + Clone + Send + 'static, + P: profiles::GetProfile + Unpin + Clone + Send + Sync + 'static, P::Future: Unpin + Send, P::Error: Send, { @@ -173,11 +174,11 @@ impl Config { C::Error: Into, C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, C::Future: Unpin + Send, - P: profiles::GetProfile + Unpin + Clone + Send + 'static, + P: profiles::GetProfile + Unpin + Clone + Send + Sync + 'static, P::Future: Unpin + Send, P::Error: Send, // The loopback router processes requests sent to the inbound port. - L: svc::NewService + Unpin + Send + Clone + 'static, + L: svc::NewService + Unpin + Send + Clone + Sync + 'static, S: tower::Service< http::Request, Response = http::Response, @@ -282,9 +283,9 @@ impl Config { .box_http_response(), ), ) - .into_make_service() - .spawn_buffer(buffer_capacity) - .into_new_service() + // Boxing is necessary purely to limit the link-time overhead of + // having enormous types. + .box_new_service() .check_new_service::>() .into_inner() } @@ -406,7 +407,8 @@ impl Config { Future = impl Send + 'static, > + Send + 'static, - > + Send + > + Clone + + Send + 'static where D: svc::NewService + Unpin + Clone + Send + Sync + 'static, diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 30edeec2c1..2fb61c03be 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -48,7 +48,7 @@ where + Send + 'static, TSvc::Future: Send, - H: svc::NewService + Unpin + Send + Clone + 'static, + H: svc::NewService + Unpin + Clone + Send + Sync + 'static, HSvc: tower::Service< http::Request, Response = http::Response, @@ -56,7 +56,7 @@ where > + Send + 'static, HSvc::Future: Send, - P: profiles::GetProfile + Unpin + Clone + Send + 'static, + P: profiles::GetProfile + Unpin + Clone + Send + Sync + 'static, P::Future: Unpin + Send, P::Error: Send, { @@ -89,9 +89,6 @@ where .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age), ), ) - .into_make_service() - .spawn_buffer(buffer_capacity) - .into_new_service() .check_new_service::>() .instrument(|t: &Target| info_span!("target", dst = %t.dst)) .push(svc::layer::mk(|inner| { @@ -144,6 +141,9 @@ where .push(metrics.transport.layer_accept()) .push_map_target(tcp::Accept::from) .check_new_service::() + // Boxing is necessary purely to limit the link-time overhead of + // having enormous types. + .box_new_service() .into_inner() } diff --git a/linkerd/app/outbound/src/server.rs b/linkerd/app/outbound/src/server.rs index 7c5133a5a1..8c9e7b86c2 100644 --- a/linkerd/app/outbound/src/server.rs +++ b/linkerd/app/outbound/src/server.rs @@ -36,13 +36,13 @@ pub fn stack( + 'static where I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Unpin + Send + 'static, - R: Resolve + Unpin + Clone + Send + 'static, + R: Resolve + Unpin + Clone + Send + Sync + 'static, R::Future: Unpin + Send, R::Resolution: Unpin + Send, C: tower::Service + Unpin + Clone + Send + Sync + 'static, C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, C::Future: Unpin + Send, - H: svc::NewService + Unpin + Send + Clone + 'static, + H: svc::NewService + Unpin + Clone + Send + Sync + 'static, S: tower::Service< http::Request, Response = http::Response, @@ -50,7 +50,7 @@ where > + Send + 'static, S::Future: Send, - P: profiles::GetProfile + Unpin + Clone + Send + 'static, + P: profiles::GetProfile + Unpin + Clone + Send + Sync + 'static, P::Future: Unpin + Send, P::Error: Send, { diff --git a/linkerd/cache/Cargo.toml b/linkerd/cache/Cargo.toml index 8f05ea784b..a36fdac8b3 100644 --- a/linkerd/cache/Cargo.toml +++ b/linkerd/cache/Cargo.toml @@ -9,6 +9,7 @@ publish = false futures = "0.3" linkerd2-error = { path = "../error" } linkerd2-stack = { path = "../stack" } +parking_lot = "0.11" tokio = "0.2" tower = { version = "0.3", default-features = false, features = ["util"] } tracing = "0.1.19" diff --git a/linkerd/cache/src/lib.rs b/linkerd/cache/src/lib.rs index 358aa319db..4ea0453928 100644 --- a/linkerd/cache/src/lib.rs +++ b/linkerd/cache/src/lib.rs @@ -1,15 +1,19 @@ #![deny(warnings, rust_2018_idioms)] use linkerd2_stack::NewService; -use std::collections::HashMap; -use std::hash::Hash; -use std::sync::{Arc, Weak}; +use parking_lot::RwLock; +use std::{ + collections::{hash_map::Entry, HashMap}, + hash::Hash, + sync::{Arc, Weak}, +}; use tracing::{debug, trace}; pub mod layer; pub use self::layer::CacheLayer; +#[derive(Clone)] pub struct Cache where T: Eq + Hash, @@ -24,13 +28,13 @@ where #[derive(Clone, Debug)] pub struct Handle(Arc<()>); -type Services = HashMap)>; +type Services = Arc)>>>; // === impl Cache === impl Cache where - T: Eq + Hash + Send, + T: Eq + Hash, N: NewService<(T, Handle)>, { pub fn new(new_service: N) -> Self { @@ -39,6 +43,13 @@ where services: Services::default(), } } + + fn new_entry(new: &mut N, target: T) -> (N::Service, Weak<()>) { + let handle = Arc::new(()); + let weak = Arc::downgrade(&handle); + let svc = new.new_service((target, Handle(handle))); + (svc, weak) + } } impl NewService for Cache @@ -50,24 +61,44 @@ where type Service = N::Service; fn new_service(&mut self, target: T) -> N::Service { - if let Some((service, weak)) = self.services.get(&target) { + // We expect the item to be available in most cases, so initially obtain only a read lock. + if let Some((service, weak)) = self.services.read().get(&target) { if weak.upgrade().is_some() { trace!("Using cached service"); return service.clone(); } } - // Make a new service for the target - let handle = Arc::new(()); - let weak = Arc::downgrade(&handle); - let service = self - .new_service - .new_service((target.clone(), Handle(handle))); + // Otherwise, obtain a write lock to insert a new service. + let mut services = self.services.write(); + + let service = match services.entry(target.clone()) { + Entry::Occupied(mut entry) => { + // Another thread raced us to create a service for this target. Use it. + let (svc, weak) = entry.get(); + if weak.upgrade().is_some() { + trace!("Using cached service"); + svc.clone() + } else { + debug!("Caching new service"); + let (svc, weak) = Self::new_entry(&mut self.new_service, target); + entry.insert((svc.clone(), weak)); + svc + } + } + Entry::Vacant(entry) => { + // Make a new service for the target. + debug!("Caching new service"); + let (svc, weak) = Self::new_entry(&mut self.new_service, target); + entry.insert((svc.clone(), weak)); + svc + } + }; // Drop defunct services before inserting the new service into the // cache. - let n = self.services.len(); - self.services.retain(|_, (_, weak)| { + let n = services.len(); + services.retain(|_, (_, weak)| { if weak.strong_count() > 0 { true } else { @@ -75,14 +106,8 @@ where false } }); - debug!( - services = self.services.len(), - dropped = n - self.services.len() - ); - - debug!("Caching new service"); - self.services.insert(target, (service.clone(), weak)); + debug!(services = services.len(), dropped = n - services.len()); - service.into() + service } } diff --git a/linkerd/stack/Cargo.toml b/linkerd/stack/Cargo.toml index 233f4f88af..bff48cb758 100644 --- a/linkerd/stack/Cargo.toml +++ b/linkerd/stack/Cargo.toml @@ -9,6 +9,7 @@ Utilities for composing Tower services. """ [dependencies] +dyn-clone = "1.0.3" futures = "0.3" linkerd2-error = { path = "../error" } pin-project = "0.4" diff --git a/linkerd/stack/src/box_new_service.rs b/linkerd/stack/src/box_new_service.rs new file mode 100644 index 0000000000..1e23e9cc25 --- /dev/null +++ b/linkerd/stack/src/box_new_service.rs @@ -0,0 +1,55 @@ +use crate::NewService; +use dyn_clone::DynClone; +use std::fmt; + +pub struct BoxNewService { + inner: Box + Send + Sync>, +} + +trait CloneNewService: DynClone { + fn inner_new_service(&mut self, t: T) -> S; +} + +impl CloneNewService for N +where + N: NewService + Clone + Send + Sync + 'static, + N::Service: Send + 'static, +{ + fn inner_new_service(&mut self, t: T) -> N::Service { + self.new_service(t) + } +} + +impl BoxNewService { + pub fn new(inner: N) -> Self + where + N: NewService + Clone + Send + Sync + 'static, + S: Send + 'static, + { + Self { + inner: Box::new(inner), + } + } +} + +impl Clone for BoxNewService { + fn clone(&self) -> Self { + Self { + inner: dyn_clone::clone_box(&*self.inner), + } + } +} + +impl NewService for BoxNewService { + type Service = S; + + fn new_service(&mut self, t: T) -> S { + self.inner.inner_new_service(t) + } +} + +impl fmt::Debug for BoxNewService { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BoxNewService").finish() + } +} diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index ef53c520a9..0ccd0d5dea 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -2,6 +2,7 @@ #![deny(warnings, rust_2018_idioms)] +mod box_new_service; mod fail_on_error; mod future_service; pub mod layer; @@ -19,6 +20,7 @@ pub mod router; mod switch; mod switch_ready; +pub use self::box_new_service::BoxNewService; pub use self::fail_on_error::FailOnError; pub use self::future_service::FutureService; pub use self::make_ready::{MakeReady, MakeReadyLayer};