Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Piece getter for DSN sync #2503

Merged
merged 3 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::auditing::audit_sector_sync;
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::shim::ShimTable;
use subspace_proof_of_space::{Table, TableGenerator};
use subspace_verification::is_within_solution_range;
Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::auditing::audit_plot_sync;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector};
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use subspace_core_primitives::crypto::kzg;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{HistorySize, PublicKey, Record, RecordedHistorySegment};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions};
use subspace_farmer_components::sector::sector_size;
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::auditing::audit_plot_sync;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector};
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::{Table, TableGenerator};

Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector};
use subspace_farmer_components::reading::read_piece;
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy, ReadAt, ReadAtSync};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down
59 changes: 58 additions & 1 deletion crates/subspace-farmer-components/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,69 @@ pub mod sector;
mod segment_reconstruction;

use crate::file_ext::FileExt;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::fs::File;
use std::future::Future;
use std::io;
use subspace_core_primitives::HistorySize;
use std::sync::Arc;
use subspace_core_primitives::{ArchivedHistorySegment, HistorySize, Piece, PieceIndex};

use std::error::Error;

/// Defines retry policy on error during piece acquiring.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum PieceGetterRetryPolicy {
/// Retry N times (including zero)
Limited(u16),
/// No restrictions on retries
Unlimited,
}

impl Default for PieceGetterRetryPolicy {
#[inline]
fn default() -> Self {
Self::Limited(0)
}
}

/// Trait representing a way to get pieces
#[async_trait]
pub trait PieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
T: PieceGetter + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
self.as_ref().get_piece(piece_index, retry_policy).await
}
}

#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
async fn get_piece(
&self,
piece_index: PieceIndex,
_retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
Ok(self
.get(usize::try_from(u64::from(piece_index))?)
.map(Piece::from))
}
}

/// Enum to encapsulate the selection between [`ReadAtSync`] and [`ReadAtAsync]` variants
#[derive(Copy, Clone)]
Expand Down
61 changes: 2 additions & 59 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@ use crate::sector::{
SectorMetadata, SectorMetadataChecksummed,
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::FarmerProtocolInfo;
use crate::{FarmerProtocolInfo, PieceGetter, PieceGetterRetryPolicy};
use async_lock::Mutex;
use async_trait::async_trait;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::{Decode, Encode};
use rayon::prelude::*;
use std::error::Error;
use std::mem;
use std::simd::Simd;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::crypto::{blake3_hash, blake3_hash_parallel, Scalar};
use subspace_core_primitives::{
ArchivedHistorySegment, Blake3Hash, Piece, PieceIndex, PieceOffset, PublicKey, Record, SBucket,
SectorId, SectorIndex,
Blake3Hash, PieceIndex, PieceOffset, PublicKey, Record, SBucket, SectorId, SectorIndex,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
Expand All @@ -42,60 +39,6 @@ fn default_backoff() -> ExponentialBackoff {
}
}

/// Defines retry policy on error during piece acquiring.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum PieceGetterRetryPolicy {
/// Retry N times (including zero)
Limited(u16),
/// No restrictions on retries
Unlimited,
}

impl Default for PieceGetterRetryPolicy {
#[inline]
fn default() -> Self {
Self::Limited(0)
}
}

/// Duplicate trait for the subspace_networking::PieceReceiver. The goal of this trait is
/// simplifying dependency graph.
#[async_trait]
pub trait PieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
T: PieceGetter + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
self.as_ref().get_piece(piece_index, retry_policy).await
}
}

#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
async fn get_piece(
&self,
piece_index: PieceIndex,
_retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
Ok(self
.get(usize::try_from(u64::from(piece_index))?)
.map(Piece::from))
}
}

