Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamps to IBD progress #376

Merged
merged 5 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ tower-http = { version = "0.4.4", features = [
] }
tower = "0.4.7"
hyper = "0.14.27"
chrono = "0.4.31"
# workflow dependencies that are not a part of core libraries

# workflow-perf-monitor = { path = "../../../workflow-perf-monitor-rs" }
Expand Down
3 changes: 2 additions & 1 deletion protocol/flows/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ log.workspace = true
parking_lot.workspace = true
rand.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
tokio-stream = { workspace = true, features = ["net"] }
uuid = { workspace = true, features = ["v4", "fast-rng"] }
chrono.workspace = true
32 changes: 24 additions & 8 deletions protocol/flows/src/v5/ibd/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub enum IbdType {
DownloadHeadersProof,
}

struct QueueChunkOutput {
jobs: Vec<BlockValidationFuture>,
daa_score: u64,
timestamp: u64,
}
// TODO: define a peer banning strategy

impl IbdFlow {
Expand Down Expand Up @@ -372,12 +377,18 @@ impl IbdFlow {
let mut chunk_stream = HeadersChunkStream::new(&self.router, &mut self.incoming_route);

if let Some(chunk) = chunk_stream.next().await? {
let mut prev_daa_score = chunk.last().expect("chunk is never empty").daa_score;
let (mut prev_daa_score, mut prev_timestamp) = {
let last_header = chunk.last().expect("chunk is never empty");
(last_header.daa_score, last_header.timestamp)
};
let mut prev_jobs: Vec<BlockValidationFuture> =
chunk.into_iter().map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task).collect();

while let Some(chunk) = chunk_stream.next().await? {
let current_daa_score = chunk.last().expect("chunk is never empty").daa_score;
let (current_daa_score, current_timestamp) = {
let last_header = chunk.last().expect("chunk is never empty");
(last_header.daa_score, last_header.timestamp)
};
let current_jobs = chunk
.into_iter()
.map(|h| consensus.validate_and_insert_block(Block::from_header_arc(h)).virtual_state_task)
Expand All @@ -386,8 +397,9 @@ impl IbdFlow {
// Join the previous chunk so that we always concurrently process a chunk and receive another
try_join_all(prev_jobs).await?;
// Log the progress
progress_reporter.report(prev_chunk_len, prev_daa_score);
progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
prev_daa_score = current_daa_score;
prev_timestamp = current_timestamp;
prev_jobs = current_jobs;
}

Expand Down Expand Up @@ -513,17 +525,19 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
let mut progress_reporter = ProgressReporter::new(low_header.daa_score, high_header.daa_score, "blocks");

let mut iter = hashes.chunks(IBD_BATCH_SIZE);
let (mut prev_jobs, mut prev_daa_score) =
let QueueChunkOutput { jobs: mut prev_jobs, daa_score: mut prev_daa_score, timestamp: mut prev_timestamp } =
self.queue_block_processing_chunk(consensus, iter.next().expect("hashes was non empty")).await?;

for chunk in iter {
let (current_jobs, current_daa_score) = self.queue_block_processing_chunk(consensus, chunk).await?;
let QueueChunkOutput { jobs: current_jobs, daa_score: current_daa_score, timestamp: current_timestamp } =
self.queue_block_processing_chunk(consensus, chunk).await?;
let prev_chunk_len = prev_jobs.len();
// Join the previous chunk so that we always concurrently process a chunk and receive another
try_join_all(prev_jobs).await?;
// Log the progress
progress_reporter.report(prev_chunk_len, prev_daa_score);
progress_reporter.report(prev_chunk_len, prev_daa_score, prev_timestamp);
prev_daa_score = current_daa_score;
prev_timestamp = current_timestamp;
prev_jobs = current_jobs;
}

Expand All @@ -538,9 +552,10 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
&mut self,
consensus: &ConsensusProxy,
chunk: &[Hash],
) -> Result<(Vec<BlockValidationFuture>, u64), ProtocolError> {
) -> Result<QueueChunkOutput, ProtocolError> {
let mut jobs = Vec::with_capacity(chunk.len());
let mut current_daa_score = 0;
let mut current_timestamp = 0;
self.router
.enqueue(make_message!(
Payload::RequestIbdBlocks,
Expand All @@ -557,9 +572,10 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
return Err(ProtocolError::OtherOwned(format!("sent header of {} where expected block with body", block.hash())));
}
current_daa_score = block.header.daa_score;
current_timestamp = block.header.timestamp;
jobs.push(consensus.validate_and_insert_block(block).virtual_state_task);
}

Ok((jobs, current_daa_score))
Ok(QueueChunkOutput { jobs, daa_score: current_daa_score, timestamp: current_timestamp })
}
}
9 changes: 7 additions & 2 deletions protocol/flows/src/v5/ibd/progress.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::{Duration, Instant};

use chrono::{Local, LocalResult, TimeZone};
use kaspa_core::info;

/// Minimum number of items to report
Expand Down Expand Up @@ -34,7 +35,7 @@ impl ProgressReporter {
}
}

pub fn report(&mut self, processed_delta: usize, current_daa_score: u64) {
pub fn report(&mut self, processed_delta: usize, current_daa_score: u64, current_timestamp: u64) {
self.current_batch += processed_delta;
let now = Instant::now();
if now - self.last_log_time < REPORT_TIME_GRANULARITY && self.current_batch < REPORT_BATCH_GRANULARITY && self.processed > 0 {
Expand All @@ -48,7 +49,11 @@ impl ProgressReporter {
let relative_daa_score = if current_daa_score > self.low_daa_score { current_daa_score - self.low_daa_score } else { 0 };
let percent = ((relative_daa_score as f64 / (self.high_daa_score - self.low_daa_score) as f64) * 100.0) as i32;
if percent > self.last_reported_percent {
info!("IBD: Processed {} {} ({}%)", self.processed, self.object_name, percent);
let date = match Local.timestamp_opt(current_timestamp as i64 / 1000, 1000 * (current_timestamp as u32 % 1000)) {
LocalResult::None | LocalResult::Ambiguous(_, _) => "cannot parse date".into(),
LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S.%3f:%z").to_string(),
};
info!("IBD: Processed {} {} ({}%) last block timestamp: {}", self.processed, self.object_name, percent, date);
self.last_reported_percent = percent;
}
self.last_log_time = now;
Expand Down