Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

fix chain subscription: less boiler plate #10285

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 98 additions & 17 deletions client/rpc/src/chain/chain_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
//! Blockchain API backend for full nodes.

use super::{client_err, ChainBackend, Error};
use crate::{chain::helpers, SubscriptionTaskExecutor};
use crate::SubscriptionTaskExecutor;
use std::{marker::PhantomData, sync::Arc};

use futures::task::Spawn;
use futures::{
stream::{self, Stream, StreamExt},
future,
task::Spawn,
};
use jsonrpsee::ws_server::SubscriptionSink;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
Expand Down Expand Up @@ -68,27 +72,104 @@ where
}

fn subscribe_all_heads(&self, sink: SubscriptionSink) -> Result<(), Error> {
let client = self.client.clone();
let executor = self.executor.clone();

let fut = helpers::subscribe_headers(client, sink, "chain_subscribeAllHeads");
executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e)))
subscribe_headers(
&self.client,
self.executor.clone(),
"chain_subscribeAllHeads",
sink,
|| self.client().info().best_hash,
|| {
self.client()
.import_notification_stream()
.map(|notification| notification.header)
},
)
}

fn subscribe_new_heads(&self, sink: SubscriptionSink) -> Result<(), Error> {
let client = self.client.clone();
let executor = self.executor.clone();

let fut = helpers::subscribe_headers(client, sink, "chain_subscribeNewHeads");
executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e)))
subscribe_headers(
&self.client,
self.executor.clone(),
"chain_subscribeNewHeads",
sink,
|| self.client().info().best_hash,
|| {
self.client()
.import_notification_stream()
.filter(|notification| future::ready(notification.is_new_best))
.map(|notification| notification.header)
},
)
}

fn subscribe_finalized_heads(&self, sink: SubscriptionSink) -> Result<(), Error> {
let client = self.client.clone();
let executor = self.executor.clone();
subscribe_headers(
&self.client,
self.executor.clone(),
"chain_subscribeFinalizedHeads",
sink,
|| self.client().info().finalized_hash,
|| {
self.client()
.finality_notification_stream()
.map(|notification| notification.header)
},
)
}
}

/// Subscribe to new headers.
fn subscribe_headers<Block, Client, F, G, S>(
client: &Arc<Client>,
executor: SubscriptionTaskExecutor,
method: &'static str,
mut sink: SubscriptionSink,
best_block_hash: G,
stream: F,
) -> Result<(), Error>
where
Block: BlockT + 'static,
Block::Header: Unpin,
Client: HeaderBackend<Block> + 'static,
F: FnOnce() -> S,
G: FnOnce() -> Block::Hash,
S: Stream<Item = Block::Header> + Send + 'static,
{
// send current head right at the start.
let maybe_header = client
.header(BlockId::Hash(best_block_hash()))
.map_err(client_err)
.and_then(|header| {
header.ok_or_else(|| Error::Other("Best header missing.".to_string()))
})
.map_err(|e| {
log::warn!("Best header error {:?}", e);
e
})
.ok();

// send further subscriptions
let stream = stream();

// NOTE: by the time we set up the stream there might be a new best block and so there is a risk
// that the stream has a hole in it. The alternative would be to look up the best block *after*
// we set up the stream and chain it to the stream. Consuming code would need to handle
// duplicates at the beginning of the stream though.
let fut = async move {
stream::iter(maybe_header)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only send the result if maybe_header.is_some() it should be infallible to get the best_block AFAIU....

More similar to how substrate master behaves FWIW.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be infallible to get the best_block AFAIU....

Well, it's a db lookup right? So sure, if it fails the node has bigger problems, but it's not infallible.

Not sure if you suggest we should get rid of the comment about "holes" in the stream, but I don't think this new code changes that. If there's a new block between when we lookup the best block (i.e. resolve maybe_header) and the time we read from the stream, the block we sent as "best" might not be immediately precedent to the next one we send. I don't think this is a catastrophic flaw though, more like something to be aware of.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's a db lookup right? So sure, if it fails the node has bigger problems, but it's not infallible.

Yeah, you right I was under impression that it was cached.

My point was that before the error was sent over the channel now it's just logged and filtered out via stream:iter(None).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thus, the comment is still "accurate"

.chain(stream)
.take_while(|storage| {
future::ready(sink.send(&storage).map_or_else(
|e| {
log::debug!("Could not send data to subscription: {} error: {:?}", method, e);
false
},
|_| true,
))
})
.for_each(|_| future::ready(()))
.await;
};

let fut =
helpers::subscribe_finalized_headers(client, sink, "chain_subscribeFinalizedHeads");
executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e)))
}
}
95 changes: 0 additions & 95 deletions client/rpc/src/chain/helpers.rs

This file was deleted.

1 change: 0 additions & 1 deletion client/rpc/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
//! Substrate blockchain API.

mod chain_full;
mod helpers;

#[cfg(test)]
mod tests;
Expand Down