Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
fix chain subscription: less boiler plate (#10285)
Browse files Browse the repository at this point in the history
* fix chain subscription: less boiler plate

* fix bad merge
  • Loading branch information
niklasad1 authored Nov 22, 2021
1 parent 7034e6e commit 5acc878
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 113 deletions.
115 changes: 98 additions & 17 deletions client/rpc/src/chain/chain_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
//! Blockchain API backend for full nodes.
use super::{client_err, ChainBackend, Error};
use crate::{chain::helpers, SubscriptionTaskExecutor};
use crate::SubscriptionTaskExecutor;
use std::{marker::PhantomData, sync::Arc};

use futures::task::Spawn;
use futures::{
stream::{self, Stream, StreamExt},
future,
task::Spawn,
};
use jsonrpsee::ws_server::SubscriptionSink;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
Expand Down Expand Up @@ -68,27 +72,104 @@ where
}

fn subscribe_all_heads(&self, sink: SubscriptionSink) -> Result<(), Error> {
let client = self.client.clone();
let executor = self.executor.clone();

let fut = helpers::subscribe_headers(client, sink, "chain_subscribeAllHeads");
executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e)))
subscribe_headers(
&self.client,
self.executor.clone(),
"chain_subscribeAllHeads",
sink,
|| self.client().info().best_hash,
|| {
self.client()
.import_notification_stream()
.map(|notification| notification.header)
},
)
}

fn subscribe_new_heads(&self, sink: SubscriptionSink) -> Result<(), Error> {
let client = self.client.clone();
let executor = self.executor.clone();

let fut = helpers::subscribe_headers(client, sink, "chain_subscribeNewHeads");
executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e)))
subscribe_headers(
&self.client,
self.executor.clone(),
"chain_subscribeNewHeads",
sink,
|| self.client().info().best_hash,
|| {
self.client()
.import_notification_stream()
.filter(|notification| future::ready(notification.is_new_best))
.map(|notification| notification.header)
},
)
}

fn subscribe_finalized_heads(&self, sink: SubscriptionSink) -> Result<(), Error> {
let client = self.client.clone();
let executor = self.executor.clone();
subscribe_headers(
&self.client,
self.executor.clone(),
"chain_subscribeFinalizedHeads",
sink,
|| self.client().info().finalized_hash,
|| {
self.client()
.finality_notification_stream()
.map(|notification| notification.header)
},
)
}
}

/// Subscribe to new headers.
fn subscribe_headers<Block, Client, F, G, S>(
client: &Arc<Client>,
executor: SubscriptionTaskExecutor,
method: &'static str,
mut sink: SubscriptionSink,
best_block_hash: G,
stream: F,
) -> Result<(), Error>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: HeaderBackend<Block> + 'static,
F: FnOnce() -> S,
G: FnOnce() -> Block::Hash,
S: Stream<Item = Block::Header> + Send + 'static,
{
// send current head right at the start.
let maybe_header = client
.header(BlockId::Hash(best_block_hash()))
.map_err(client_err)
.and_then(|header| {
header.ok_or_else(|| Error::Other("Best header missing.".to_string()))
})
.map_err(|e| {
log::warn!("Best header error {:?}", e);
e
})
.ok();

// send further subscriptions
let stream = stream();

// NOTE: by the time we set up the stream there might be a new best block and so there is a risk
// that the stream has a hole in it. The alternative would be to look up the best block *after*
// we set up the stream and chain it to the stream. Consuming code would need to handle
// duplicates at the beginning of the stream though.
let fut = async move {
stream::iter(maybe_header)
.chain(stream)
.take_while(|storage| {
future::ready(sink.send(&storage).map_or_else(
|e| {
log::debug!("Could not send data to subscription: {} error: {:?}", method, e);
false
},
|_| true,
))
})
.for_each(|_| future::ready(()))
.await;
};

let fut =
helpers::subscribe_finalized_headers(client, sink, "chain_subscribeFinalizedHeads");
executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e)))
}
}
95 changes: 0 additions & 95 deletions client/rpc/src/chain/helpers.rs

This file was deleted.

1 change: 0 additions & 1 deletion client/rpc/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
//! Substrate blockchain API.
mod chain_full;
mod helpers;

#[cfg(test)]
mod tests;
Expand Down

0 comments on commit 5acc878

Please sign in to comment.