This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit

WIP
cecton committed Feb 17, 2021
1 parent 286407c commit 01900c0
Showing 5 changed files with 137 additions and 78 deletions.
19 changes: 12 additions & 7 deletions bin/node/cli/src/service.rs
@@ -34,7 +34,7 @@ use sp_runtime::traits::Block as BlockT;
use futures::prelude::*;
use sc_client_api::{ExecutorProvider, RemoteBackend};
use node_executor::Executor;
use sc_telemetry::Telemetry;
use sc_telemetry::ClientTelemetry;

type FullClient = sc_service::TFullClient<Block, RuntimeApi, Executor>;
type FullBackend = sc_service::TFullBackend<Block>;
@@ -74,6 +74,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
client.clone(),
);

let telemetry = client.telemetry();
let (grandpa_block_import, grandpa_link) = grandpa::block_import(
client.clone(),
&(client.clone() as Arc<_>),
@@ -222,6 +223,7 @@ pub fn new_full_base(
);
}

let telemetry = client.telemetry();
let role = config.role.clone();
let force_authoring = config.force_authoring;
let backoff_authoring_blocks =
@@ -230,7 +232,7 @@
let enable_grandpa = !config.disable_grandpa;
let prometheus_registry = config.prometheus_registry().cloned();

let (_rpc_handlers, telemetry) = sc_service::spawn_tasks(
let _rpc_handlers = sc_service::spawn_tasks(
sc_service::SpawnTasksParams {
config,
backend: backend.clone(),
@@ -367,7 +369,9 @@ pub fn new_full(config: Configuration)
}

pub fn new_light_base(mut config: Configuration) -> Result<(
TaskManager, RpcHandlers, Option<Telemetry>, Arc<LightClient>,
TaskManager,
RpcHandlers,
Arc<LightClient>,
Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
Arc<sc_transaction_pool::LightPool<Block, LightClient, sc_network::config::OnDemand<Block>>>
), ServiceError> {
@@ -386,11 +390,13 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
on_demand.clone(),
));

let telemetry = client.telemetry();

let (grandpa_block_import, _) = grandpa::block_import(
client.clone(),
&(client.clone() as Arc<_>),
select_chain.clone(),
telemetry.clone(), // TODO hmm how do I get the telemetry here...
telemetry.clone(),
)?;
let justification_import = grandpa_block_import.clone();

@@ -442,7 +448,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<(

let rpc_extensions = node_rpc::create_light(light_deps);

let (rpc_handlers, telemetry) =
let rpc_handlers =
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
on_demand: Some(on_demand),
remote_blockchain: Some(backend.remote_blockchain()),
@@ -458,7 +464,6 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
Ok((
task_manager,
rpc_handlers,
telemetry,
client,
network,
transaction_pool,
@@ -467,7 +472,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<(

/// Builds a new service for a light client.
pub fn new_light(config: Configuration) -> Result<TaskManager, ServiceError> {
new_light_base(config).map(|(task_manager, _, _, _, _, _)| {
new_light_base(config).map(|(task_manager, _, _, _, _)| {
task_manager
})
}
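Note: the change in bin/node/cli/src/service.rs drops the Telemetry handle from the return values of sc_service::spawn_tasks and new_light_base; the service now obtains it from the client through the new ClientTelemetry trait (client.telemetry()) and clones it into components such as grandpa::block_import. Below is a minimal, self-contained sketch of that pattern — all types are simplified stand-ins, not the real sc_service/sc_telemetry types.

// Simplified stand-ins; only the wiring pattern matters here.
#[derive(Clone)]
struct Telemetry {
    label: &'static str,
}

// Stand-in for the `ClientTelemetry` trait this commit introduces.
trait ClientTelemetry {
    fn telemetry(&self) -> Option<Telemetry>;
}

struct Client {
    telemetry: Option<Telemetry>,
}

impl ClientTelemetry for Client {
    fn telemetry(&self) -> Option<Telemetry> {
        self.telemetry.clone()
    }
}

// A consensus component that wants its own copy of the handle, analogous to
// passing `telemetry.clone()` into `grandpa::block_import` in the diff above.
fn block_import(telemetry: Option<Telemetry>) {
    match telemetry {
        Some(t) => println!("block import wired to telemetry '{}'", t.label),
        None => println!("block import running without telemetry"),
    }
}

fn main() {
    let client = Client { telemetry: Some(Telemetry { label: "node" }) };

    // Instead of receiving telemetry back from `spawn_tasks`, callers pull it
    // from the client whenever they need it.
    let telemetry = client.telemetry();
    block_import(telemetry.clone());
    block_import(client.telemetry());
}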
38 changes: 25 additions & 13 deletions client/service/src/builder.rs
@@ -343,6 +343,11 @@ pub fn new_full_parts<TBl, TRtApi, TExecDisp>(
Some(keystore_container.sync_keystore()),
);

let telemetry = match (config.telemetry_handle.clone(), config.telemetry_endpoints.clone()) {
(Some(mut handle), Some(endpoints)) => Some(handle.new_telemetry(endpoints)),
_ => None,
};

new_client(
db_config,
executor,
@@ -352,6 +357,7 @@
extensions,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
telemetry,
ClientConfig {
offchain_worker_enabled : config.offchain_worker.enabled,
offchain_indexing_api: config.offchain_worker.indexing_enabled,
@@ -409,12 +415,17 @@ pub fn new_light_parts<TBl, TRtApi, TExecDisp>(
);
let on_demand = Arc::new(sc_network::config::OnDemand::new(fetch_checker));
let backend = sc_light::new_light_backend(light_blockchain);
let telemetry = match (config.telemetry_handle.clone(), config.telemetry_endpoints.clone()) {
(Some(mut handle), Some(endpoints)) => Some(handle.new_telemetry(endpoints)),
_ => None,
};
let client = Arc::new(light::new_light(
backend.clone(),
config.chain_spec.as_storage_builder(),
executor,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
telemetry,
)?);

Ok((client, backend, keystore_container, task_manager, on_demand))
@@ -430,6 +441,7 @@ pub fn new_client<E, Block, RA>(
execution_extensions: ExecutionExtensions<Block>,
spawn_handle: Box<dyn SpawnNamed>,
prometheus_registry: Option<Registry>,
telemetry: Option<Telemetry>,
config: ClientConfig,
) -> Result<(
crate::client::Client<
@@ -459,6 +471,7 @@
bad_blocks,
execution_extensions,
prometheus_registry,
telemetry,
config,
)?,
backend,
@@ -538,7 +551,7 @@ pub fn build_offchain_workers<TBl, TBackend, TCl>(
/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<(RpcHandlers, Option<Telemetry>), Error>
) -> Result<RpcHandlers, Error>
where
TCl: ProvideRuntimeApi<TBl> + HeaderMetadata<TBl, Error=sp_blockchain::Error> + Chain<TBl> +
BlockBackend<TBl> + BlockIdTo<TBl, Error=sp_blockchain::Error> + ProofProvider<TBl> +
@@ -581,14 +594,15 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
)?;

let telemetry = init_telemetry(
&mut config,
network.clone(),
client.clone(),
);
let telemetry = client.telemetry();

if let Some(telemetry) = telemetry.clone() {
client.set_telemetry(telemetry);
init_telemetry(
&mut config,
network.clone(),
client.clone(),
telemetry.clone(),
);
}

info!("📦 Highest known block at #{}", chain_info.best_number);
@@ -659,7 +673,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(

task_manager.keep_alive((config.base_path, rpc, rpc_handlers.clone()));

Ok((rpc_handlers, telemetry))
Ok(rpc_handlers)
}

async fn transaction_notifications<TBl, TExPool>(
@@ -691,8 +705,8 @@ fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
config: &mut Configuration,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
client: Arc<TCl>,
) -> Option<Telemetry> {
let endpoints = config.telemetry_endpoints.clone()?;
mut telemetry: Telemetry,
) {
let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
let connection_message = ConnectionMessage {
name: config.network.node_name.to_owned(),
@@ -708,9 +722,7 @@
network_id: network.local_peer_id().to_base58(),
};

config.telemetry_handle
.as_mut()
.map(|handle| handle.start_telemetry(endpoints, connection_message))
telemetry.start_telemetry(connection_message)
}

fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
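Note: in client/service/src/builder.rs the telemetry instance is now created up front — only when both config.telemetry_handle and config.telemetry_endpoints are set — and passed into the client constructors, while init_telemetry no longer builds it and merely starts an already-created instance with the ConnectionMessage. A rough, self-contained sketch of that construction step, using stand-in types and an illustrative endpoint URL:

// Stand-in types; the real ones live in sc_service / sc_telemetry.
#[derive(Clone, Debug)]
struct TelemetryEndpoints(Vec<String>);

#[derive(Clone, Debug)]
struct Telemetry {
    endpoints: TelemetryEndpoints,
}

struct TelemetryHandle;

impl TelemetryHandle {
    // Mirrors the `handle.new_telemetry(endpoints)` call in the diff above.
    fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
        Telemetry { endpoints }
    }
}

struct Configuration {
    telemetry_handle: Option<TelemetryHandle>,
    telemetry_endpoints: Option<TelemetryEndpoints>,
}

// Builds telemetry only when both the handle and the endpoints are configured,
// mirroring the `match (handle, endpoints)` block added to `new_full_parts`
// and `new_light_parts`.
fn build_telemetry(config: &mut Configuration) -> Option<Telemetry> {
    match (config.telemetry_handle.as_mut(), config.telemetry_endpoints.clone()) {
        (Some(handle), Some(endpoints)) => Some(handle.new_telemetry(endpoints)),
        _ => None,
    }
}

fn main() {
    let mut config = Configuration {
        telemetry_handle: Some(TelemetryHandle),
        // Illustrative endpoint, not a real telemetry server.
        telemetry_endpoints: Some(TelemetryEndpoints(vec![
            "wss://telemetry.example/submit".to_owned(),
        ])),
    };

    // The result is threaded into the client constructor, so later code can
    // recover it through `client.telemetry()` instead of a second channel.
    let telemetry = build_telemetry(&mut config);
    println!("telemetry configured: {:?}", telemetry);
}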
23 changes: 15 additions & 8 deletions client/service/src/client/client.rs
@@ -115,7 +115,7 @@ pub struct Client<B, E, Block, RA> where Block: BlockT {
block_rules: BlockRules<Block>,
execution_extensions: ExecutionExtensions<Block>,
config: ClientConfig,
telemetry: RwLock<Option<Telemetry>>,
telemetry: Option<Telemetry>,
_phantom: PhantomData<RA>,
}

@@ -153,6 +153,7 @@ pub fn new_in_mem<E, Block, S, RA>(
genesis_storage: &S,
keystore: Option<SyncCryptoStorePtr>,
prometheus_registry: Option<Registry>,
telemetry: Option<Telemetry>,
spawn_handle: Box<dyn SpawnNamed>,
config: ClientConfig,
) -> sp_blockchain::Result<Client<
@@ -172,6 +173,7 @@
keystore,
spawn_handle,
prometheus_registry,
telemetry,
config,
)
}
@@ -197,6 +199,7 @@ pub fn new_with_backend<B, E, Block, S, RA>(
keystore: Option<SyncCryptoStorePtr>,
spawn_handle: Box<dyn SpawnNamed>,
prometheus_registry: Option<Registry>,
telemetry: Option<Telemetry>,
config: ClientConfig,
) -> sp_blockchain::Result<Client<B, LocalCallExecutor<B, E>, Block, RA>>
where
@@ -215,6 +218,7 @@
Default::default(),
extensions,
prometheus_registry,
telemetry,
config,
)
}
@@ -295,6 +299,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
bad_blocks: BadBlocks<Block>,
execution_extensions: ExecutionExtensions<Block>,
prometheus_registry: Option<Registry>,
telemetry: Option<Telemetry>,
config: ClientConfig,
) -> sp_blockchain::Result<Self> {
if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
@@ -327,7 +332,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
block_rules: BlockRules::new(fork_blocks, bad_blocks),
execution_extensions,
config,
telemetry: RwLock::new(None),
telemetry,
_phantom: Default::default(),
})
}
@@ -671,7 +676,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
rand::thread_rng().gen_bool(0.1)
{
telemetry!(
self.telemetry.read().clone(); SUBSTRATE_INFO; "block.import";
self.telemetry.clone(); SUBSTRATE_INFO; "block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
@@ -988,11 +993,11 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let header = self.header(&BlockId::Hash(*last))?
.expect(
"Header already known to exist in DB because it is \
indicated in the tree route; qed"
indicated in the tree route; qed"
);

telemetry!(
self.telemetry.read().clone(); SUBSTRATE_INFO; "notify.finalized";
self.telemetry.clone(); SUBSTRATE_INFO; "notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);
@@ -1002,7 +1007,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let header = self.header(&BlockId::Hash(finalized_hash))?
.expect(
"Header already known to exist in DB because it is \
indicated in the tree route; qed"
indicated in the tree route; qed"
);

let notification = FinalityNotification {
@@ -2002,9 +2007,11 @@ impl<BE, E, B, RA> sp_consensus::block_validation::Chain<B> for Client<BE, E, B,

impl<BE, E, B, RA> ClientTelemetry for Client<BE, E, B, RA>
where
BE: backend::Backend<B>,
E: CallExecutor<B>,
B: BlockT,
{
fn set_telemetry(&self, telemetry: Telemetry) {
*self.telemetry.write() = Some(telemetry);
fn telemetry(&self) -> Option<Telemetry> {
self.telemetry.clone()
}
}
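Note: within client/service/src/client/client.rs the handle moves from a RwLock<Option<Telemetry>> filled in later via set_telemetry to a plain Option<Telemetry> supplied at construction, so the telemetry! call sites clone the field instead of taking a read lock. A small self-contained model of that change, with stand-in types:

use std::sync::Arc;

// Stand-in for sc_telemetry::Telemetry.
#[derive(Clone)]
struct Telemetry {
    name: String,
}

struct Client {
    // Previously `RwLock<Option<Telemetry>>` plus a `set_telemetry` setter;
    // now the value is given to the constructor and never mutated afterwards.
    telemetry: Option<Telemetry>,
}

impl Client {
    fn new(telemetry: Option<Telemetry>) -> Self {
        Self { telemetry }
    }

    // Stand-in for a call site such as the finality notification, where
    // `self.telemetry.read().clone()` becomes `self.telemetry.clone()`.
    fn notify_finalized(&self, height: u64) {
        if let Some(t) = self.telemetry.clone() {
            println!("[{}] notify.finalized height={}", t.name, height);
        }
    }
}

fn main() {
    let client = Arc::new(Client::new(Some(Telemetry { name: "node".into() })));
    client.notify_finalized(42);
}

One consequence of dropping the lock is that the handle has to exist before the client is built, which is why the builder changes above create it from the configuration first.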
3 changes: 3 additions & 0 deletions client/service/src/client/light.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;

use sc_executor::RuntimeInfo;
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sc_telemetry::Telemetry;
use sp_runtime::BuildStorage;
use sp_runtime::traits::{Block as BlockT, HashFor};
use sp_blockchain::Result as ClientResult;
@@ -38,6 +39,7 @@ pub fn new_light<B, S, RA, E>(
code_executor: E,
spawn_handle: Box<dyn SpawnNamed>,
prometheus_registry: Option<Registry>,
telemetry: Option<Telemetry>,
) -> ClientResult<
Client<
Backend<S, HashFor<B>>,
@@ -69,6 +71,7 @@ pub fn new_light<B, S, RA, E>(
Default::default(),
Default::default(),
prometheus_registry,
telemetry,
ClientConfig::default(),
)
}
