ARROW-12432: [Rust] [DataFusion] Add metrics to SortExec
Add `outputRows` and `sortTime` metrics to SortExec.

Example output from Ballista:

```
SortExec { input: ProjectionExec { expr: [(Column { name: "l_shipmode" }, "l_shipmode"), (Column { name: "SUM(CASE WHEN
  Metrics: sortTime=44444, outputRows=2
```
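
For orientation before reading the diff, here is a minimal sketch of how a caller might read the new metrics after executing a sort. It mirrors the test added in this commit; the `datafusion` import paths, the `sort_exprs`/`input_plan` placeholders, and reading `SQLMetric::value` directly from outside the `physical_plan` module are assumptions rather than part of this change.

```rust
// Hypothetical usage sketch (not part of this commit); mirrors the new test in
// sort.rs: execute the plan, then read the metrics snapshot from the operator.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};

// `sort_exprs` and `input_plan` are placeholders for a real sort expression
// list and child plan.
async fn report_sort_metrics(
    sort_exprs: Vec<PhysicalSortExpr>,
    input_plan: Arc<dyn ExecutionPlan>,
) -> Result<()> {
    let sort_exec = Arc::new(SortExec::try_new(sort_exprs, input_plan)?);

    // Drive the plan to completion; sortTime and outputRows accumulate here.
    let _batches = collect(sort_exec.clone()).await?;

    // metrics() returns a HashMap<String, SQLMetric> snapshot.
    for (name, metric) in sort_exec.metrics() {
        // Assumes the metric value is readable here, as in the crate-internal test.
        println!("{}={}", name, metric.value);
    }
    Ok(())
}
```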

Closes #10078 from andygrove/sortexec-metrics

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
andygrove committed Apr 18, 2021
1 parent 26cdb15 commit 9a4ef46
Showing 3 changed files with 77 additions and 15 deletions.
7 changes: 2 additions & 5 deletions rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -28,7 +28,7 @@ use futures::{
};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};

use arrow::{
@@ -144,10 +144,7 @@ impl HashAggregateExec {

let schema = Arc::new(schema);

let output_rows = Arc::new(Mutex::new(SQLMetric::new(
"outputRows",
MetricType::Counter,
)));
let output_rows = SQLMetric::counter("outputRows");

Ok(HashAggregateExec {
mode,
14 changes: 13 additions & 1 deletion rust/datafusion/src/physical_plan/mod.rs
@@ -18,7 +18,7 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{any::Any, pin::Pin};

use crate::execution::context::ExecutionContextState;
@@ -52,6 +52,8 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
pub enum MetricType {
/// Simple counter
Counter,
/// Wall clock time in nanoseconds
TimeNanos,
}

/// SQL metric such as counter (number of input or output rows) or timing information about
@@ -67,6 +69,16 @@ pub struct SQLMetric {
}

impl SQLMetric {
/// Create a new metric for tracking a counter
pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
}

/// Create a new metric for tracking time in nanoseconds
pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
}

/// Create a new SQLMetric
pub fn new(name: &str, metric_type: MetricType) -> Self {
Self {
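
The two new constructors wrap the metric in an `Arc<Mutex<...>>` so an operator and the stream it spawns can update it from different tasks. Below is a minimal sketch of the intended pattern, assuming the `datafusion::physical_plan::SQLMetric` path and public `add()` visibility; the surrounding function and the recorded values are illustrative only.

```rust
// Hypothetical sketch (not part of this commit) of the new helpers, used the
// same way SortExec does below: create shared metrics, then add() to them.
use std::sync::{Arc, Mutex};
use std::time::Instant;

use datafusion::physical_plan::SQLMetric;

fn track_some_work() {
    // Shared, thread-safe metrics, as SortExec::try_new creates below.
    let output_rows: Arc<Mutex<SQLMetric>> = SQLMetric::counter("outputRows");
    let sort_time: Arc<Mutex<SQLMetric>> = SQLMetric::time_nanos("sortTime");

    // Time a unit of work and record it in nanoseconds.
    let start = Instant::now();
    // ... sort or otherwise process a batch here ...
    sort_time.lock().unwrap().add(start.elapsed().as_nanos() as usize);

    // Record how many rows were produced (illustrative count).
    output_rows.lock().unwrap().add(1024);
}
```
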
71 changes: 62 additions & 9 deletions rust/datafusion/src/physical_plan/sort.rs
@@ -19,11 +19,14 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Instant;

use async_trait::async_trait;
use futures::stream::Stream;
use futures::Future;
use hashbrown::HashMap;

use pin_project_lite::pin_project;

@@ -37,9 +40,9 @@ use arrow::{array::ArrayRef, error::ArrowError};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning};

use async_trait::async_trait;
use crate::physical_plan::{
common, Distribution, ExecutionPlan, Partitioning, SQLMetric,
};

/// Sort execution plan
#[derive(Debug)]
@@ -48,6 +51,10 @@ pub struct SortExec {
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
/// Output rows
output_rows: Arc<Mutex<SQLMetric>>,
/// Time to sort batches
sort_time_nanos: Arc<Mutex<SQLMetric>>,
}

impl SortExec {
@@ -56,7 +63,12 @@ impl SortExec {
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
Ok(Self { expr, input })
Ok(Self {
expr,
input,
output_rows: SQLMetric::counter("outputRows"),
sort_time_nanos: SQLMetric::time_nanos("sortTime"),
})
}

/// Input schema
@@ -125,7 +137,25 @@
}
let input = self.input.execute(0).await?;

Ok(Box::pin(SortStream::new(input, self.expr.clone())))
Ok(Box::pin(SortStream::new(
input,
self.expr.clone(),
self.output_rows.clone(),
self.sort_time_nanos.clone(),
)))
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert(
"outputRows".to_owned(),
self.output_rows.lock().unwrap().clone(),
);
metrics.insert(
"sortTime".to_owned(),
self.sort_time_nanos.lock().unwrap().clone(),
);
metrics
}
}

@@ -194,11 +224,17 @@ pin_project! {
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
output_rows: Arc<Mutex<SQLMetric>>,
}
}

impl SortStream {
fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) -> Self {
fn new(
input: SendableRecordBatchStream,
expr: Vec<PhysicalSortExpr>,
output_rows: Arc<Mutex<SQLMetric>>,
sort_time: Arc<Mutex<SQLMetric>>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();

let schema = input.schema();
@@ -207,7 +243,13 @@ impl SortStream {
let sorted_batch = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)
.and_then(move |batches| sort_batches(&batches, &schema, &expr));
.and_then(move |batches| {
let now = Instant::now();
let result = sort_batches(&batches, &schema, &expr);
let mut sort_time = sort_time.lock().unwrap();
sort_time.add(now.elapsed().as_nanos() as usize);
result
});

tx.send(sorted_batch)
});
@@ -216,6 +258,7 @@ impl SortStream {
output: rx,
finished: false,
schema,
output_rows,
}
}
}
@@ -224,6 +267,8 @@ impl Stream for SortStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let output_rows = self.output_rows.clone();

if self.finished {
return Poll::Ready(None);
}
@@ -241,6 +286,12 @@
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
Ok(result) => result.transpose(),
};

if let Some(Ok(batch)) = &result {
let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows());
}

Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
@@ -379,7 +430,9 @@ mod tests {
assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

let result: Vec<RecordBatch> = collect(sort_exec).await?;
let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
assert_eq!(result.len(), 1);

let columns = result[0].columns();
