Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Follow node's best block number to plot in background while node is s…
Browse files Browse the repository at this point in the history
…yncing and handle RPC connection hangs (paritytech#275)

* Follow node's best block number to plot in background while node is syncing and handle RPC connection hangs

Co-authored-by: Özgün Özerk <ozgun@subspace.network>
  • Loading branch information
nazar-pc and ozgunozerk authored Mar 6, 2022
1 parent 2a4e635 commit c450f47
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 45 deletions.
13 changes: 13 additions & 0 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub trait SubspaceRpcApi {
#[rpc(name = "subspace_getFarmerMetadata")]
fn get_farmer_metadata(&self) -> FutureResult<FarmerMetadata>;

/// Get best block number
///
/// Exposed over RPC under the name `subspace_getBestBlockNumber`. Takes no arguments besides
/// the receiver and returns a `FutureResult<BlockNumber>`.
#[rpc(name = "subspace_getBestBlockNumber")]
fn get_best_block_number(&self) -> FutureResult<BlockNumber>;

/// Get encoded block by given block number
#[rpc(name = "subspace_getBlockByNumber")]
fn get_block_by_number(
Expand Down Expand Up @@ -249,6 +253,15 @@ where
})
}

/// Reports the best block number currently returned by `self.client.info()`.
///
/// The client's native block number is converted into `BlockNumber` up front, and the returned
/// future resolves immediately with the converted value.
///
/// # Panics
///
/// Panics if the client's best block number can't be converted into `BlockNumber`.
fn get_best_block_number(&self) -> FutureResult<BlockNumber> {
    let chain_info = self.client.info();
    let best_block = match TryInto::<BlockNumber>::try_into(chain_info.best_number) {
        Ok(number) => number,
        Err(_) => panic!("Block number can't be converted into BlockNumber"),
    };

    // The value is already computed; the future only wraps it for the RPC layer
    Box::pin(async move { Ok(best_block) })
}

