diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 4896478234..1b39aa6d37 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -2,13 +2,13 @@ mod dsn; mod metrics; use crate::commands::farm::dsn::configure_dsn; -use crate::commands::farm::metrics::FarmerMetrics; +use crate::commands::farm::metrics::{FarmerMetrics, SectorState}; use crate::utils::shutdown_signal; use anyhow::anyhow; use bytesize::ByteSize; use clap::{Parser, ValueHint}; use futures::channel::oneshot; -use futures::stream::FuturesUnordered; +use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use parking_lot::Mutex; use prometheus_client::registry::Registry; @@ -665,25 +665,23 @@ where info!("Finished collecting already plotted pieces successfully"); - for single_disk_farm in single_disk_farms.iter() { - farmer_metrics.update_farm_size( - single_disk_farm.id(), - single_disk_farm.total_sectors_count(), - ); - farmer_metrics.inc_farm_plotted( - single_disk_farm.id(), - single_disk_farm - .plotted_sectors_count() - .await - .try_into() - .unwrap(), - ); - } + let total_and_plotted_sectors = single_disk_farms + .iter() + .map(|single_disk_farm| async { + let total_sector_count = single_disk_farm.total_sectors_count(); + let plotted_sectors_count = single_disk_farm.plotted_sectors_count().await; + + (total_sector_count, plotted_sectors_count) + }) + .collect::>() + .collect::>() + .await; let mut single_disk_farms_stream = single_disk_farms .into_iter() .enumerate() - .map(|(disk_farm_index, single_disk_farm)| { + .zip(total_and_plotted_sectors) + .map(|((disk_farm_index, single_disk_farm), sector_counts)| { let disk_farm_index = disk_farm_index.try_into().expect( "More than 256 plots are not supported, this is checked above already; qed", ); @@ -709,6 +707,17 @@ where } }; + let (total_sector_count, plotted_sectors_count) = sector_counts; + farmer_metrics.update_sectors_total( + single_disk_farm.id(), + total_sector_count - plotted_sectors_count, + SectorState::NotPlotted, + ); + farmer_metrics.update_sectors_total( + single_disk_farm.id(), + plotted_sectors_count, + SectorState::Plotted, + ); single_disk_farm .on_sector_update(Arc::new({ let single_disk_farm_id = *single_disk_farm.id(); @@ -748,19 +757,24 @@ where on_plotted_sector_callback(plotted_sector, old_plotted_sector); farmer_metrics.observe_sector_plotting_time(&single_disk_farm_id, time); farmer_metrics.sector_plotted.inc(); - if old_plotted_sector.is_some() { - farmer_metrics.inc_farm_replotted(&single_disk_farm_id); - } else { - farmer_metrics.inc_farm_plotted(&single_disk_farm_id, 1); - } + farmer_metrics + .update_sector_state(&single_disk_farm_id, SectorState::Plotted); } SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { - farmer_metrics.inc_farm_about_to_expire(&single_disk_farm_id, 1); + farmer_metrics.update_sector_state( + &single_disk_farm_id, + SectorState::AboutToExpire, + ); } SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { - farmer_metrics.inc_farm_expired(&single_disk_farm_id, 1); + farmer_metrics + .update_sector_state(&single_disk_farm_id, SectorState::Expired); + } + SectorUpdate::Expiration(SectorExpirationDetails::Determined { + .. + }) => { + // Not interested in here } - _ => {} } })) .detach(); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs index 88766f9d4a..58e4d592da 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs @@ -3,12 +3,32 @@ use prometheus_client::metrics::family::Family; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; use prometheus_client::registry::{Registry, Unit}; +use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::time::Duration; use subspace_core_primitives::SectorIndex; use subspace_farmer::single_disk_farm::farming::ProvingResult; use subspace_farmer::single_disk_farm::{FarmingError, SingleDiskFarmId}; +#[derive(Debug, Copy, Clone)] +pub(super) enum SectorState { + NotPlotted, + Plotted, + AboutToExpire, + Expired, +} + +impl fmt::Display for SectorState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::NotPlotted => "NotPlotted", + Self::Plotted => "Plotted", + Self::AboutToExpire => "AboutToExpire", + Self::Expired => "Expired", + }) + } +} + #[derive(Debug, Clone)] pub(super) struct FarmerMetrics { auditing_time: Family, Histogram>, @@ -18,10 +38,7 @@ pub(super) struct FarmerMetrics { sector_encoding_time: Family, Histogram>, sector_writing_time: Family, Histogram>, sector_plotting_time: Family, Histogram>, - farm_size: Family, Gauge>, - farm_plotted: Family, Gauge>, - farm_expired: Family, Gauge>, - farm_about_to_expire: Family, Gauge>, + sectors_total: Family, Gauge>, pub(super) sector_downloading: Counter, pub(super) sector_downloaded: Counter, pub(super) sector_encoding: Counter, @@ -110,40 +127,13 @@ impl FarmerMetrics { sector_plotting_time.clone(), ); - let farm_size = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default); - - sub_registry.register_with_unit( - "farm_size", - "Farm size", - Unit::Other("sectors".to_string()), - farm_size.clone(), - ); - - let farm_plotted = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default); - - sub_registry.register_with_unit( - "farm_plotted", - "Number of plotted farm sectors", - Unit::Other("sectors".to_string()), - farm_plotted.clone(), - ); - - let farm_expired = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default); - - sub_registry.register_with_unit( - "farm_expired", - "Number of expired farm sectors", - Unit::Other("sectors".to_string()), - farm_expired.clone(), - ); - - let farm_about_to_expire = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default); + let sectors_total = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default); sub_registry.register_with_unit( - "farm_about_to_expire", - "Number of farm sectors about to expire", + "sectors_total", + "Total number of sectors with corresponding state", Unit::Other("sectors".to_string()), - farm_about_to_expire.clone(), + sectors_total.clone(), ); let sector_downloading = Counter::<_, _>::default(); @@ -226,10 +216,7 @@ impl FarmerMetrics { sector_encoding_time, sector_writing_time, sector_plotting_time, - farm_size, - farm_plotted, - farm_expired, - farm_about_to_expire, + sectors_total, sector_downloading, sector_downloaded, sector_encoding, @@ -281,6 +268,73 @@ impl FarmerMetrics { .inc(); } + pub(super) fn update_sectors_total( + &self, + single_disk_farm_id: &SingleDiskFarmId, + sectors: SectorIndex, + state: SectorState, + ) { + self.sectors_total + .get_or_create(&vec![ + ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("state".to_string(), state.to_string()), + ]) + .set(i64::from(sectors)); + } + + pub(super) fn update_sector_state( + &self, + single_disk_farm_id: &SingleDiskFarmId, + state: SectorState, + ) { + self.sectors_total + .get_or_create(&vec![ + ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("state".to_string(), state.to_string()), + ]) + .inc(); + match state { + SectorState::NotPlotted => { + // Never called, doesn't make sense + } + SectorState::Plotted => { + let not_plotted_sectors = self.sectors_total.get_or_create(&vec![ + ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("state".to_string(), SectorState::NotPlotted.to_string()), + ]); + if not_plotted_sectors.get() > 0 { + // Initial plotting + not_plotted_sectors.dec(); + } else { + let expired_sectors = self.sectors_total.get_or_create(&vec![ + ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("state".to_string(), SectorState::Expired.to_string()), + ]); + if expired_sectors.get() > 0 { + // Replaced expired sector + expired_sectors.dec(); + } else { + // Replaced about to expire sector + self.sectors_total + .get_or_create(&vec![ + ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("state".to_string(), SectorState::AboutToExpire.to_string()), + ]) + .dec(); + } + } + } + SectorState::AboutToExpire | SectorState::Expired => { + self.sectors_total + .get_or_create(&vec![ + ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("state".to_string(), SectorState::Plotted.to_string()), + ]) + .dec(); + } + } + } + pub(super) fn observe_sector_downloading_time( &self, single_disk_farm_id: &SingleDiskFarmId, @@ -332,100 +386,4 @@ impl FarmerMetrics { )]) .observe(time.as_secs_f64()); } - - pub(super) fn update_farm_size( - &self, - single_disk_farm_id: &SingleDiskFarmId, - sectors: SectorIndex, - ) { - self.farm_size - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .set(i64::from(sectors)); - } - - pub(super) fn inc_farm_plotted( - &self, - single_disk_farm_id: &SingleDiskFarmId, - sectors: SectorIndex, - ) { - self.farm_plotted - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .inc_by(i64::from(sectors)); - } - - pub(super) fn inc_farm_expired( - &self, - single_disk_farm_id: &SingleDiskFarmId, - sectors: SectorIndex, - ) { - self.farm_expired - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .inc_by(i64::from(sectors)); - self.farm_plotted - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .dec_by(i64::from(sectors)); - } - - pub(super) fn inc_farm_about_to_expire( - &self, - single_disk_farm_id: &SingleDiskFarmId, - sectors: SectorIndex, - ) { - self.farm_about_to_expire - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .inc_by(i64::from(sectors)); - self.farm_plotted - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .dec_by(i64::from(sectors)); - } - - pub(super) fn inc_farm_replotted(&self, single_disk_farm_id: &SingleDiskFarmId) { - self.farm_plotted - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .inc(); - if self - .farm_expired - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .get() - > 0 - { - self.farm_expired - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .dec(); - } else { - self.farm_about_to_expire - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) - .dec(); - } - } } diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 06d746b605..350e4d3b45 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -1275,8 +1275,13 @@ impl SingleDiskFarm { } /// Number of sectors successfully plotted so far - pub async fn plotted_sectors_count(&self) -> usize { - self.sectors_metadata.read().await.len() + pub async fn plotted_sectors_count(&self) -> SectorIndex { + self.sectors_metadata + .read() + .await + .len() + .try_into() + .expect("Number of sectors never exceeds `SectorIndex` type; qed") } /// Read information about sectors plotted so far