From c450f479c7855c80233e37f537dca51bec79f391 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sun, 6 Mar 2022 23:04:37 +0200 Subject: [PATCH] Follow node's best block number to plot in background while node is syncing and handle RPC connection hangs (#275) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Follow node's best block number to plot in background while node is syncing and handle RPC connection hangs Co-authored-by: Özgün Özerk --- crates/sc-consensus-subspace-rpc/src/lib.rs | 13 ++ .../src/bin/subspace-farmer/commands/farm.rs | 12 +- .../src/bin/subspace-farmer/main.rs | 4 + crates/subspace-farmer/src/lib.rs | 2 +- crates/subspace-farmer/src/mock_rpc.rs | 6 + crates/subspace-farmer/src/plotting.rs | 138 ++++++++++++++---- crates/subspace-farmer/src/plotting/tests.rs | 37 +++-- crates/subspace-farmer/src/rpc.rs | 4 + crates/subspace-farmer/src/ws_rpc.rs | 8 + 9 files changed, 179 insertions(+), 45 deletions(-) diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 4ca651f8976d9..dd6bc7a449bec 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -61,6 +61,10 @@ pub trait SubspaceRpcApi { #[rpc(name = "subspace_getFarmerMetadata")] fn get_farmer_metadata(&self) -> FutureResult; + /// Get best block number + #[rpc(name = "subspace_getBestBlockNumber")] + fn get_best_block_number(&self) -> FutureResult; + /// Get encoded block by given block number #[rpc(name = "subspace_getBlockByNumber")] fn get_block_by_number( @@ -249,6 +253,15 @@ where }) } + fn get_best_block_number(&self) -> FutureResult { + let best_number = TryInto::::try_into(self.client.info().best_number) + .unwrap_or_else(|_| { + panic!("Block number can't be converted into BlockNumber"); + }); + + Box::pin(async move { Ok(best_number) }) + } + fn get_block_by_number( &self, block_number: BlockNumber, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 13fbcb5f6a5cb..aaf379656a073 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -5,10 +5,11 @@ use std::mem; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use subspace_core_primitives::PublicKey; use subspace_farmer::ws_rpc_server::{RpcServer, RpcServerImpl}; use subspace_farmer::{ - Commitments, Farming, Identity, ObjectMappings, Plot, Plotting, RpcClient, WsRpc, + Commitments, FarmerData, Farming, Identity, ObjectMappings, Plot, Plotting, RpcClient, WsRpc, }; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::libp2p::Multiaddr; @@ -25,6 +26,7 @@ pub(crate) async fn farm( node_rpc_url: &str, ws_server_listen_addr: SocketAddr, reward_address: Option, + best_block_number_check_interval: Duration, ) -> Result<(), anyhow::Error> { // TODO: This doesn't account for the fact that node can // have a completely different history to what farmer expects @@ -140,14 +142,14 @@ pub(crate) async fn farm( reward_address, ); + let farmer_data = FarmerData::new(plot, commitments, object_mappings, farmer_metadata); + // start the background plotting let plotting_instance = Plotting::start( - plot, - commitments, - object_mappings, + farmer_data, client, - farmer_metadata, subspace_codec, + best_block_number_check_interval, ); tokio::select! { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index fdb8d50b66e33..2c0fda94df376 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -8,9 +8,12 @@ use log::info; use sp_core::crypto::PublicError; use std::net::SocketAddr; use std::path::PathBuf; +use std::time::Duration; use subspace_core_primitives::PublicKey; use subspace_networking::libp2p::Multiaddr; +const BEST_BLOCK_NUMBER_CHECK_INTERVAL: Duration = Duration::from_secs(5); + #[derive(Debug, Parser)] enum IdentityCommand { /// View identity information @@ -119,6 +122,7 @@ async fn main() -> Result<()> { &node_rpc_url, ws_server_listen_addr, reward_address, + BEST_BLOCK_NUMBER_CHECK_INTERVAL, ) .await?; } diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 32c01d4ac63d7..99dd79f706cce 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -38,6 +38,6 @@ pub use identity::Identity; pub use jsonrpsee; pub use object_mappings::{ObjectMappingError, ObjectMappings}; pub use plot::{Plot, PlotError}; -pub use plotting::Plotting; +pub use plotting::{FarmerData, Plotting}; pub use rpc::RpcClient; pub use ws_rpc::WsRpc; diff --git a/crates/subspace-farmer/src/mock_rpc.rs b/crates/subspace-farmer/src/mock_rpc.rs index 7695b961209ae..5f71718adb93b 100644 --- a/crates/subspace-farmer/src/mock_rpc.rs +++ b/crates/subspace-farmer/src/mock_rpc.rs @@ -1,6 +1,7 @@ use crate::rpc::{Error as MockError, NewHead, RpcClient}; use async_trait::async_trait; use std::sync::Arc; +use subspace_core_primitives::BlockNumber; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse, @@ -123,6 +124,11 @@ impl RpcClient for MockRpc { Ok(self.inner.metadata_receiver.lock().await.try_recv()?) } + async fn best_block_number(&self) -> Result { + // Doesn't matter for tests (at least yet) + Ok(0) + } + async fn block_by_number( &self, _block_number: u32, diff --git a/crates/subspace-farmer/src/plotting.rs b/crates/subspace-farmer/src/plotting.rs index c62b74dc6724d..83dbe835b6505 100644 --- a/crates/subspace-farmer/src/plotting.rs +++ b/crates/subspace-farmer/src/plotting.rs @@ -5,9 +5,12 @@ use crate::commitments::Commitments; use crate::object_mappings::ObjectMappings; use crate::plot::Plot; use crate::rpc::RpcClient; -use log::{debug, error, info}; +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; +use log::{debug, error, info, warn}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; use subspace_archiving::archiver::{ArchivedSegment, Archiver}; use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping}; use subspace_core_primitives::Sha256Hash; @@ -17,6 +20,8 @@ use thiserror::Error; use tokio::sync::oneshot::Receiver; use tokio::{sync::oneshot, task::JoinHandle}; +const BEST_BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + #[derive(Debug, Error)] pub enum PlottingError { #[error("Plot is empty on restart, can't continue")] @@ -41,16 +46,37 @@ pub struct Plotting { handle: Option>>, } +pub struct FarmerData { + plot: Plot, + commitments: Commitments, + object_mappings: ObjectMappings, + metadata: FarmerMetadata, +} + +impl FarmerData { + pub fn new( + plot: Plot, + commitments: Commitments, + object_mappings: ObjectMappings, + metadata: FarmerMetadata, + ) -> Self { + Self { + plot, + commitments, + object_mappings, + metadata, + } + } +} + /// Assumes `plot`, `commitment`, `object_mappings`, `client` and `identity` are already initialized impl Plotting { /// Returns an instance of plotting, and also starts a concurrent background plotting task pub fn start( - plot: Plot, - commitments: Commitments, - object_mappings: ObjectMappings, + farmer_data: FarmerData, client: T, - farmer_metadata: FarmerMetadata, subspace_codec: SubspaceCodec, + best_block_number_check_interval: Duration, ) -> Self { // Oneshot channels, that will be used for interrupt/stop the process let (stop_sender, stop_receiver) = oneshot::channel(); @@ -58,12 +84,10 @@ impl Plotting { // Get a handle for the background task, so that we can wait on it later if we want to let plotting_handle = tokio::spawn(async move { background_plotting( + farmer_data, client, - plot, - commitments, - object_mappings, - farmer_metadata, subspace_codec, + best_block_number_check_interval, stop_receiver, ) .await @@ -95,26 +119,24 @@ impl Drop for Plotting { // don't want eventually /// Maintains plot in up to date state plotting new pieces as they are produced on the network. async fn background_plotting( + farmer_data: FarmerData, client: T, - plot: Plot, - commitments: Commitments, - object_mappings: ObjectMappings, - farmer_metadata: FarmerMetadata, mut subspace_codec: SubspaceCodec, + best_block_number_check_interval: Duration, mut stop_receiver: Receiver<()>, ) -> Result<(), PlottingError> { - let weak_plot = plot.downgrade(); + let weak_plot = farmer_data.plot.downgrade(); let FarmerMetadata { confirmation_depth_k, record_size, recorded_history_segment_size, - } = farmer_metadata; + } = farmer_data.metadata; // TODO: This assumes fixed size segments, which might not be the case let merkle_num_leaves = u64::from(recorded_history_segment_size / record_size * 2); let maybe_last_root_block = tokio::task::spawn_blocking({ - let plot = plot.clone(); + let plot = farmer_data.plot.clone(); move || plot.get_last_root_block().map_err(PlottingError::LastBlock) }) @@ -123,7 +145,7 @@ async fn background_plotting( let mut archiver = if let Some(last_root_block) = maybe_last_root_block { // Continuing from existing initial state - if plot.is_empty() { + if farmer_data.plot.is_empty() { return Err(PlottingError::ContinueError); } @@ -153,12 +175,12 @@ async fn background_plotting( } } else { // Starting from genesis - if !plot.is_empty() { + if !farmer_data.plot.is_empty() { // Restart before first block was archived, erase the plot // TODO: Erase plot } - drop(plot); + drop(farmer_data.plot); Archiver::new(record_size as usize, recorded_history_segment_size as usize) .map_err(PlottingError::Archiver)? @@ -248,12 +270,13 @@ async fn background_plotting( { error!("Failed to write encoded pieces: {}", error); } - if let Err(error) = - commitments.create_for_pieces(&pieces, piece_index_offset) + if let Err(error) = farmer_data + .commitments + .create_for_pieces(&pieces, piece_index_offset) { error!("Failed to create commitments for pieces: {}", error); } - if let Err(error) = object_mappings.store(&object_mapping) { + if let Err(error) = farmer_data.object_mappings.store(&object_mapping) { error!("Failed to store object mappings for pieces: {}", error); } let segment_index = root_block.segment_index(); @@ -295,6 +318,30 @@ async fn background_plotting( .expect("Failed to send genesis block archiving message"); } + let (mut best_block_number_sender, mut best_block_number_receiver) = mpsc::channel(1); + + tokio::spawn(async move { + loop { + tokio::time::sleep(best_block_number_check_interval).await; + + // In case connection dies, we need to disconnect from the node + let best_block_number_result = + tokio::time::timeout(BEST_BLOCK_REQUEST_TIMEOUT, client.best_block_number()).await; + + let is_error = !matches!(best_block_number_result, Ok(Ok(_))); + // Result doesn't matter here + let _ = best_block_number_sender + .send(best_block_number_result) + .await; + + if is_error { + break; + } + } + }); + + let mut last_best_block_number_error = false; + // Listen for new blocks produced on the network loop { tokio::select! { @@ -308,10 +355,11 @@ async fn background_plotting( let block_number = u32::from_str_radix(&head.number[2..], 16).unwrap(); debug!("Last block number: {:#?}", block_number); - if let Some(block) = block_number.checked_sub(confirmation_depth_k) { - // We send block that should be archived over channel that doesn't have a buffer, atomic - // integer is used to make sure archiving process always read up to date value - block_to_archive.store(block, Ordering::Relaxed); + if let Some(block_number) = block_number.checked_sub(confirmation_depth_k) { + // We send block that should be archived over channel that doesn't have + // a buffer, atomic integer is used to make sure archiving process + // always read up to date value + block_to_archive.store(block_number, Ordering::Relaxed); let _ = new_block_to_archive_sender.try_send(Arc::clone(&block_to_archive)); } }, @@ -321,6 +369,44 @@ async fn background_plotting( } } } + maybe_result = best_block_number_receiver.next() => { + match maybe_result { + Some(Ok(Ok(best_block_number))) => { + debug!("Best block number: {:#?}", best_block_number); + last_best_block_number_error = false; + + if let Some(block_number) = best_block_number.checked_sub(confirmation_depth_k) { + // We send block that should be archived over channel that doesn't have + // a buffer, atomic integer is used to make sure archiving process + // always read up to date value + block_to_archive.fetch_max(block_number, Ordering::Relaxed); + let _ = new_block_to_archive_sender.try_send(Arc::clone(&block_to_archive)); + } + } + Some(Ok(Err(error))) => { + if last_best_block_number_error { + error!("Request to get new best block failed second time: {error}"); + break; + } else { + warn!("Request to get new best block failed: {error}"); + last_best_block_number_error = true; + } + } + Some(Err(_error)) => { + if last_best_block_number_error { + error!("Request to get new best block timed out second time"); + break; + } else { + warn!("Request to get new best block timed out"); + last_best_block_number_error = true; + } + } + None => { + debug!("Best block number channel closed!"); + break; + } + } + } } } diff --git a/crates/subspace-farmer/src/plotting/tests.rs b/crates/subspace-farmer/src/plotting/tests.rs index 0e78c5f67072a..a7b09c7d6fe6a 100644 --- a/crates/subspace-farmer/src/plotting/tests.rs +++ b/crates/subspace-farmer/src/plotting/tests.rs @@ -3,7 +3,7 @@ use crate::identity::Identity; use crate::mock_rpc::MockRpc; use crate::object_mappings::ObjectMappings; use crate::plot::Plot; -use crate::plotting::Plotting; +use crate::plotting::{FarmerData, Plotting}; use crate::rpc::{NewHead, RpcClient}; use subspace_core_primitives::{PIECE_SIZE, SHA256_HASH_SIZE}; use subspace_rpc_primitives::{EncodedBlockWithObjectMapping, FarmerMetadata}; @@ -15,6 +15,7 @@ const MERKLE_NUM_LEAVES: usize = 8_usize; const WITNESS_SIZE: usize = SHA256_HASH_SIZE * MERKLE_NUM_LEAVES.log2() as usize; // 96 const RECORD_SIZE: usize = PIECE_SIZE - WITNESS_SIZE; // 4000 const SEGMENT_SIZE: usize = RECORD_SIZE * MERKLE_NUM_LEAVES / 2; // 16000 +const BEST_BLOCK_NUMBER_CHECK_INTERVAL: Duration = Duration::from_secs(5); fn init() { let _ = env_logger::builder().is_test(true).try_init(); @@ -49,6 +50,8 @@ async fn plotting_happy_path() { let subspace_codec = SubspaceCodec::new(identity.public_key()); + let farmer_data = FarmerData::new(plot.clone(), commitments, object_mappings, farmer_metadata); + let encoded_block0 = EncodedBlockWithObjectMapping { block: vec![0u8; SEGMENT_SIZE / 2], object_mapping: Default::default(), // This test does not concern with the object mappings at the moment. @@ -68,12 +71,10 @@ async fn plotting_happy_path() { let new_heads = vec![new_head0, new_head1]; let plotting_instance = Plotting::start( - plot.clone(), - commitments, - object_mappings, + farmer_data, client.clone(), - farmer_metadata, subspace_codec, + BEST_BLOCK_NUMBER_CHECK_INTERVAL, ); for (block, new_head) in encoded_blocks.into_iter().zip(new_heads) { @@ -157,6 +158,13 @@ async fn plotting_continue() { let subspace_codec = SubspaceCodec::new(identity.public_key()); + let farmer_data = FarmerData::new( + plot.clone(), + commitments.clone(), + object_mappings.clone(), + farmer_metadata.clone(), + ); + let encoded_block0 = EncodedBlockWithObjectMapping { block: vec![0u8; SEGMENT_SIZE / 2], object_mapping: Default::default(), // This test does not concern with the object mappings at the moment. @@ -176,12 +184,10 @@ async fn plotting_continue() { let new_heads = vec![new_head0, new_head1]; let plotting_instance = Plotting::start( - plot.clone(), - commitments.clone(), - object_mappings.clone(), + farmer_data, client.clone(), - farmer_metadata.clone(), subspace_codec, + BEST_BLOCK_NUMBER_CHECK_INTERVAL, ); for (block, new_head) in encoded_blocks.into_iter().zip(new_heads) { @@ -216,6 +222,13 @@ async fn plotting_continue() { // phase 2 - continue with new blocks after dropping the old plotting let client = MockRpc::new(); + let farmer_data = FarmerData::new( + plot.clone(), + commitments.clone(), + object_mappings.clone(), + farmer_metadata.clone(), + ); + // plotting will ask for the last encoded block to continue from where it's left off let prev_encoded_block = EncodedBlockWithObjectMapping { block: vec![1u8; SEGMENT_SIZE / 2], @@ -246,12 +259,10 @@ async fn plotting_continue() { sleep(Duration::from_millis(250)).await; let plotting_instance = Plotting::start( - plot.clone(), - commitments, - object_mappings, + farmer_data, client.clone(), - farmer_metadata, subspace_codec, + BEST_BLOCK_NUMBER_CHECK_INTERVAL, ); for (block, new_head) in encoded_blocks.into_iter().zip(new_heads) { diff --git a/crates/subspace-farmer/src/rpc.rs b/crates/subspace-farmer/src/rpc.rs index 068193384d5d1..c6be37eee7737 100644 --- a/crates/subspace-farmer/src/rpc.rs +++ b/crates/subspace-farmer/src/rpc.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use serde::Deserialize; +use subspace_core_primitives::BlockNumber; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse, @@ -22,6 +23,9 @@ pub trait RpcClient: Clone + Send + Sync + 'static { async fn farmer_metadata(&self) -> Result; /// Get a block by number + async fn best_block_number(&self) -> Result; + + /// Get best block number async fn block_by_number( &self, block_number: u32, diff --git a/crates/subspace-farmer/src/ws_rpc.rs b/crates/subspace-farmer/src/ws_rpc.rs index 89fe8128a0bb5..af1bd46657177 100644 --- a/crates/subspace-farmer/src/ws_rpc.rs +++ b/crates/subspace-farmer/src/ws_rpc.rs @@ -5,6 +5,7 @@ use jsonrpsee::core::Error as JsonError; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use std::sync::Arc; +use subspace_core_primitives::BlockNumber; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse, @@ -34,6 +35,13 @@ impl RpcClient for WsRpc { .await?) } + async fn best_block_number(&self) -> Result { + Ok(self + .client + .request("subspace_getBestBlockNumber", rpc_params![]) + .await?) + } + async fn block_by_number( &self, block_number: u32,