From 1c4f001dca9a85d39d9172a1780f90481d5be4b1 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Thu, 23 Apr 2020 12:01:09 +0200 Subject: [PATCH] Fix leak in stream notifications (#5739) --- Cargo.lock | 1 + client/api/Cargo.toml | 1 + client/api/src/notifications.rs | 80 ++++++++++++++++++++++++++------- client/src/client.rs | 4 +- 4 files changed, 68 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc1037840163f..6d8651dabdc64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6001,6 +6001,7 @@ dependencies = [ "sp-trie", "sp-utils", "sp-version", + "substrate-prometheus-endpoint", ] [[package]] diff --git a/client/api/Cargo.toml b/client/api/Cargo.toml index 2145a09f2d52e..c745dc3bee747 100644 --- a/client/api/Cargo.toml +++ b/client/api/Cargo.toml @@ -41,6 +41,7 @@ sc-telemetry = { version = "2.0.0-dev", path = "../telemetry" } sp-trie = { version = "2.0.0-dev", path = "../../primitives/trie" } sp-storage = { version = "2.0.0-dev", path = "../../primitives/storage" } sp-transaction-pool = { version = "2.0.0-dev", path = "../../primitives/transaction-pool" } +prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.8.0-dev", path = "../../utils/prometheus" } [dev-dependencies] sp-test-primitives = { version = "2.0.0-dev", path = "../../primitives/test-primitives" } diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index f154eade44d5e..412fe8adc1e5b 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -25,6 +25,7 @@ use fnv::{FnvHashSet, FnvHashMap}; use sp_core::storage::{StorageKey, StorageData}; use sp_runtime::traits::Block as BlockT; use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded}; +use prometheus_endpoint::{Registry, CounterVec, Opts, U64, register}; /// Storage change set #[derive(Debug)] @@ -71,9 +72,12 @@ pub type StorageEventStream = TracingUnboundedReceiver<(H, StorageChangeSet)> type SubscriberId = u64; +type SubscribersGauge = CounterVec; + /// Manages storage listeners. #[derive(Debug)] pub struct StorageNotifications { + metrics: Option, next_id: SubscriberId, wildcard_listeners: FnvHashSet, listeners: HashMap>, @@ -90,7 +94,8 @@ pub struct StorageNotifications { impl Default for StorageNotifications { fn default() -> Self { - StorageNotifications { + Self { + metrics: Default::default(), next_id: Default::default(), wildcard_listeners: Default::default(), listeners: Default::default(), @@ -101,6 +106,29 @@ impl Default for StorageNotifications { } impl StorageNotifications { + /// Initialize a new StorageNotifications + /// optionally pass a prometheus registry to send subscriber metrics to + pub fn new(prometheus_registry: Option) -> Self { + let metrics = prometheus_registry.and_then(|r| + CounterVec::new( + Opts::new( + "storage_notification_subscribers", + "Number of subscribers in storage notification sytem" + ), + &["action"], //added | removed + ).and_then(|g| register(g, &r)) + .ok() + ); + + StorageNotifications { + metrics, + next_id: Default::default(), + wildcard_listeners: Default::default(), + listeners: Default::default(), + child_listeners: Default::default(), + sinks: Default::default(), + } + } /// Trigger notification to all listeners. /// /// Note the changes are going to be filtered by listener's filter key. @@ -113,6 +141,7 @@ impl StorageNotifications { Item=(Vec, impl Iterator, Option>)>) >, ) { + let has_wildcard = !self.wildcard_listeners.is_empty(); // early exit if no listeners @@ -169,21 +198,32 @@ impl StorageNotifications { let changes = Arc::new(changes); let child_changes = Arc::new(child_changes); // Trigger the events - for subscriber in subscribers { - let should_remove = { - let &(ref sink, ref filter, ref child_filters) = self.sinks.get(&subscriber) - .expect("subscribers returned from self.listeners are always in self.sinks; qed"); - sink.unbounded_send((hash.clone(), StorageChangeSet { - changes: changes.clone(), - child_changes: child_changes.clone(), - filter: filter.clone(), - child_filters: child_filters.clone(), - })).is_err() - }; - - if should_remove { - self.remove_subscriber(subscriber); - } + + let to_remove = self.sinks + .iter() + .filter_map(|(subscriber, &(ref sink, ref filter, ref child_filters))| { + let should_remove = { + if subscribers.contains(subscriber) { + sink.unbounded_send((hash.clone(), StorageChangeSet { + changes: changes.clone(), + child_changes: child_changes.clone(), + filter: filter.clone(), + child_filters: child_filters.clone(), + })).is_err() + } else { + sink.is_closed() + } + }; + + if should_remove { + Some(subscriber.clone()) + } else { + None + } + }).collect::>(); + + for sub_id in to_remove { + self.remove_subscriber(sub_id); } } @@ -241,6 +281,9 @@ impl StorageNotifications { } } } + if let Some(m) = self.metrics.as_ref() { + m.with_label_values(&[&"removed"]).inc(); + } } } @@ -301,6 +344,11 @@ impl StorageNotifications { // insert sink let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items"); self.sinks.insert(current_id, (tx, keys, child_keys)); + + if let Some(m) = self.metrics.as_ref() { + m.with_label_values(&[&"added"]).inc(); + } + rx } } diff --git a/client/src/client.rs b/client/src/client.rs index 2a8040febf3ef..d5184fb979e0e 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -252,7 +252,7 @@ impl Client where fork_blocks: ForkBlocks, bad_blocks: BadBlocks, execution_extensions: ExecutionExtensions, - _prometheus_registry: Option, + prometheus_registry: Option, ) -> sp_blockchain::Result { if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() { let genesis_storage = build_genesis_storage.build_storage()?; @@ -276,7 +276,7 @@ impl Client where Ok(Client { backend, executor, - storage_notifications: Default::default(), + storage_notifications: Mutex::new(StorageNotifications::new(prometheus_registry)), import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), importing_block: Default::default(),