Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Commit

Permalink
add relayer tasks in the node builder + start xdm worker in node buil…
Browse files Browse the repository at this point in the history
…der instead of domain
  • Loading branch information
ParthDesai committed Oct 31, 2023
1 parent 2d1dbb3 commit f0e75f9
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ cross-domain-message-gossip = { git = "https://github.com/subspace/subspace", re
derivative = "2.2.0"
derive_builder = "0.12"
derive_more = "0.99"
domain-client-message-relayer = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" }
domain-client-operator = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" }
domain-eth-service = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" }
domain-runtime-primitives = { git = "https://github.com/subspace/subspace", rev = "aa2ca80b42b460dd73c1567057d6c07151a626dd" }
Expand Down
20 changes: 9 additions & 11 deletions node/src/domains/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use cross_domain_message_gossip::Message;
use derivative::Derivative;
use derive_builder::Builder;
use domain_client_operator::{BootstrapResult, Bootstrapper};
Expand All @@ -12,14 +13,13 @@ use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::{BlockImportingNotification, NewSlotNotification};
use sc_transaction_pool::FullPool;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::tracing_unbounded;
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sdk_substrate::{Base, BaseBuilder};
use sdk_utils::chain_spec::get_account_id_from_seed;
use sdk_utils::{DestructorSet, TaskOutput};
use serde::{Deserialize, Serialize};
use sp_core::crypto::AccountId32;
use sp_domains::DomainId;
use sp_runtime::traits::Block as BlockT;
use subspace_runtime::RuntimeApi as CRuntimeApi;
use subspace_runtime_primitives::opaque::Block as CBlock;
use subspace_service::FullClient as CFullClient;
Expand All @@ -39,14 +39,15 @@ pub struct ConsensusNodeLink {
SubspaceNotificationStream<BlockImportingNotification<CBlock>>,
/// New slot notification stream for consensus chain
pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
/// Reference to the consensus node network service
pub consensus_network_service:
Arc<sc_network::NetworkService<CBlock, <CBlock as BlockT>::Hash>>,
/// Reference to the consensus node's network sync service
pub consensus_sync_service: Arc<sc_network_sync::SyncingService<CBlock>>,
/// Consensus tx pool
pub consensus_transaction_pool:
Arc<FullPool<CBlock, CFullClient<CRuntimeApi, CExecutorDispatch>>>,
/// Cross domain message gossip worker's message sink
pub gossip_message_sink: TracingUnboundedSender<Message>,
/// Cross domain message receiver for the domain
pub domain_message_receiver: TracingUnboundedReceiver<Vec<u8>>,
}

/// Domain node configuration
Expand Down Expand Up @@ -164,9 +165,10 @@ impl DomainConfig {
consensus_client,
block_importing_notification_stream,
new_slot_notification_stream,
consensus_network_service,
consensus_sync_service,
consensus_transaction_pool,
gossip_message_sink,
domain_message_receiver,
} = consensus_node_link;
let printable_domain_id: u32 = self.domain_id.into();
let mut destructor_set =
Expand Down Expand Up @@ -336,9 +338,6 @@ impl DomainConfig {
let service_config =
self.base.configuration(domains_directory, domain_spec).await;

let (domain_message_sink, domain_message_receiver) =
tracing_unbounded("domain_message_channel", 100);

let domain_starter = DomainInstanceStarter {
service_config,
domain_id: self.domain_id,
Expand All @@ -347,12 +346,11 @@ impl DomainConfig {
consensus_client,
block_importing_notification_stream,
new_slot_notification_stream,
consensus_network_service,
consensus_sync_service,
consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(
consensus_transaction_pool.clone(),
),
domain_message_sink,
gossip_message_sink,
domain_message_receiver,
};

Expand Down
28 changes: 4 additions & 24 deletions node/src/domains/domain_instance_starter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use cross_domain_message_gossip::GossipWorkerBuilder;
use domain_client_operator::OperatorStreams;
use domain_eth_service::provider::EthProvider;
use domain_eth_service::DefaultEthConfig;
Expand All @@ -13,10 +12,8 @@ use sc_consensus_subspace::{BlockImportingNotification, NewSlotNotification};
use sc_service::{BasePath, Configuration, RpcHandlers};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::traits::SpawnEssentialNamed;
use sp_domains::{DomainId, RuntimeType};
use sp_messenger::messages::ChainId;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use sp_runtime::traits::NumberFor;
use subspace_runtime::RuntimeApi as CRuntimeApi;
use subspace_runtime_primitives::opaque::Block as CBlock;
use subspace_service::FullClient as CFullClient;
Expand All @@ -37,12 +34,10 @@ pub struct DomainInstanceStarter {
pub block_importing_notification_stream:
SubspaceNotificationStream<BlockImportingNotification<CBlock>>,
pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
pub consensus_network_service:
Arc<sc_network::NetworkService<CBlock, <CBlock as BlockT>::Hash>>,
pub consensus_sync_service: Arc<sc_network_sync::SyncingService<CBlock>>,
pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
pub domain_message_sink: TracingUnboundedSender<Vec<u8>>,
pub domain_message_receiver: TracingUnboundedReceiver<Vec<u8>>,
pub gossip_message_sink: TracingUnboundedSender<cross_domain_message_gossip::Message>,
}

impl DomainInstanceStarter {
Expand All @@ -59,11 +54,10 @@ impl DomainInstanceStarter {
consensus_client,
block_importing_notification_stream,
new_slot_notification_stream,
consensus_network_service,
consensus_sync_service,
consensus_offchain_tx_pool_factory,
domain_message_receiver,
domain_message_sink,
gossip_message_sink,
} = self;

let block_importing_notification_stream = || {
Expand Down Expand Up @@ -98,8 +92,6 @@ impl DomainInstanceStarter {

match runtime_type {
RuntimeType::Evm => {
let mut xdm_gossip_worker_builder = GossipWorkerBuilder::new();

let eth_provider = EthProvider::<
evm_domain_runtime::TransactionConverter,
DefaultEthConfig<
Expand All @@ -123,7 +115,7 @@ impl DomainInstanceStarter {
consensus_offchain_tx_pool_factory,
consensus_network_sync_oracle: consensus_sync_service.clone(),
operator_streams,
gossip_message_sink: xdm_gossip_worker_builder.gossip_msg_sink(),
gossip_message_sink,
domain_message_receiver,
provider: eth_provider,
};
Expand All @@ -143,18 +135,6 @@ impl DomainInstanceStarter {
.await
.map_err(anyhow::Error::new)?;

xdm_gossip_worker_builder
.push_chain_tx_pool_sink(ChainId::Domain(domain_id), domain_message_sink);

let cross_domain_message_gossip_worker = xdm_gossip_worker_builder
.build::<CBlock, _, _>(consensus_network_service, consensus_sync_service);

domain_node.task_manager.spawn_essential_handle().spawn_essential_blocking(
"cross-domain-gossip-message-worker",
None,
Box::pin(cross_domain_message_gossip_worker.run()),
);

let domain_start_join_handle = sdk_utils::task_spawn(
format!("domain-{}/start-domain", <DomainId as Into<u32>>::into(domain_id)),
async move {
Expand Down
109 changes: 84 additions & 25 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use cross_domain_message_gossip::GossipWorkerBuilder;
use derivative::Derivative;
use frame_system::pallet_prelude::BlockNumberFor;
use futures::{FutureExt, Stream, StreamExt};
Expand All @@ -24,11 +25,14 @@ use sc_network::network_state::NetworkState;
use sc_network::{NetworkService, NetworkStateInfo, SyncState};
use sc_rpc_api::state::StateApiClient;
use sc_service::Configuration;
use sc_utils::mpsc::tracing_unbounded;
use sdk_dsn::{DsnOptions, DsnShared};
use sdk_traits::Farmer;
use sdk_utils::{DestructorSet, MultiaddrWithPeerId, PublicKey, TaskOutput};
use sp_consensus::SyncOracle;
use sp_consensus_subspace::digests::PreDigest;
use sp_core::traits::SpawnEssentialNamed;
use sp_messenger::messages::ChainId;
use sp_runtime::DigestItem;
use subspace_core_primitives::{HistorySize, SegmentIndex};
use subspace_farmer::node_client::NodeClient;
Expand All @@ -39,7 +43,7 @@ use subspace_networking::{
};
use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST;
use subspace_runtime::RuntimeApi;
use subspace_runtime_primitives::opaque::{Block as RuntimeBlock, Header};
use subspace_runtime_primitives::opaque::{Block as OpaqueBlock, Header};
use subspace_service::SubspaceConfiguration;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -238,6 +242,83 @@ impl<F: Farmer + 'static> Config<F> {

let mut destructors = DestructorSet::new("node-destructors");

let mut maybe_domain = None;
if let Some(domain_config) = self.domain {
let base_directory = directory.as_ref().to_owned().clone();

let mut xdm_gossip_worker_builder = GossipWorkerBuilder::new();

let relayer_worker =
domain_client_message_relayer::worker::relay_consensus_chain_messages(
client.clone(),
sync_service.clone(),
xdm_gossip_worker_builder.gossip_msg_sink(),
);

task_manager.spawn_essential_handle().spawn_essential_blocking(
"consensus-chain-relayer",
None,
Box::pin(relayer_worker),
);

let (consensus_msg_sink, consensus_msg_receiver) =
tracing_unbounded("consensus_message_channel", 100);

// Start cross domain message listener for Consensus chain to receive messages
// from domains in the network
let consensus_listener =
cross_domain_message_gossip::start_cross_chain_message_listener(
ChainId::Consensus,
client.clone(),
transaction_pool.clone(),
consensus_msg_receiver,
);

task_manager.spawn_essential_handle().spawn_essential_blocking(
"consensus-message-listener",
None,
Box::pin(consensus_listener),
);

xdm_gossip_worker_builder
.push_chain_tx_pool_sink(ChainId::Consensus, consensus_msg_sink);

let (domain_message_sink, domain_message_receiver) =
tracing_unbounded("domain_message_channel", 100);

xdm_gossip_worker_builder.push_chain_tx_pool_sink(
ChainId::Domain(domain_config.domain_id),
domain_message_sink,
);

let domain = domain_config
.build(
base_directory,
ConsensusNodeLink {
consensus_client: client.clone(),
block_importing_notification_stream: block_importing_notification_stream
.clone(),
new_slot_notification_stream: new_slot_notification_stream.clone(),
consensus_sync_service: sync_service.clone(),
consensus_transaction_pool: transaction_pool.clone(),
gossip_message_sink: xdm_gossip_worker_builder.gossip_msg_sink(),
domain_message_receiver,
},
)
.await?;

let cross_domain_message_gossip_worker = xdm_gossip_worker_builder
.build::<OpaqueBlock, _, _>(network_service.clone(), sync_service.clone());

task_manager.spawn_essential_handle().spawn_essential_blocking(
"cross-domain-gossip-message-worker",
None,
Box::pin(cross_domain_message_gossip_worker.run()),
);

maybe_domain = Some(domain);
}

let (task_manager_drop_sender, task_manager_drop_receiver) = oneshot::channel();
let (task_manager_result_sender, task_manager_result_receiver) = oneshot::channel();
let task_manager_join_handle = sdk_utils::task_spawn(
Expand Down Expand Up @@ -266,28 +347,6 @@ impl<F: Farmer + 'static> Config<F> {
}
})?;

let mut maybe_domain = None;
if let Some(domain_config) = self.domain {
let base_directory = directory.as_ref().to_owned().clone();

let domain = domain_config
.build(
base_directory,
ConsensusNodeLink {
consensus_client: client.clone(),
block_importing_notification_stream: block_importing_notification_stream
.clone(),
new_slot_notification_stream: new_slot_notification_stream.clone(),
consensus_network_service: network_service.clone(),
consensus_sync_service: sync_service.clone(),
consensus_transaction_pool: transaction_pool.clone(),
},
)
.await?;

maybe_domain = Some(domain);
}

let rpc_handle = sdk_utils::Rpc::new(&rpc_handlers);
network_starter.start_network();

Expand Down Expand Up @@ -359,9 +418,9 @@ pub struct Node<F: Farmer> {
#[derivative(Debug = "ignore")]
client: Arc<FullClient>,
#[derivative(Debug = "ignore")]
sync_service: Arc<sc_network_sync::service::chain_sync::SyncingService<RuntimeBlock>>,
sync_service: Arc<sc_network_sync::service::chain_sync::SyncingService<OpaqueBlock>>,
#[derivative(Debug = "ignore")]
network_service: Arc<NetworkService<RuntimeBlock, Hash>>,
network_service: Arc<NetworkService<OpaqueBlock, Hash>>,
rpc_handle: sdk_utils::Rpc,
name: String,
dsn: DsnShared,
Expand Down

0 comments on commit f0e75f9

Please sign in to comment.