feat(resharding): add metrics for flat storage resharding (#12571)
Adding two new metrics to monitor resharding:
- `near_flat_storage_resharding_status`
- `near_flat_storage_resharding_split_shard_processed_batches`

Reusing the existing metric `near_flat_storage_head_height` to monitor
shard catchup.

Part of #12174
Trisfald authored Dec 11, 2024
1 parent e594174 commit 3a33b7a
Showing 5 changed files with 206 additions and 26 deletions.
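As a quick illustration of how the two new gauges could be inspected, here is a minimal read-back sketch. It is not part of this commit: the `near_store::metrics::flat_state_metrics` path is assumed to be reachable from the caller, and the label string is a placeholder.

```rust
// Illustrative only: read back the new per-shard gauges in-process.
// Assumes `near_store::metrics::flat_state_metrics` is publicly reachable;
// `parent_shard_label` is whatever string the node uses for the shard UId.
use near_store::metrics::flat_state_metrics::resharding;

fn dump_split_shard_metrics(parent_shard_label: &str) {
    let status = resharding::STATUS
        .with_label_values(&[parent_shard_label])
        .get();
    let batches = resharding::SPLIT_SHARD_PROCESSED_BATCHES
        .with_label_values(&[parent_shard_label])
        .get();
    println!("resharding status={status}, processed batches={batches}");
}
```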
83 changes: 59 additions & 24 deletions chain/chain/src/flat_storage_resharder.rs
@@ -34,6 +34,7 @@ use near_store::adapter::flat_store::{FlatStoreAdapter, FlatStoreUpdateAdapter};
use near_store::adapter::StoreAdapter;
use near_store::flat::{
BlockInfo, FlatStateChanges, FlatStorageError, FlatStorageReadyStatus,
FlatStorageReshardingShardCatchUpMetrics, FlatStorageReshardingShardSplitMetrics,
FlatStorageReshardingStatus, FlatStorageStatus, ParentSplitParameters,
};
use near_store::{ShardUId, StorageError};
@@ -261,6 +262,14 @@ impl FlatStorageResharder {
TaskExecutionStatus::NotStarted,
);
self.set_resharding_event(event);

let metrics = FlatStorageReshardingShardSplitMetrics::new(
parent_shard,
split_params.left_child_shard,
split_params.right_child_shard,
);
metrics.update_shards_status(&self.runtime.get_flat_storage_manager());

info!(target: "resharding", ?parent_shard, ?split_params, "scheduling flat storage shard split");
let resharder = self.clone();
// Send a request to schedule the execution of `split_shard_task`, to do the bulk of the
@@ -344,16 +353,30 @@ impl FlatStorageResharder {
}

// We know that the resharding block has become final so let's start the real work.
let task_status = self.split_shard_task_impl();
self.split_shard_task_postprocessing(task_status);
let (parent_shard, split_params) = self
.get_parent_shard_and_split_params()
.expect("flat storage resharding event must be Split!");
let metrics = FlatStorageReshardingShardSplitMetrics::new(
parent_shard,
split_params.left_child_shard,
split_params.right_child_shard,
);

let task_status = self.split_shard_task_impl(parent_shard, &split_params, &metrics);
self.split_shard_task_postprocessing(parent_shard, split_params, &metrics, task_status);
info!(target: "resharding", ?task_status, "flat storage shard split task finished");
task_status
}

/// Performs the bulk of [split_shard_task].
///
/// Returns the status of the task once it finishes.
fn split_shard_task_impl(&self) -> FlatStorageReshardingSchedulableTaskResult {
fn split_shard_task_impl(
&self,
parent_shard: ShardUId,
split_params: &ParentSplitParameters,
metrics: &FlatStorageReshardingShardSplitMetrics,
) -> FlatStorageReshardingSchedulableTaskResult {
self.set_resharding_event_execution_status(TaskExecutionStatus::Started);

// Exit early if the task has already been cancelled.
@@ -364,12 +387,10 @@ impl FlatStorageResharder {
// Determines after how many bytes worth of key-values the process stops to commit changes
// and to check cancellation.
let batch_size = self.resharding_config.get().batch_size.as_u64() as usize;
metrics.set_split_shard_batch_size(batch_size);
// Delay between every batch.
let batch_delay = self.resharding_config.get().batch_delay.unsigned_abs();

let (parent_shard, split_params) = self
.get_parent_shard_and_split_params()
.expect("flat storage resharding event must be Split!");
info!(target: "resharding", ?parent_shard, ?split_params, ?batch_delay, ?batch_size, "flat storage shard split task: starting key-values copy");

// Prepare the store object for commits and the iterator over parent's flat storage.
@@ -387,6 +408,7 @@ impl FlatStorageResharder {
};

let mut num_batches_done: usize = 0;
metrics.set_split_shard_processed_bytes(0);
let mut iter_exhausted = false;

loop {
@@ -432,6 +454,8 @@ impl FlatStorageResharder {
}

num_batches_done += 1;
metrics.set_split_shard_processed_batches(num_batches_done);
metrics.inc_split_shard_processed_bytes_by(processed_size);

// If `iter` is exhausted we can exit after the store commit.
if iter_exhausted {
@@ -457,11 +481,11 @@
)]
fn split_shard_task_postprocessing(
&self,
parent_shard: ShardUId,
split_params: ParentSplitParameters,
metrics: &FlatStorageReshardingShardSplitMetrics,
task_status: FlatStorageReshardingSchedulableTaskResult,
) {
let (parent_shard, split_params) = self
.get_parent_shard_and_split_params()
.expect("flat storage resharding event must be Split!");
let ParentSplitParameters {
left_child_shard,
right_child_shard,
@@ -524,6 +548,7 @@ impl FlatStorageResharder {
}
store_update.commit().unwrap();
self.remove_resharding_event();
metrics.update_shards_status(&self.runtime.get_flat_storage_manager());
}

/// Returns an iterator over a shard's flat storage at the given block hash. This
@@ -591,14 +616,15 @@ impl FlatStorageResharder {
chain_store: &ChainStore,
) -> FlatStorageReshardingTaskResult {
info!(target: "resharding", ?shard_uid, ?flat_head_block_hash, "flat storage shard catchup task started");
let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid);
// Apply deltas and then create the flat storage.
let apply_result =
self.shard_catchup_apply_deltas(shard_uid, flat_head_block_hash, chain_store);
self.shard_catchup_apply_deltas(shard_uid, flat_head_block_hash, chain_store, &metrics);
let Ok((num_batches_done, flat_head)) = apply_result else {
error!(target: "resharding", ?shard_uid, err = ?apply_result.unwrap_err(), "flat storage shard catchup delta application failed!");
return FlatStorageReshardingTaskResult::Failed;
};
match self.shard_catchup_finalize_storage(shard_uid, &flat_head) {
match self.shard_catchup_finalize_storage(shard_uid, &flat_head, &metrics) {
Ok(_) => {
let task_status = FlatStorageReshardingTaskResult::Successful { num_batches_done };
info!(target: "resharding", ?shard_uid, ?task_status, "flat storage shard catchup task finished");
@@ -621,6 +647,7 @@ impl FlatStorageResharder {
shard_uid: ShardUId,
mut flat_head_block_hash: CryptoHash,
chain_store: &ChainStore,
metrics: &FlatStorageReshardingShardCatchUpMetrics,
) -> Result<(usize, Tip), Error> {
// How many block heights of deltas are applied in a single commit.
let catch_up_blocks = self.resharding_config.get().catch_up_blocks;
@@ -684,7 +711,9 @@ impl FlatStorageResharder {
)),
);
store_update.commit()?;

num_batches_done += 1;
metrics.set_head_height(chain_store.get_block_height(&flat_head_block_hash)?);

// Sleep between batches in order to throttle resharding and leave some resources for
// regular node operation.
@@ -704,6 +733,7 @@
&self,
shard_uid: ShardUId,
flat_head: &Tip,
metrics: &FlatStorageReshardingShardCatchUpMetrics,
) -> Result<(), Error> {
// GC deltas from forks which could have appeared on chain during catchup.
let store = self.runtime.store().flat_store();
@@ -720,17 +750,16 @@
}
}
// Set the flat storage status to `Ready`.
store_update.set_flat_storage_status(
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: BlockInfo {
hash: flat_head.last_block_hash,
prev_hash: flat_head.prev_block_hash,
height: flat_head.height,
},
}),
);
let flat_storage_status = FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: BlockInfo {
hash: flat_head.last_block_hash,
prev_hash: flat_head.prev_block_hash,
height: flat_head.height,
},
});
store_update.set_flat_storage_status(shard_uid, flat_storage_status.clone());
store_update.commit()?;
metrics.set_status(&flat_storage_status);
info!(target: "resharding", ?shard_uid, %deltas_gc_count, "garbage collected flat storage deltas");
// Create the flat storage entry for this shard in the manager.
self.runtime.get_flat_storage_manager().create_flat_storage_for_shard(shard_uid)?;
@@ -747,7 +776,7 @@
debug_assert!(!current_event.has_started());
// Clean up the database state.
match current_event {
FlatStorageReshardingEventStatus::SplitShard(parent_shard, split_status, _) => {
FlatStorageReshardingEventStatus::SplitShard(parent_shard, split_status, ..) => {
let flat_store = self.runtime.store().flat_store();
let mut store_update = flat_store.store_update();
// Parent goes back to the Ready state.
@@ -1038,7 +1067,7 @@ impl FlatStorageReshardingEventStatus {

fn resharding_hash(&self) -> CryptoHash {
match self {
FlatStorageReshardingEventStatus::SplitShard(_, split_status, _) => {
FlatStorageReshardingEventStatus::SplitShard(_, split_status, ..) => {
split_status.resharding_hash
}
}
@@ -1400,7 +1429,13 @@ mod tests {

// Immediately cancel the resharding and call the resharding task.
controller.handle.stop();
resharder.split_shard_task_impl();
let (parent_shard, split_params) = resharder.get_parent_shard_and_split_params().unwrap();
let metrics = FlatStorageReshardingShardSplitMetrics::new(
parent_shard,
split_params.left_child_shard,
split_params.right_child_shard,
);
resharder.split_shard_task_impl(parent_shard, &split_params, &metrics);

assert!(resharder.resharding_event().is_some());
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_err());
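A condensed view of the call pattern introduced above, as a hedged sketch only. `FlatStorageReshardingShardSplitMetrics` and `FlatStorageManager` are the types added/re-exported in core/store/src/flat below; the function name and the numeric values are placeholders.

```rust
use near_store::flat::{FlatStorageManager, FlatStorageReshardingShardSplitMetrics};
use near_store::ShardUId;

// Sketch of how the split-shard metrics are driven: shard statuses are
// exported up front, then the copy loop reports batch size, batch count,
// and processed bytes (values here are illustrative).
fn report_split_progress(
    manager: &FlatStorageManager,
    parent_shard: ShardUId,
    left_child_shard: ShardUId,
    right_child_shard: ShardUId,
) {
    let metrics = FlatStorageReshardingShardSplitMetrics::new(
        parent_shard,
        left_child_shard,
        right_child_shard,
    );
    metrics.update_shards_status(manager);
    metrics.set_split_shard_batch_size(500_000);
    metrics.set_split_shard_processed_bytes(0);
    // ...after each committed batch of key-values:
    metrics.set_split_shard_processed_batches(1);
    metrics.inc_split_shard_processed_bytes_by(123_456);
}
```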
103 changes: 103 additions & 0 deletions core/store/src/flat/metrics.rs
@@ -1,3 +1,4 @@
use super::{FlatStorageManager, FlatStorageStatus};
use crate::metrics::flat_state_metrics;
use near_o11y::metrics::IntGauge;
use near_primitives::{shard_layout::ShardUId, types::BlockHeight};
@@ -50,3 +51,105 @@ impl FlatStorageMetrics {
self.cached_changes_size.set(cached_changes_size as i64);
}
}

/// Metrics for flat storage resharding.
///
/// This struct is a collection of metrics to monitor the operation of splitting a shard.
pub struct FlatStorageReshardingShardSplitMetrics {
parent_shard: ShardUId,
left_child_shard: ShardUId,
right_child_shard: ShardUId,
parent_status: IntGauge,
left_child_status: IntGauge,
right_child_status: IntGauge,
split_shard_processed_batches: IntGauge,
split_shard_batch_size: IntGauge,
split_shard_processed_bytes: IntGauge,
}

impl FlatStorageReshardingShardSplitMetrics {
pub fn new(
parent_shard: ShardUId,
left_child_shard: ShardUId,
right_child_shard: ShardUId,
) -> Self {
use flat_state_metrics::*;
let parent_shard_label = parent_shard.to_string();
let left_child_shard_label = left_child_shard.to_string();
let right_child_shard_label = right_child_shard.to_string();
Self {
parent_shard,
left_child_shard,
right_child_shard,
parent_status: resharding::STATUS.with_label_values(&[&parent_shard_label]),
left_child_status: resharding::STATUS.with_label_values(&[&left_child_shard_label]),
right_child_status: resharding::STATUS.with_label_values(&[&right_child_shard_label]),
split_shard_processed_batches: resharding::SPLIT_SHARD_PROCESSED_BATCHES
.with_label_values(&[&parent_shard_label]),
split_shard_batch_size: resharding::SPLIT_SHARD_BATCH_SIZE.clone(),
split_shard_processed_bytes: resharding::SPLIT_SHARD_PROCESSED_BYTES
.with_label_values(&[&parent_shard_label]),
}
}

pub fn set_parent_status(&self, status: &FlatStorageStatus) {
self.parent_status.set(status.into());
}

pub fn set_left_child_status(&self, status: &FlatStorageStatus) {
self.left_child_status.set(status.into());
}

pub fn set_right_child_status(&self, status: &FlatStorageStatus) {
self.right_child_status.set(status.into());
}

pub fn set_split_shard_processed_batches(&self, num_batches: usize) {
self.split_shard_processed_batches.set(num_batches as i64);
}

pub fn update_shards_status(&self, manager: &FlatStorageManager) {
self.set_parent_status(&manager.get_flat_storage_status(self.parent_shard));
self.set_left_child_status(&manager.get_flat_storage_status(self.left_child_shard));
self.set_right_child_status(&manager.get_flat_storage_status(self.right_child_shard));
}

pub fn set_split_shard_batch_size(&self, batch_size: usize) {
self.split_shard_batch_size.set(batch_size as i64);
}

pub fn set_split_shard_processed_bytes(&self, bytes: usize) {
self.split_shard_processed_bytes.set(bytes as i64);
}

pub fn inc_split_shard_processed_bytes_by(&self, processed_bytes: usize) {
self.split_shard_processed_bytes.add(processed_bytes as i64);
}
}

/// Metrics for flat storage resharding.
///
/// This struct is a collection of metrics to monitor the catch up phase of a new shard.
pub struct FlatStorageReshardingShardCatchUpMetrics {
status: IntGauge,
head_height: IntGauge,
}

impl FlatStorageReshardingShardCatchUpMetrics {
pub fn new(shard_uid: &ShardUId) -> Self {
use flat_state_metrics::*;
let shard_label = shard_uid.to_string();
Self {
status: resharding::STATUS.with_label_values(&[&shard_label]),
head_height: FLAT_STORAGE_HEAD_HEIGHT.with_label_values(&[&shard_label]),
}
}

pub fn set_status(&self, status: &FlatStorageStatus) {
self.status.set(status.into());
}

pub fn set_head_height(&self, height: u64) {
self.head_height.set(height as i64);
}
}
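To complement the split-shard sketch after the first file, here is a hedged sketch of how the catch-up metrics are meant to be driven. The height and status arguments are placeholders; in the real flow (see flat_storage_resharder.rs above) the height comes from the chain store and the status from the flat storage store update.

```rust
use near_store::flat::{FlatStorageReshardingShardCatchUpMetrics, FlatStorageStatus};
use near_store::ShardUId;

// Sketch of the catch-up reporting pattern: the flat head height is updated
// after every applied batch of deltas, and the status gauge is set once the
// shard's flat storage is finalized.
fn report_catchup_progress(shard_uid: ShardUId, height: u64, status: &FlatStorageStatus) {
    let metrics = FlatStorageReshardingShardCatchUpMetrics::new(&shard_uid);
    metrics.set_head_height(height);
    metrics.set_status(status);
}
```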
3 changes: 3 additions & 0 deletions core/store/src/flat/mod.rs
@@ -37,6 +37,9 @@ mod types;
pub use chunk_view::FlatStorageChunkView;
pub use delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
pub use manager::FlatStorageManager;
pub use metrics::{
FlatStorageReshardingShardCatchUpMetrics, FlatStorageReshardingShardSplitMetrics,
};
pub use storage::FlatStorage;
pub use types::{
BlockInfo, FetchingStateStatus, FlatStateIterator, FlatStorageCreationStatus, FlatStorageError,
4 changes: 2 additions & 2 deletions core/store/src/flat/types.rs
@@ -48,7 +48,7 @@ impl From<FlatStorageError> for StorageError {
pub type FlatStorageResult<T> = Result<T, FlatStorageError>;

#[derive(
BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema,
BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema, Clone,
)]
pub enum FlatStorageStatus {
/// Flat Storage is not supported.
@@ -89,7 +89,7 @@ impl Into<i64> for &FlatStorageStatus {
}

#[derive(
BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema,
BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq, serde::Serialize, ProtocolSchema, Clone,
)]
pub struct FlatStorageReadyStatus {
pub flat_head: BlockInfo,
39 changes: 39 additions & 0 deletions core/store/src/metrics.rs
@@ -488,6 +488,45 @@ pub mod flat_state_metrics {
.unwrap()
});
}

pub mod resharding {
use near_o11y::metrics::{
try_create_int_gauge, try_create_int_gauge_vec, IntGauge, IntGaugeVec,
};
use std::sync::LazyLock;

pub static STATUS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
try_create_int_gauge_vec(
"near_flat_storage_resharding_status",
"Integer representing status of flat storage resharding",
&["shard_uid"],
)
.unwrap()
});
pub static SPLIT_SHARD_PROCESSED_BATCHES: LazyLock<IntGaugeVec> = LazyLock::new(|| {
try_create_int_gauge_vec(
"near_flat_storage_resharding_split_shard_processed_batches",
"Number of processed batches inside the split shard task",
&["shard_uid"],
)
.unwrap()
});
pub static SPLIT_SHARD_BATCH_SIZE: LazyLock<IntGauge> = LazyLock::new(|| {
try_create_int_gauge(
"near_flat_storage_resharding_split_shard_batch_size",
"Size in bytes of every batch inside the split shard task",
)
.unwrap()
});
pub static SPLIT_SHARD_PROCESSED_BYTES: LazyLock<IntGaugeVec> = LazyLock::new(|| {
try_create_int_gauge_vec(
"near_flat_storage_resharding_split_shard_processed_bytes",
"Total bytes of Flat State that have been split inside the split shard task",
&["shard_uid"],
)
.unwrap()
});
}
}

pub static COLD_STORE_MIGRATION_BATCH_WRITE_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
