From f0e75f90bd1e8daf858455dd82c19bc4ce433b5f Mon Sep 17 00:00:00 2001 From: Parth Desai Date: Tue, 31 Oct 2023 13:09:01 +0400 Subject: [PATCH] add relayer tasks in the node builder + start xdm worker in node builder instead of domain --- Cargo.lock | 1 + node/Cargo.toml | 1 + node/src/domains/builder.rs | 20 ++-- node/src/domains/domain_instance_starter.rs | 28 +---- node/src/lib.rs | 109 +++++++++++++++----- 5 files changed, 99 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb80e7fb..dfadf434 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9842,6 +9842,7 @@ dependencies = [ "derivative", "derive_builder 0.12.0", "derive_more", + "domain-client-message-relayer", "domain-client-operator", "domain-eth-service", "domain-runtime-primitives", diff --git a/node/Cargo.toml b/node/Cargo.toml index 05d7795a..c59ae75b 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -11,6 +11,7 @@ cross-domain-message-gossip = { git = "https://github.com/subspace/subspace", re derivative = "2.2.0" derive_builder = "0.12" derive_more = "0.99" +domain-client-message-relayer = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" } domain-client-operator = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" } domain-eth-service = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" } domain-runtime-primitives = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" } diff --git a/node/src/domains/builder.rs b/node/src/domains/builder.rs index ba9fa40a..050adbf8 100644 --- a/node/src/domains/builder.rs +++ b/node/src/domains/builder.rs @@ -2,6 +2,7 @@ use std::path::Path; use std::sync::Arc; use anyhow::{anyhow, Result}; +use cross_domain_message_gossip::Message; use derivative::Derivative; use derive_builder::Builder; use domain_client_operator::{BootstrapResult, Bootstrapper}; @@ -12,14 +13,13 @@ use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::{BlockImportingNotification, NewSlotNotification}; use sc_transaction_pool::FullPool; use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use sc_utils::mpsc::tracing_unbounded; +use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; use sdk_substrate::{Base, BaseBuilder}; use sdk_utils::chain_spec::get_account_id_from_seed; use sdk_utils::{DestructorSet, TaskOutput}; use serde::{Deserialize, Serialize}; use sp_core::crypto::AccountId32; use sp_domains::DomainId; -use sp_runtime::traits::Block as BlockT; use subspace_runtime::RuntimeApi as CRuntimeApi; use subspace_runtime_primitives::opaque::Block as CBlock; use subspace_service::FullClient as CFullClient; @@ -39,14 +39,15 @@ pub struct ConsensusNodeLink { SubspaceNotificationStream>, /// New slot notification stream for consensus chain pub new_slot_notification_stream: SubspaceNotificationStream, - /// Reference to the consensus node network service - pub consensus_network_service: - Arc::Hash>>, /// Reference to the consensus node's network sync service pub consensus_sync_service: Arc>, /// Consensus tx pool pub consensus_transaction_pool: Arc>>, + /// Cross domain message gossip worker's message sink + pub gossip_message_sink: TracingUnboundedSender, + /// Cross domain message receiver for the domain + pub domain_message_receiver: TracingUnboundedReceiver>, } /// Domain node configuration @@ -164,9 +165,10 @@ impl DomainConfig { consensus_client, block_importing_notification_stream, new_slot_notification_stream, - consensus_network_service, consensus_sync_service, consensus_transaction_pool, + gossip_message_sink, + domain_message_receiver, } = consensus_node_link; let printable_domain_id: u32 = self.domain_id.into(); let mut destructor_set = @@ -336,9 +338,6 @@ impl DomainConfig { let service_config = self.base.configuration(domains_directory, domain_spec).await; - let (domain_message_sink, domain_message_receiver) = - tracing_unbounded("domain_message_channel", 100); - let domain_starter = DomainInstanceStarter { service_config, domain_id: self.domain_id, @@ -347,12 +346,11 @@ impl DomainConfig { consensus_client, block_importing_notification_stream, new_slot_notification_stream, - consensus_network_service, consensus_sync_service, consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory::new( consensus_transaction_pool.clone(), ), - domain_message_sink, + gossip_message_sink, domain_message_receiver, }; diff --git a/node/src/domains/domain_instance_starter.rs b/node/src/domains/domain_instance_starter.rs index 2f3474f0..5c070907 100644 --- a/node/src/domains/domain_instance_starter.rs +++ b/node/src/domains/domain_instance_starter.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use cross_domain_message_gossip::GossipWorkerBuilder; use domain_client_operator::OperatorStreams; use domain_eth_service::provider::EthProvider; use domain_eth_service::DefaultEthConfig; @@ -13,10 +12,8 @@ use sc_consensus_subspace::{BlockImportingNotification, NewSlotNotification}; use sc_service::{BasePath, Configuration, RpcHandlers}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_core::traits::SpawnEssentialNamed; use sp_domains::{DomainId, RuntimeType}; -use sp_messenger::messages::ChainId; -use sp_runtime::traits::{Block as BlockT, NumberFor}; +use sp_runtime::traits::NumberFor; use subspace_runtime::RuntimeApi as CRuntimeApi; use subspace_runtime_primitives::opaque::Block as CBlock; use subspace_service::FullClient as CFullClient; @@ -37,12 +34,10 @@ pub struct DomainInstanceStarter { pub block_importing_notification_stream: SubspaceNotificationStream>, pub new_slot_notification_stream: SubspaceNotificationStream, - pub consensus_network_service: - Arc::Hash>>, pub consensus_sync_service: Arc>, pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, - pub domain_message_sink: TracingUnboundedSender>, pub domain_message_receiver: TracingUnboundedReceiver>, + pub gossip_message_sink: TracingUnboundedSender, } impl DomainInstanceStarter { @@ -59,11 +54,10 @@ impl DomainInstanceStarter { consensus_client, block_importing_notification_stream, new_slot_notification_stream, - consensus_network_service, consensus_sync_service, consensus_offchain_tx_pool_factory, domain_message_receiver, - domain_message_sink, + gossip_message_sink, } = self; let block_importing_notification_stream = || { @@ -98,8 +92,6 @@ impl DomainInstanceStarter { match runtime_type { RuntimeType::Evm => { - let mut xdm_gossip_worker_builder = GossipWorkerBuilder::new(); - let eth_provider = EthProvider::< evm_domain_runtime::TransactionConverter, DefaultEthConfig< @@ -123,7 +115,7 @@ impl DomainInstanceStarter { consensus_offchain_tx_pool_factory, consensus_network_sync_oracle: consensus_sync_service.clone(), operator_streams, - gossip_message_sink: xdm_gossip_worker_builder.gossip_msg_sink(), + gossip_message_sink, domain_message_receiver, provider: eth_provider, }; @@ -143,18 +135,6 @@ impl DomainInstanceStarter { .await .map_err(anyhow::Error::new)?; - xdm_gossip_worker_builder - .push_chain_tx_pool_sink(ChainId::Domain(domain_id), domain_message_sink); - - let cross_domain_message_gossip_worker = xdm_gossip_worker_builder - .build::(consensus_network_service, consensus_sync_service); - - domain_node.task_manager.spawn_essential_handle().spawn_essential_blocking( - "cross-domain-gossip-message-worker", - None, - Box::pin(cross_domain_message_gossip_worker.run()), - ); - let domain_start_join_handle = sdk_utils::task_spawn( format!("domain-{}/start-domain", >::into(domain_id)), async move { diff --git a/node/src/lib.rs b/node/src/lib.rs index e6e4a8d0..81d9af87 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; +use cross_domain_message_gossip::GossipWorkerBuilder; use derivative::Derivative; use frame_system::pallet_prelude::BlockNumberFor; use futures::{FutureExt, Stream, StreamExt}; @@ -24,11 +25,14 @@ use sc_network::network_state::NetworkState; use sc_network::{NetworkService, NetworkStateInfo, SyncState}; use sc_rpc_api::state::StateApiClient; use sc_service::Configuration; +use sc_utils::mpsc::tracing_unbounded; use sdk_dsn::{DsnOptions, DsnShared}; use sdk_traits::Farmer; use sdk_utils::{DestructorSet, MultiaddrWithPeerId, PublicKey, TaskOutput}; use sp_consensus::SyncOracle; use sp_consensus_subspace::digests::PreDigest; +use sp_core::traits::SpawnEssentialNamed; +use sp_messenger::messages::ChainId; use sp_runtime::DigestItem; use subspace_core_primitives::{HistorySize, SegmentIndex}; use subspace_farmer::node_client::NodeClient; @@ -39,7 +43,7 @@ use subspace_networking::{ }; use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST; use subspace_runtime::RuntimeApi; -use subspace_runtime_primitives::opaque::{Block as RuntimeBlock, Header}; +use subspace_runtime_primitives::opaque::{Block as OpaqueBlock, Header}; use subspace_service::SubspaceConfiguration; use tokio::sync::oneshot; @@ -238,6 +242,83 @@ impl Config { let mut destructors = DestructorSet::new("node-destructors"); + let mut maybe_domain = None; + if let Some(domain_config) = self.domain { + let base_directory = directory.as_ref().to_owned().clone(); + + let mut xdm_gossip_worker_builder = GossipWorkerBuilder::new(); + + let relayer_worker = + domain_client_message_relayer::worker::relay_consensus_chain_messages( + client.clone(), + sync_service.clone(), + xdm_gossip_worker_builder.gossip_msg_sink(), + ); + + task_manager.spawn_essential_handle().spawn_essential_blocking( + "consensus-chain-relayer", + None, + Box::pin(relayer_worker), + ); + + let (consensus_msg_sink, consensus_msg_receiver) = + tracing_unbounded("consensus_message_channel", 100); + + // Start cross domain message listener for Consensus chain to receive messages + // from domains in the network + let consensus_listener = + cross_domain_message_gossip::start_cross_chain_message_listener( + ChainId::Consensus, + client.clone(), + transaction_pool.clone(), + consensus_msg_receiver, + ); + + task_manager.spawn_essential_handle().spawn_essential_blocking( + "consensus-message-listener", + None, + Box::pin(consensus_listener), + ); + + xdm_gossip_worker_builder + .push_chain_tx_pool_sink(ChainId::Consensus, consensus_msg_sink); + + let (domain_message_sink, domain_message_receiver) = + tracing_unbounded("domain_message_channel", 100); + + xdm_gossip_worker_builder.push_chain_tx_pool_sink( + ChainId::Domain(domain_config.domain_id), + domain_message_sink, + ); + + let domain = domain_config + .build( + base_directory, + ConsensusNodeLink { + consensus_client: client.clone(), + block_importing_notification_stream: block_importing_notification_stream + .clone(), + new_slot_notification_stream: new_slot_notification_stream.clone(), + consensus_sync_service: sync_service.clone(), + consensus_transaction_pool: transaction_pool.clone(), + gossip_message_sink: xdm_gossip_worker_builder.gossip_msg_sink(), + domain_message_receiver, + }, + ) + .await?; + + let cross_domain_message_gossip_worker = xdm_gossip_worker_builder + .build::(network_service.clone(), sync_service.clone()); + + task_manager.spawn_essential_handle().spawn_essential_blocking( + "cross-domain-gossip-message-worker", + None, + Box::pin(cross_domain_message_gossip_worker.run()), + ); + + maybe_domain = Some(domain); + } + let (task_manager_drop_sender, task_manager_drop_receiver) = oneshot::channel(); let (task_manager_result_sender, task_manager_result_receiver) = oneshot::channel(); let task_manager_join_handle = sdk_utils::task_spawn( @@ -266,28 +347,6 @@ impl Config { } })?; - let mut maybe_domain = None; - if let Some(domain_config) = self.domain { - let base_directory = directory.as_ref().to_owned().clone(); - - let domain = domain_config - .build( - base_directory, - ConsensusNodeLink { - consensus_client: client.clone(), - block_importing_notification_stream: block_importing_notification_stream - .clone(), - new_slot_notification_stream: new_slot_notification_stream.clone(), - consensus_network_service: network_service.clone(), - consensus_sync_service: sync_service.clone(), - consensus_transaction_pool: transaction_pool.clone(), - }, - ) - .await?; - - maybe_domain = Some(domain); - } - let rpc_handle = sdk_utils::Rpc::new(&rpc_handlers); network_starter.start_network(); @@ -359,9 +418,9 @@ pub struct Node { #[derivative(Debug = "ignore")] client: Arc, #[derivative(Debug = "ignore")] - sync_service: Arc>, + sync_service: Arc>, #[derivative(Debug = "ignore")] - network_service: Arc>, + network_service: Arc>, rpc_handle: sdk_utils::Rpc, name: String, dsn: DsnShared,