Skip to content

Commit

Permalink
workflow: Trace each step execution time
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Dec 6, 2024
1 parent 5a9ff8e commit 9141dc5
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ pub enum WorkflowStep {
Count,
}

impl std::fmt::Display for WorkflowStep {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkflowStep::Scan(..) => write!(f, "scan"),
WorkflowStep::Filter(..) => write!(f, "filter"),
WorkflowStep::Project(..) => write!(f, "project"),
WorkflowStep::Limit(limit) => write!(f, "limit({})", limit),
WorkflowStep::Sort(..) => write!(f, "sort"),
WorkflowStep::TopN(.., limit) => write!(f, "top-n({})", limit),
WorkflowStep::Summarize(..) => write!(f, "summarize"),
WorkflowStep::Union(..) => write!(f, "union"),
WorkflowStep::Count => write!(f, "count"),
}
}
}

#[derive(Debug, Clone)]
pub struct Workflow {
pub steps: Vec<WorkflowStep>,
Expand Down Expand Up @@ -112,11 +128,14 @@ async fn logs_vec_to_tx(logs: Vec<Log>, tx: mpsc::Sender<Log>, tag: &str) -> Res
}

impl WorkflowStep {
#[instrument(skip(rx, tx), fields(step = %self))]
async fn execute(
self: WorkflowStep,
rx: Option<mpsc::Receiver<Log>>,
tx: mpsc::Sender<Log>,
) -> Result<()> {
let start = Instant::now();

match self {
WorkflowStep::Scan(Scan {
collection,
Expand Down Expand Up @@ -232,6 +251,9 @@ impl WorkflowStep {
}
}

let duration = start.elapsed();
info!(elapsed_time = ?duration, "Workflow step execution time");

Ok(())
}
}
Expand Down

0 comments on commit 9141dc5

Please sign in to comment.