fn get_block_by_number(
&self,
block_number: BlockNumber,
Expand Down
12 changes: 7 additions & 5 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::mem;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::PublicKey;
use subspace_farmer::ws_rpc_server::{RpcServer, RpcServerImpl};
use subspace_farmer::{
Commitments, Farming, Identity, ObjectMappings, Plot, Plotting, RpcClient, WsRpc,
Commitments, FarmerData, Farming, Identity, ObjectMappings, Plot, Plotting, RpcClient, WsRpc,
};
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::libp2p::Multiaddr;
Expand All @@ -25,6 +26,7 @@ pub(crate) async fn farm(
node_rpc_url: &str,
ws_server_listen_addr: SocketAddr,
reward_address: Option<PublicKey>,
best_block_number_check_interval: Duration,
) -> Result<(), anyhow::Error> {
// TODO: This doesn't account for the fact that node can
// have a completely different history to what farmer expects
Expand Down Expand Up @@ -140,14 +142,14 @@ pub(crate) async fn farm(
reward_address,
);

let farmer_data = FarmerData::new(plot, commitments, object_mappings, farmer_metadata);

// start the background plotting
let plotting_instance = Plotting::start(
plot,
commitments,
object_mappings,
farmer_data,
client,
farmer_metadata,
subspace_codec,
best_block_number_check_interval,
);

tokio::select! {
Expand Down
4 changes: 4 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use log::info;
use sp_core::crypto::PublicError;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
use subspace_core_primitives::PublicKey;
use subspace_networking::libp2p::Multiaddr;

/// Interval (5 seconds) handed over from `main` as the best block number check interval.
const BEST_BLOCK_NUMBER_CHECK_INTERVAL: Duration = Duration::from_secs(5);

#[derive(Debug, Parser)]
enum IdentityCommand {
/// View identity information
Expand Down Expand Up @@ -119,6 +122,7 @@ async fn main() -> Result<()> {
&node_rpc_url,
ws_server_listen_addr,
reward_address,
BEST_BLOCK_NUMBER_CHECK_INTERVAL,
)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ pub use identity::Identity;
pub use jsonrpsee;
pub use object_mappings::{ObjectMappingError, ObjectMappings};
pub use plot::{Plot, PlotError};
pub use plotting::Plotting;
pub use plotting::{FarmerData, Plotting};
pub use rpc::RpcClient;
pub use ws_rpc::WsRpc;
6 changes: 6 additions & 0 deletions crates/subspace-farmer/src/mock_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::rpc::{Error as MockError, NewHead, RpcClient};
use async_trait::async_trait;
use std::sync::Arc;
use subspace_core_primitives::BlockNumber;
use subspace_rpc_primitives::{
BlockSignature, BlockSigningInfo, EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo,
SolutionResponse,
Expand Down Expand Up @@ -123,6 +124,11 @@ impl RpcClient for MockRpc {
Ok(self.inner.metadata_receiver.lock().await.try_recv()?)
}

/// Mock implementation that always reports block number `0`.
async fn best_block_number(&self) -> Result<BlockNumber, MockError> {
    // Tests don't rely on this value (at least not yet), so a fixed placeholder is enough
    let placeholder: BlockNumber = 0;

    Ok(placeholder)
}

async fn block_by_number(
&self,
_block_number: u32,
Expand Down
138 changes: 112 additions & 26 deletions crates/subspace-farmer/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use crate::commitments::Commitments;
use crate::object_mappings::ObjectMappings;
use crate::plot::Plot;
use crate::rpc::RpcClient;
use log::{debug, error, info};
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use log::{debug, error, info, warn};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::archiver::{ArchivedSegment, Archiver};
use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping};
use subspace_core_primitives::Sha256Hash;
Expand All @@ -17,6 +20,8 @@ use thiserror::Error;
use tokio::sync::oneshot::Receiver;
use tokio::{sync::oneshot, task::JoinHandle};

/// Maximum time (5 seconds) a single `client.best_block_number()` request is allowed to take;
/// applied through `tokio::time::timeout` by the background polling task.
const BEST_BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Debug, Error)]
pub enum PlottingError {
#[error("Plot is empty on restart, can't continue")]
Expand All @@ -41,29 +46,48 @@ pub struct Plotting {
handle: Option<JoinHandle<Result<(), PlottingError>>>,
}

/// Farmer state that is handed over to the background plotting task as a single value.
pub struct FarmerData {
    /// Plot that background plotting inspects (`is_empty`, `get_last_root_block`) and downgrades
    /// to a weak handle.
    plot: Plot,
    /// Commitments that are created for newly plotted pieces.
    commitments: Commitments,
    /// Storage for object mappings of newly plotted pieces.
    object_mappings: ObjectMappings,
    /// Farmer metadata, from which background plotting reads `confirmation_depth_k`,
    /// `record_size` and `recorded_history_segment_size`.
    metadata: FarmerMetadata,
}

impl FarmerData {
    /// Bundles already created components into one [`FarmerData`] value.
    ///
    /// All arguments are moved into the returned value unchanged.
    pub fn new(
        plot: Plot,
        commitments: Commitments,
        object_mappings: ObjectMappings,
        metadata: FarmerMetadata,
    ) -> Self {
        FarmerData {
            plot,
            commitments,
            object_mappings,
            metadata,
        }
    }
}

/// Assumes `farmer_data` (plot, commitments, object mappings and metadata), `client` and `subspace_codec` are already initialized
impl Plotting {
/// Returns an instance of plotting, and also starts a concurrent background plotting task
pub fn start<T: RpcClient + Clone + Send + Sync + 'static>(
plot: Plot,
commitments: Commitments,
object_mappings: ObjectMappings,
farmer_data: FarmerData,
client: T,
farmer_metadata: FarmerMetadata,
subspace_codec: SubspaceCodec,
best_block_number_check_interval: Duration,
) -> Self {
// Oneshot channels, that will be used for interrupt/stop the process
let (stop_sender, stop_receiver) = oneshot::channel();

// Get a handle for the background task, so that we can wait on it later if we want to
let plotting_handle = tokio::spawn(async move {
background_plotting(
farmer_data,
client,
plot,
commitments,
object_mappings,
farmer_metadata,
subspace_codec,
best_block_number_check_interval,
stop_receiver,
)
.await
Expand Down Expand Up @@ -95,26 +119,24 @@ impl Drop for Plotting {
// don't want eventually
/// Maintains plot in up to date state plotting new pieces as they are produced on the network.
async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
farmer_data: FarmerData,
client: T,
plot: Plot,
commitments: Commitments,
object_mappings: ObjectMappings,
farmer_metadata: FarmerMetadata,
mut subspace_codec: SubspaceCodec,
best_block_number_check_interval: Duration,
mut stop_receiver: Receiver<()>,
) -> Result<(), PlottingError> {
let weak_plot = plot.downgrade();
let weak_plot = farmer_data.plot.downgrade();
let FarmerMetadata {
confirmation_depth_k,
record_size,
recorded_history_segment_size,
} = farmer_metadata;
} = farmer_data.metadata;

// TODO: This assumes fixed size segments, which might not be the case
let merkle_num_leaves = u64::from(recorded_history_segment_size / record_size * 2);

let maybe_last_root_block = tokio::task::spawn_blocking({
let plot = plot.clone();
let plot = farmer_data.plot.clone();

move || plot.get_last_root_block().map_err(PlottingError::LastBlock)
})
Expand All @@ -123,7 +145,7 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(

let mut archiver = if let Some(last_root_block) = maybe_last_root_block {
// Continuing from existing initial state
if plot.is_empty() {
if farmer_data.plot.is_empty() {
return Err(PlottingError::ContinueError);
}

Expand Down Expand Up @@ -153,12 +175,12 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
}
} else {
// Starting from genesis
if !plot.is_empty() {
if !farmer_data.plot.is_empty() {
// Restart before first block was archived, erase the plot
// TODO: Erase plot
}

drop(plot);
drop(farmer_data.plot);

Archiver::new(record_size as usize, recorded_history_segment_size as usize)
.map_err(PlottingError::Archiver)?
Expand Down Expand Up @@ -248,12 +270,13 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
{
error!("Failed to write encoded pieces: {}", error);
}
if let Err(error) =
commitments.create_for_pieces(&pieces, piece_index_offset)
if let Err(error) = farmer_data
.commitments
.create_for_pieces(&pieces, piece_index_offset)
{
error!("Failed to create commitments for pieces: {}", error);
}
if let Err(error) = object_mappings.store(&object_mapping) {
if let Err(error) = farmer_data.object_mappings.store(&object_mapping) {
error!("Failed to store object mappings for pieces: {}", error);
}
let segment_index = root_block.segment_index();
Expand Down Expand Up @@ -295,6 +318,30 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
.expect("Failed to send genesis block archiving message");
}

let (mut best_block_number_sender, mut best_block_number_receiver) = mpsc::channel(1);

tokio::spawn(async move {
loop {
tokio::time::sleep(best_block_number_check_interval).await;

// In case connection dies, we need to disconnect from the node
let best_block_number_result =
tokio::time::timeout(BEST_BLOCK_REQUEST_TIMEOUT, client.best_block_number()).await;

let is_error = !matches!(best_block_number_result, Ok(Ok(_)));
// Result doesn't matter here
let _ = best_block_number_sender
.send(best_block_number_result)
.await;

if is_error {
break;
}
}
});

let mut last_best_block_number_error = false;

// Listen for new blocks produced on the network
loop {
tokio::select! {
Expand All @@ -308,10 +355,11 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
let block_number = u32::from_str_radix(&head.number[2..], 16).unwrap();
debug!("Last block number: {:#?}", block_number);

if let Some(block) = block_number.checked_sub(confirmation_depth_k) {
// We send block that should be archived over channel that doesn't have a buffer, atomic
// integer is used to make sure archiving process always read up to date value
block_to_archive.store(block, Ordering::Relaxed);
if let Some(block_number) = block_number.checked_sub(confirmation_depth_k) {
// We send block that should be archived over channel that doesn't have
// a buffer, atomic integer is used to make sure archiving process
// always read up to date value
block_to_archive.store(block_number, Ordering::Relaxed);
let _ = new_block_to_archive_sender.try_send(Arc::clone(&block_to_archive));
}
},
Expand All @@ -321,6 +369,44 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
}
}
}
maybe_result = best_block_number_receiver.next() => {
match maybe_result {
Some(Ok(Ok(best_block_number))) => {
debug!("Best block number: {:#?}", best_block_number);
last_best_block_number_error = false;

if let Some(block_number) = best_block_number.checked_sub(confirmation_depth_k) {
// We send block that should be archived over channel that doesn't have
// a buffer, atomic integer is used to make sure archiving process
// always read up to date value
block_to_archive.fetch_max(block_number, Ordering::Relaxed);
let _ = new_block_to_archive_sender.try_send(Arc::clone(&block_to_archive));
}
}
Some(Ok(Err(error))) => {
if last_best_block_number_error {
error!("Request to get new best block failed second time: {error}");
break;
} else {
warn!("Request to get new best block failed: {error}");
last_best_block_number_error = true;
}
}
Some(Err(_error)) => {
if last_best_block_number_error {
error!("Request to get new best block timed out second time");
break;
} else {
warn!("Request to get new best block timed out");
last_best_block_number_error = true;
}
}
None => {
debug!("Best block number channel closed!");
break;
}
}
}
}
}

Expand Down
Loading

0 comments on commit c450f47

Please sign in to comment.