Skip to content

Commit

Permalink
Merge pull request #2531 from subspace/refactor-farmer-metrics
Browse files Browse the repository at this point in the history
Refactor farmer metrics
  • Loading branch information
nazar-pc authored Feb 15, 2024
2 parents 5e59809 + d978ece commit 863c4db
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 162 deletions.
64 changes: 39 additions & 25 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ mod dsn;
mod metrics;

use crate::commands::farm::dsn::configure_dsn;
use crate::commands::farm::metrics::FarmerMetrics;
use crate::commands::farm::metrics::{FarmerMetrics, SectorState};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
Expand Down Expand Up @@ -665,25 +665,23 @@ where

info!("Finished collecting already plotted pieces successfully");

for single_disk_farm in single_disk_farms.iter() {
farmer_metrics.update_farm_size(
single_disk_farm.id(),
single_disk_farm.total_sectors_count(),
);
farmer_metrics.inc_farm_plotted(
single_disk_farm.id(),
single_disk_farm
.plotted_sectors_count()
.await
.try_into()
.unwrap(),
);
}
let total_and_plotted_sectors = single_disk_farms
.iter()
.map(|single_disk_farm| async {
let total_sector_count = single_disk_farm.total_sectors_count();
let plotted_sectors_count = single_disk_farm.plotted_sectors_count().await;

(total_sector_count, plotted_sectors_count)
})
.collect::<FuturesOrdered<_>>()
.collect::<Vec<_>>()
.await;

let mut single_disk_farms_stream = single_disk_farms
.into_iter()
.enumerate()
.map(|(disk_farm_index, single_disk_farm)| {
.zip(total_and_plotted_sectors)
.map(|((disk_farm_index, single_disk_farm), sector_counts)| {
let disk_farm_index = disk_farm_index.try_into().expect(
"More than 256 plots are not supported, this is checked above already; qed",
);
Expand All @@ -709,6 +707,17 @@ where
}
};

let (total_sector_count, plotted_sectors_count) = sector_counts;
farmer_metrics.update_sectors_total(
single_disk_farm.id(),
total_sector_count - plotted_sectors_count,
SectorState::NotPlotted,
);
farmer_metrics.update_sectors_total(
single_disk_farm.id(),
plotted_sectors_count,
SectorState::Plotted,
);
single_disk_farm
.on_sector_update(Arc::new({
let single_disk_farm_id = *single_disk_farm.id();
Expand Down Expand Up @@ -748,19 +757,24 @@ where
on_plotted_sector_callback(plotted_sector, old_plotted_sector);
farmer_metrics.observe_sector_plotting_time(&single_disk_farm_id, time);
farmer_metrics.sector_plotted.inc();
if old_plotted_sector.is_some() {
farmer_metrics.inc_farm_replotted(&single_disk_farm_id);
} else {
farmer_metrics.inc_farm_plotted(&single_disk_farm_id, 1);
}
farmer_metrics
.update_sector_state(&single_disk_farm_id, SectorState::Plotted);
}
SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => {
farmer_metrics.inc_farm_about_to_expire(&single_disk_farm_id, 1);
farmer_metrics.update_sector_state(
&single_disk_farm_id,
SectorState::AboutToExpire,
);
}
SectorUpdate::Expiration(SectorExpirationDetails::Expired) => {
farmer_metrics.inc_farm_expired(&single_disk_farm_id, 1);
farmer_metrics
.update_sector_state(&single_disk_farm_id, SectorState::Expired);
}
SectorUpdate::Expiration(SectorExpirationDetails::Determined {
..
}) => {
// Not interested in here
}
_ => {}
}
}))
.detach();
Expand Down
228 changes: 93 additions & 135 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,32 @@ use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::{Registry, Unit};
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Duration;
use subspace_core_primitives::SectorIndex;
use subspace_farmer::single_disk_farm::farming::ProvingResult;
use subspace_farmer::single_disk_farm::{FarmingError, SingleDiskFarmId};

#[derive(Debug, Copy, Clone)]
pub(super) enum SectorState {
NotPlotted,
Plotted,
AboutToExpire,
Expired,
}

impl fmt::Display for SectorState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::NotPlotted => "NotPlotted",
Self::Plotted => "Plotted",
Self::AboutToExpire => "AboutToExpire",
Self::Expired => "Expired",
})
}
}

#[derive(Debug, Clone)]
pub(super) struct FarmerMetrics {
auditing_time: Family<Vec<(String, String)>, Histogram>,
Expand All @@ -18,10 +38,7 @@ pub(super) struct FarmerMetrics {
sector_encoding_time: Family<Vec<(String, String)>, Histogram>,
sector_writing_time: Family<Vec<(String, String)>, Histogram>,
sector_plotting_time: Family<Vec<(String, String)>, Histogram>,
farm_size: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
farm_plotted: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
farm_expired: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
farm_about_to_expire: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
sectors_total: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
pub(super) sector_downloading: Counter<u64, AtomicU64>,
pub(super) sector_downloaded: Counter<u64, AtomicU64>,
pub(super) sector_encoding: Counter<u64, AtomicU64>,
Expand Down Expand Up @@ -110,40 +127,13 @@ impl FarmerMetrics {
sector_plotting_time.clone(),
);

let farm_size = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_size",
"Farm size",
Unit::Other("sectors".to_string()),
farm_size.clone(),
);

let farm_plotted = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_plotted",
"Number of plotted farm sectors",
Unit::Other("sectors".to_string()),
farm_plotted.clone(),
);

