minor: Add comments for GroupedHashAggregateStream struct (#12127)
* Add doc for Aggr struct

* Fix format
2010YOUY01 authored Aug 24, 2024
1 parent 6f18304 commit 2b3d456
Showing 1 changed file with 83 additions and 41 deletions.
124 changes: 83 additions & 41 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -76,35 +76,45 @@ use super::AggregateExec;

/// This encapsulates the spilling state
struct SpillState {
/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
spills: Vec<RefCountedTempFile>,

// ========================================================================
// PROPERTIES:
// These fields are initialized at the start and remain constant throughout
// the execution.
// ========================================================================
/// Sorting expression for spilling batches
spill_expr: Vec<PhysicalSortExpr>,

/// Schema for spilling batches
spill_schema: SchemaRef,

/// true when streaming merge is in progress
is_stream_merging: bool,

/// aggregate_arguments for merging spilled data
merging_aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,

/// GROUP BY expressions for merging spilled data
merging_group_by: PhysicalGroupBy,

// ========================================================================
// STATES:
// Fields change during execution. They can be buffers, or state flags that
// influence the execution of the parent `GroupedHashAggregateStream`.
// ========================================================================
/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
spills: Vec<RefCountedTempFile>,

/// true when streaming merge is in progress
is_stream_merging: bool,
}
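
To make the PROPERTIES/STATES split concrete, here is a minimal, self-contained sketch of how fields like these interact during spilling. Only the mirrored field names come from `SpillState`; every other name and the control flow are hypothetical, not DataFusion's API.

```rust
use std::path::PathBuf;

// Hypothetical stand-ins for the real Arrow/DataFusion types.
struct SortExpr;
struct TempFile(PathBuf);

struct SpillStateSketch {
    // PROPERTIES: fixed once the operator starts.
    spill_expr: Vec<SortExpr>,
    // STATES: grow or flip as execution proceeds.
    spills: Vec<TempFile>,
    is_stream_merging: bool,
}

impl SpillStateSketch {
    // Record a new spill file, written sorted by `spill_expr`
    // (in the real operator, in Arrow IPC format).
    fn on_spill(&mut self, file: TempFile) {
        self.spills.push(file);
    }

    // Once input is exhausted, switch to merging the sorted
    // spill files back into a single grouped stream.
    fn begin_stream_merge(&mut self) {
        self.is_stream_merging = true;
    }
}
```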

/// Tracks if the aggregate should skip partial aggregations
///
/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
struct SkipAggregationProbe {
/// Number of processed input rows (updated during probing)
input_rows: usize,
/// Number of total group values for `input_rows` (updated during probing)
num_groups: usize,

// ========================================================================
// PROPERTIES:
// These fields are initialized at the start and remain constant throughout
// the execution.
// ========================================================================
/// The aggregation ratio check is performed when the number of input rows
/// exceeds this threshold (from `SessionConfig`)
probe_rows_threshold: usize,
@@ -113,6 +123,16 @@ struct SkipAggregationProbe {
/// is skipped and input rows are directly converted to output
probe_ratio_threshold: f64,

// ========================================================================
// STATES:
// Fields change during execution. They can be buffers, or state flags that
// influence the execution of the parent `GroupedHashAggregateStream`.
// ========================================================================
/// Number of processed input rows (updated during probing)
input_rows: usize,
/// Number of total group values for `input_rows` (updated during probing)
num_groups: usize,

/// Flag indicating further data aggregation may be skipped (decision made
/// when probing is complete)
should_skip: bool,
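
The thresholds and counters above combine into a simple decision rule. Below is a hedged sketch of that rule, assuming the ratio checked is `num_groups / input_rows`; the field names mirror `SkipAggregationProbe`, but the code is illustrative, not the verbatim implementation.

```rust
// Illustrative sketch of the skip-partial-aggregation probe decision.
struct ProbeSketch {
    probe_rows_threshold: usize,
    probe_ratio_threshold: f64,
    input_rows: usize,
    num_groups: usize,
    should_skip: bool,
}

impl ProbeSketch {
    fn update(&mut self, batch_rows: usize, total_groups: usize) {
        self.input_rows += batch_rows;
        self.num_groups = total_groups;
        // Only probe once enough rows have been seen.
        if self.input_rows >= self.probe_rows_threshold {
            let ratio = self.num_groups as f64 / self.input_rows as f64;
            // A high groups-to-rows ratio means partial aggregation is
            // not reducing cardinality much, so it can be skipped.
            self.should_skip = ratio >= self.probe_ratio_threshold;
        }
    }
}
```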
@@ -316,17 +336,15 @@ impl SkipAggregationProbe {
/// └─────────────────┘ └─────────────────┘
/// ```
pub(crate) struct GroupedHashAggregateStream {
// ========================================================================
// PROPERTIES:
// These fields are initialized at the start and remain constant throughout
// the execution.
// ========================================================================
schema: SchemaRef,
input: SendableRecordBatchStream,
mode: AggregateMode,

/// Accumulators, one for each `AggregateExpr` in the query
///
/// For example, if the query has aggregates `SUM(x)` and
/// `COUNT(y)`, there will be two accumulators, each one
/// specialized for that particular aggregate and its input types
accumulators: Vec<Box<dyn GroupsAccumulator>>,

/// Arguments to pass to each accumulator.
///
/// `accumulators[i]` is passed the arguments in `aggregate_arguments[i]`
@@ -347,48 +365,72 @@ pub(crate) struct GroupedHashAggregateStream {
/// GROUP BY expressions
group_by: PhysicalGroupBy,

/// The memory reservation for this grouping
reservation: MemoryReservation,
/// max rows in output RecordBatches
batch_size: usize,

/// Optional soft limit on the number of `group_values` in a batch.
/// If the number of `group_values` in a single batch exceeds this value,
/// the `GroupedHashAggregateStream` operation immediately switches to
/// output mode and emits all groups.
group_values_soft_limit: Option<usize>,

// ========================================================================
// STATE FLAGS:
// These fields are updated during execution and control the flow of
// the execution.
// ========================================================================
/// Tracks if this stream is reading input or producing output
exec_state: ExecutionState,

/// Have we seen the end of the input
input_done: bool,

// ========================================================================
// STATE BUFFERS:
// These fields accumulate intermediate results during execution.
// ========================================================================
/// An interning store of group keys
group_values: Box<dyn GroupValues>,

/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
current_group_indices: Vec<usize>,

/// Tracks if this stream is reading input or producing output
exec_state: ExecutionState,

/// Execution metrics
baseline_metrics: BaselineMetrics,

/// max rows in output RecordBatches
batch_size: usize,
/// Accumulators, one for each `AggregateExpr` in the query
///
/// For example, if the query has aggregates `SUM(x)` and
/// `COUNT(y)`, there will be two accumulators, each one
/// specialized for that particular aggregate and its input types
accumulators: Vec<Box<dyn GroupsAccumulator>>,

// ========================================================================
// TASK-SPECIFIC STATES:
// Inner states group together the properties and states for a specific task.
// ========================================================================
/// Optional ordering information, that might allow groups to be
/// emitted from the hash table prior to seeing the end of the
/// input
group_ordering: GroupOrdering,

/// Have we seen the end of the input
input_done: bool,

/// The [`RuntimeEnv`] associated with the [`TaskContext`] argument
runtime: Arc<RuntimeEnv>,

/// The spill state object
spill_state: SpillState,

/// Optional soft limit on the number of `group_values` in a batch.
/// If the number of `group_values` in a single batch exceeds this value,
/// the `GroupedHashAggregateStream` operation immediately switches to
/// output mode and emits all groups.
group_values_soft_limit: Option<usize>,

/// Optional probe for skipping data aggregation, if supported by
/// current stream.
skip_aggregation_probe: Option<SkipAggregationProbe>,

// ========================================================================
// EXECUTION RESOURCES:
// Fields related to managing execution resources and monitoring performance.
// ========================================================================
/// The memory reservation for this grouping
reservation: MemoryReservation,

/// Execution metrics
baseline_metrics: BaselineMetrics,

/// The [`RuntimeEnv`] associated with the [`TaskContext`] argument
runtime: Arc<RuntimeEnv>,
}
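
Taken together, `exec_state` and `input_done` imply a small state machine: consume input batches until the stream ends, then emit output in `batch_size` chunks. Below is a rough, self-contained sketch under assumed names; it illustrates the control flow the comments describe and is not the actual implementation.

```rust
// Rough sketch of the control flow implied by `exec_state` and
// `input_done`; all types and names here are illustrative.
enum ExecState {
    ReadingInput,
    ProducingOutput,
    Done,
}

struct StreamSketch {
    exec_state: ExecState,
    input_done: bool,
    buffered_groups: usize, // stand-in for the group hash table
    batch_size: usize,
}

impl StreamSketch {
    // Returns the number of rows emitted this step, if any.
    fn step(&mut self, input_batch: Option<usize>) -> Option<usize> {
        match self.exec_state {
            ExecState::ReadingInput => match input_batch {
                // Each input batch updates the groups/accumulators.
                Some(rows) => {
                    self.buffered_groups += rows;
                    None
                }
                // Input exhausted: switch to emitting results.
                None => {
                    self.input_done = true;
                    self.exec_state = ExecState::ProducingOutput;
                    None
                }
            },
            ExecState::ProducingOutput => {
                // Emit at most `batch_size` rows per output batch.
                let emit = self.buffered_groups.min(self.batch_size);
                self.buffered_groups -= emit;
                if self.buffered_groups == 0 {
                    self.exec_state = ExecState::Done;
                }
                Some(emit)
            }
            ExecState::Done => None,
        }
    }
}
```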

impl GroupedHashAggregateStream {
