Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix leak in stream notifications (#5739)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnunicorn authored Apr 23, 2020
1 parent 262429d commit 1c4f001
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sc-telemetry = { version = "2.0.0-dev", path = "../telemetry" }
sp-trie = { version = "2.0.0-dev", path = "../../primitives/trie" }
sp-storage = { version = "2.0.0-dev", path = "../../primitives/storage" }
sp-transaction-pool = { version = "2.0.0-dev", path = "../../primitives/transaction-pool" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.8.0-dev", path = "../../utils/prometheus" }

[dev-dependencies]
sp-test-primitives = { version = "2.0.0-dev", path = "../../primitives/test-primitives" }
80 changes: 64 additions & 16 deletions client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use fnv::{FnvHashSet, FnvHashMap};
use sp_core::storage::{StorageKey, StorageData};
use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
use prometheus_endpoint::{Registry, CounterVec, Opts, U64, register};

/// Storage change set
#[derive(Debug)]
Expand Down Expand Up @@ -71,9 +72,12 @@ pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>

type SubscriberId = u64;

type SubscribersGauge = CounterVec<U64>;

/// Manages storage listeners.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
metrics: Option<SubscribersGauge>,
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
Expand All @@ -90,7 +94,8 @@ pub struct StorageNotifications<Block: BlockT> {

impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
StorageNotifications {
Self {
metrics: Default::default(),
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
Expand All @@ -101,6 +106,29 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
}

impl<Block: BlockT> StorageNotifications<Block> {
/// Initialize a new StorageNotifications
/// optionally pass a prometheus registry to send subscriber metrics to
pub fn new(prometheus_registry: Option<Registry>) -> Self {
let metrics = prometheus_registry.and_then(|r|
CounterVec::new(
Opts::new(
"storage_notification_subscribers",
"Number of subscribers in storage notification sytem"
),
&["action"], //added | removed
).and_then(|g| register(g, &r))
.ok()
);

StorageNotifications {
metrics,
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
child_listeners: Default::default(),
sinks: Default::default(),
}
}
/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
Expand All @@ -113,6 +141,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
Item=(Vec<u8>, impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>)
>,
) {

let has_wildcard = !self.wildcard_listeners.is_empty();

// early exit if no listeners
Expand Down Expand Up @@ -169,21 +198,32 @@ impl<Block: BlockT> StorageNotifications<Block> {
let changes = Arc::new(changes);
let child_changes = Arc::new(child_changes);
// Trigger the events
for subscriber in subscribers {
let should_remove = {
let &(ref sink, ref filter, ref child_filters) = self.sinks.get(&subscriber)
.expect("subscribers returned from self.listeners are always in self.sinks; qed");
sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
child_changes: child_changes.clone(),
filter: filter.clone(),
child_filters: child_filters.clone(),
})).is_err()
};

if should_remove {
self.remove_subscriber(subscriber);
}

let to_remove = self.sinks
.iter()
.filter_map(|(subscriber, &(ref sink, ref filter, ref child_filters))| {
let should_remove = {
if subscribers.contains(subscriber) {
sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
child_changes: child_changes.clone(),
filter: filter.clone(),
child_filters: child_filters.clone(),
})).is_err()
} else {
sink.is_closed()
}
};

if should_remove {
Some(subscriber.clone())
} else {
None
}
}).collect::<Vec<_>>();

for sub_id in to_remove {
self.remove_subscriber(sub_id);
}
}

Expand Down Expand Up @@ -241,6 +281,9 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}
}
}

Expand Down Expand Up @@ -301,6 +344,11 @@ impl<Block: BlockT> StorageNotifications<Block> {
// insert sink
let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items");
self.sinks.insert(current_id, (tx, keys, child_keys));

if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"added"]).inc();
}

rx
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
fork_blocks: ForkBlocks<Block>,
bad_blocks: BadBlocks<Block>,
execution_extensions: ExecutionExtensions<Block>,
_prometheus_registry: Option<Registry>,
prometheus_registry: Option<Registry>,
) -> sp_blockchain::Result<Self> {
if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
let genesis_storage = build_genesis_storage.build_storage()?;
Expand All @@ -276,7 +276,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
Ok(Client {
backend,
executor,
storage_notifications: Default::default(),
storage_notifications: Mutex::new(StorageNotifications::new(prometheus_registry)),
import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
importing_block: Default::default(),
Expand Down

0 comments on commit 1c4f001

Please sign in to comment.