let farm_expired = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_expired",
"Number of expired farm sectors",
Unit::Other("sectors".to_string()),
farm_expired.clone(),
);

let farm_about_to_expire = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);
let sectors_total = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_about_to_expire",
"Number of farm sectors about to expire",
"sectors_total",
"Total number of sectors with corresponding state",
Unit::Other("sectors".to_string()),
farm_about_to_expire.clone(),
sectors_total.clone(),
);

let sector_downloading = Counter::<_, _>::default();
Expand Down Expand Up @@ -226,10 +216,7 @@ impl FarmerMetrics {
sector_encoding_time,
sector_writing_time,
sector_plotting_time,
farm_size,
farm_plotted,
farm_expired,
farm_about_to_expire,
sectors_total,
sector_downloading,
sector_downloaded,
sector_encoding,
Expand Down Expand Up @@ -281,6 +268,73 @@ impl FarmerMetrics {
.inc();
}

pub(super) fn update_sectors_total(
&self,
single_disk_farm_id: &SingleDiskFarmId,
sectors: SectorIndex,
state: SectorState,
) {
self.sectors_total
.get_or_create(&vec![
("farm_id".to_string(), single_disk_farm_id.to_string()),
("state".to_string(), state.to_string()),
])
.set(i64::from(sectors));
}

pub(super) fn update_sector_state(
&self,
single_disk_farm_id: &SingleDiskFarmId,
state: SectorState,
) {
self.sectors_total
.get_or_create(&vec![
("farm_id".to_string(), single_disk_farm_id.to_string()),
("state".to_string(), state.to_string()),
])
.inc();
match state {
SectorState::NotPlotted => {
// Never called, doesn't make sense
}
SectorState::Plotted => {
let not_plotted_sectors = self.sectors_total.get_or_create(&vec![
("farm_id".to_string(), single_disk_farm_id.to_string()),
("state".to_string(), SectorState::NotPlotted.to_string()),
]);
if not_plotted_sectors.get() > 0 {
// Initial plotting
not_plotted_sectors.dec();
} else {
let expired_sectors = self.sectors_total.get_or_create(&vec![
("farm_id".to_string(), single_disk_farm_id.to_string()),
("state".to_string(), SectorState::Expired.to_string()),
]);
if expired_sectors.get() > 0 {
// Replaced expired sector
expired_sectors.dec();
} else {
// Replaced about to expire sector
self.sectors_total
.get_or_create(&vec![
("farm_id".to_string(), single_disk_farm_id.to_string()),
("state".to_string(), SectorState::AboutToExpire.to_string()),
])
.dec();
}
}
}
SectorState::AboutToExpire | SectorState::Expired => {
self.sectors_total
.get_or_create(&vec![
("farm_id".to_string(), single_disk_farm_id.to_string()),
("state".to_string(), SectorState::Plotted.to_string()),
])
.dec();
}
}
}

pub(super) fn observe_sector_downloading_time(
&self,
single_disk_farm_id: &SingleDiskFarmId,
Expand Down Expand Up @@ -332,100 +386,4 @@ impl FarmerMetrics {
)])
.observe(time.as_secs_f64());
}

pub(super) fn update_farm_size(
&self,
single_disk_farm_id: &SingleDiskFarmId,
sectors: SectorIndex,
) {
self.farm_size
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.set(i64::from(sectors));
}

pub(super) fn inc_farm_plotted(
&self,
single_disk_farm_id: &SingleDiskFarmId,
sectors: SectorIndex,
) {
self.farm_plotted
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.inc_by(i64::from(sectors));
}

pub(super) fn inc_farm_expired(
&self,
single_disk_farm_id: &SingleDiskFarmId,
sectors: SectorIndex,
) {
self.farm_expired
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.inc_by(i64::from(sectors));
self.farm_plotted
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.dec_by(i64::from(sectors));
}

pub(super) fn inc_farm_about_to_expire(
&self,
single_disk_farm_id: &SingleDiskFarmId,
sectors: SectorIndex,
) {
self.farm_about_to_expire
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.inc_by(i64::from(sectors));
self.farm_plotted
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.dec_by(i64::from(sectors));
}

pub(super) fn inc_farm_replotted(&self, single_disk_farm_id: &SingleDiskFarmId) {
self.farm_plotted
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.inc();
if self
.farm_expired
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.get()
> 0
{
self.farm_expired
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.dec();
} else {
self.farm_about_to_expire
.get_or_create(&vec![(
"farm_id".to_string(),
single_disk_farm_id.to_string(),
)])
.dec();
}
}
}
9 changes: 7 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,13 @@ impl SingleDiskFarm {
}

/// Number of sectors successfully plotted so far
pub async fn plotted_sectors_count(&self) -> usize {
self.sectors_metadata.read().await.len()
pub async fn plotted_sectors_count(&self) -> SectorIndex {
self.sectors_metadata
.read()
.await
.len()
.try_into()
.expect("Number of sectors never exceeds `SectorIndex` type; qed")
}

/// Read information about sectors plotted so far
Expand Down

0 comments on commit 863c4db

Please sign in to comment.