/// Information about sector that was plotted
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::plotting::{PieceGetter, PieceGetterRetryPolicy};
use crate::{PieceGetter, PieceGetterRetryPolicy};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey};
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
Expand Down
3 changes: 1 addition & 2 deletions crates/subspace-farmer/src/piece_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use std::time::Duration;
use subspace_core_primitives::{
HistorySize, LastArchivedBlock, Piece, PieceIndex, SegmentHeader, SegmentIndex,
};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::identity;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{PieceGetter, PlottedSector};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
use subspace_networking::KnownPeersManager;
use subspace_proof_of_space::Table;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::plotting;
use subspace_farmer_components::plotting::{
download_sector, encode_sector, DownloadSectorOptions, DownloadedSector, EncodeSectorOptions,
PieceGetter, PieceGetterRetryPolicy, PlottedSector,
PlottedSector,
};
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_farmer_components::{plotting, PieceGetter, PieceGetterRetryPolicy};
use subspace_proof_of_space::Table;
use thiserror::Error;
use tokio::runtime::Handle;
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/utils/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use parking_lot::Mutex;
use std::error::Error;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ fn main() -> Result<(), Error> {
// Domain node needs slots notifications for bundle production.
force_new_slot_notifications: true,
subspace_networking: SubspaceNetworking::Create { config: dsn_config },
dsn_piece_getter: None,
sync_from_dsn: true,
is_timekeeper: false,
timekeeper_cpu_cores: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::StreamExt;
use libp2p::PeerId;
use std::collections::HashSet;
use std::error::Error;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use subspace_core_primitives::{Piece, PieceIndex};
Expand Down Expand Up @@ -63,6 +64,12 @@ pub struct PieceProvider<PV> {
piece_validator: Option<PV>,
}

impl<PV> fmt::Debug for PieceProvider<PV> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PieceProvider").finish_non_exhaustive()
}
}

impl<PV> PieceProvider<PV>
where
PV: PieceValidator,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-node/src/commands/run/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ pub(super) fn create_consensus_chain_configuration(
// Domain node needs slots notifications for bundle production.
force_new_slot_notifications: domains_enabled,
subspace_networking: SubspaceNetworking::Create { config: dsn_config },
dsn_piece_getter: None,
sync_from_dsn,
is_timekeeper: timekeeper_options.timekeeper,
timekeeper_cpu_cores: timekeeper_options.timekeeper_cpu_cores,
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-service/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::dsn::DsnConfig;
use crate::sync_from_dsn::DsnSyncPieceGetter;
use prometheus_client::registry::Registry;
use sc_chain_spec::ChainSpec;
use sc_network::config::{
Expand Down Expand Up @@ -242,6 +243,8 @@ pub struct SubspaceConfiguration {
pub force_new_slot_notifications: bool,
/// Subspace networking (DSN).
pub subspace_networking: SubspaceNetworking,
/// DSN piece getter
pub dsn_piece_getter: Option<Arc<dyn DsnSyncPieceGetter + Send + Sync + 'static>>,
/// Enables DSN-sync on startup.
pub sync_from_dsn: bool,
/// Is this node a Timekeeper
Expand Down
17 changes: 15 additions & 2 deletions crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ pub mod config;
pub mod dsn;
mod metrics;
pub mod rpc;
mod sync_from_dsn;
pub mod sync_from_dsn;
pub mod transaction_pool;

use crate::config::{SubspaceConfiguration, SubspaceNetworking};
use crate::dsn::{create_dsn_instance, DsnConfigurationError};
use crate::metrics::NodeMetrics;
use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
use crate::transaction_pool::FullPool;
use core::sync::atomic::{AtomicU32, Ordering};
use cross_domain_message_gossip::xdm_gossip_peers_set_config;
Expand Down Expand Up @@ -105,6 +106,7 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{BlockNumber, PotSeed, REWARD_SIGNING_CONTEXT};
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use subspace_runtime_primitives::opaque::Block;
use subspace_runtime_primitives::{AccountId, Balance, Hash, Nonce};
Expand Down Expand Up @@ -844,6 +846,17 @@ where

network_wrapper.set(network_service.clone());
if config.sync_from_dsn {
let dsn_sync_piece_getter = config.dsn_piece_getter.unwrap_or_else(|| {
Arc::new(PieceProvider::new(
node.clone(),
Some(SegmentCommitmentPieceValidator::new(
node.clone(),
subspace_link.kzg().clone(),
segment_headers_store.clone(),
)),
))
});

let (observer, worker) = sync_from_dsn::create_observer_and_worker(
segment_headers_store.clone(),
Arc::clone(&network_service),
Expand All @@ -852,7 +865,7 @@ where
import_queue_service,
sync_target_block_number,
pause_sync,
subspace_link.kzg().clone(),
dsn_sync_piece_getter,
);
task_manager
.spawn_handle()
Expand Down
Loading
Loading