Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
someone235 committed Jan 5, 2024
1 parent 2d7d0cb commit d5bab35
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
14 changes: 10 additions & 4 deletions protocol/flows/src/v5/ibd/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub enum IbdType {
DownloadHeadersProof,
}

struct QueueChunkOutput {
jobs: Vec<BlockValidationFuture>,
daa_score: u64,
timestamp: u64,
}
// TODO: define a peer banning strategy

impl IbdFlow {
Expand Down Expand Up @@ -520,11 +525,12 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
let mut progress_reporter = ProgressReporter::new(low_header.daa_score, high_header.daa_score, "blocks");

let mut iter = hashes.chunks(IBD_BATCH_SIZE);
let (mut prev_jobs, mut prev_daa_score, mut prev_timestamp) =
let QueueChunkOutput { jobs: mut prev_jobs, daa_score: mut prev_daa_score, timestamp: mut prev_timestamp } =
self.queue_block_processing_chunk(consensus, iter.next().expect("hashes was non empty")).await?;

for chunk in iter {
let (current_jobs, current_daa_score, current_timestamp) = self.queue_block_processing_chunk(consensus, chunk).await?;
let QueueChunkOutput { jobs: current_jobs, daa_score: current_daa_score, timestamp: current_timestamp } =
self.queue_block_processing_chunk(consensus, chunk).await?;
let prev_chunk_len = prev_jobs.len();
// Join the previous chunk so that we always concurrently process a chunk and receive another
try_join_all(prev_jobs).await?;
Expand All @@ -546,7 +552,7 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
&mut self,
consensus: &ConsensusProxy,
chunk: &[Hash],
) -> Result<(Vec<BlockValidationFuture>, u64, u64), ProtocolError> {
) -> Result<QueueChunkOutput, ProtocolError> {
let mut jobs = Vec::with_capacity(chunk.len());
let mut current_daa_score = 0;
let mut current_timestamp = 0;
Expand All @@ -570,6 +576,6 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
jobs.push(consensus.validate_and_insert_block(block).virtual_state_task);
}

Ok((jobs, current_daa_score, current_timestamp))
Ok(QueueChunkOutput { jobs, daa_score: current_daa_score, timestamp: current_timestamp })
}
}
16 changes: 6 additions & 10 deletions protocol/flows/src/v5/ibd/progress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::{Duration, Instant};

use chrono::DateTime;
use chrono::{Local, LocalResult, TimeZone};
use kaspa_core::info;

/// Minimum number of items to report
Expand Down Expand Up @@ -49,15 +49,11 @@ impl ProgressReporter {
let relative_daa_score = if current_daa_score > self.low_daa_score { current_daa_score - self.low_daa_score } else { 0 };
let percent = ((relative_daa_score as f64 / (self.high_daa_score - self.low_daa_score) as f64) * 100.0) as i32;
if percent > self.last_reported_percent {
info!(
"IBD: Processed {} {} ({}%) last block timestamp: {}",
self.processed,
self.object_name,
percent,
DateTime::from_timestamp(current_timestamp as i64 / 1000, 1000 * (current_timestamp as u32 % 1000))
.expect("consensus validated the timestamp is within a valid range")
.format("%Y-%m-%d %H:%M:%S.%3f:%z"),
);
let date = match Local.timestamp_opt(current_timestamp as i64 / 1000, 1000 * (current_timestamp as u32 % 1000)) {
LocalResult::None | LocalResult::Ambiguous(_, _) => "couldn't parse date".into(),
LocalResult::Single(date) => date.format("%Y-%m-%d %H:%M:%S.%3f:%z").to_string(),
};
info!("IBD: Processed {} {} ({}%) last block timestamp: {}", self.processed, self.object_name, percent, date);
self.last_reported_percent = percent;
}
self.last_log_time = now;
Expand Down

0 comments on commit d5bab35

Please sign in to comment.