From 2b3d45666701d6bb053494f9f1d4c4b3186368e0 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 24 Aug 2024 17:37:25 +0800 Subject: [PATCH] minor: Add comments for `GroupedHashAggregateStream` struct (#12127) * Add doc for Aggr struct * Fix format --- .../physical-plan/src/aggregates/row_hash.rs | 124 ++++++++++++------ 1 file changed, 83 insertions(+), 41 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index b3221752d034..05f4ec621813 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -76,35 +76,45 @@ use super::AggregateExec. /// This encapsulates the spilling state struct SpillState { - /// If data has previously been spilled, the locations of the - /// spill files (in Arrow IPC format) - spills: Vec, - + // ======================================================================== + // PROPERTIES: + // These fields are initialized at the start and remain constant throughout + // the execution. + // ======================================================================== /// Sorting expression for spilling batches spill_expr: Vec, /// Schema for spilling batches spill_schema: SchemaRef, - /// true when streaming merge is in progress - is_stream_merging: bool, - /// aggregate_arguments for merging spilled data merging_aggregate_arguments: Vec>>, /// GROUP BY expressions for merging spilled data merging_group_by: PhysicalGroupBy, + + // ======================================================================== + // STATES: + // Fields change during execution.
Can be buffer, or state flags that + // influence the execution in parent `GroupedHashAggregateStream` + // ======================================================================== + /// If data has previously been spilled, the locations of the + /// spill files (in Arrow IPC format) + spills: Vec, + + /// true when streaming merge is in progress + is_stream_merging: bool, } /// Tracks if the aggregate should skip partial aggregations /// /// See "partial aggregation" discussion on [`GroupedHashAggregateStream`] struct SkipAggregationProbe { - /// Number of processed input rows (updated during probing) - input_rows: usize, - /// Number of total group values for `input_rows` (updated during probing) - num_groups: usize, - + // ======================================================================== + // PROPERTIES: + // These fields are initialized at the start and remain constant throughout + // the execution. + // ======================================================================== /// Aggregation ratio check performed when the number of input rows exceeds /// this threshold (from `SessionConfig`) probe_rows_threshold: usize, @@ -113,6 +123,16 @@ struct SkipAggregationProbe { /// is skipped and input rows are directly converted to output probe_ratio_threshold: f64, + // ======================================================================== + // STATES: + // Fields change during execution.
Can be buffer, or state flags that + // influence the execution in parent `GroupedHashAggregateStream` + // ======================================================================== + /// Number of processed input rows (updated during probing) + input_rows: usize, + /// Number of total group values for `input_rows` (updated during probing) + num_groups: usize, + /// Flag indicating further data aggregation may be skipped (decision made /// when probing complete) should_skip: bool, @@ -316,17 +336,15 @@ impl SkipAggregationProbe { /// └─────────────────┘ └─────────────────┘ /// ``` pub(crate) struct GroupedHashAggregateStream { + // ======================================================================== + // PROPERTIES: + // These fields are initialized at the start and remain constant throughout + // the execution. + // ======================================================================== schema: SchemaRef, input: SendableRecordBatchStream, mode: AggregateMode, - /// Accumulators, one for each `AggregateExpr` in the query - /// - /// For example, if the query has aggregates, `SUM(x)`, - /// `COUNT(y)`, there will be two accumulators, each one - /// specialized for that particular aggregate and its input types - accumulators: Vec>, - /// Arguments to pass to each accumulator. /// /// The arguments in `accumulator[i]` is passed `aggregate_arguments[i]` @@ -347,9 +365,30 @@ pub(crate) struct GroupedHashAggregateStream { /// GROUP BY expressions group_by: PhysicalGroupBy, - /// The memory reservation for this grouping - reservation: MemoryReservation, + /// max rows in output RecordBatches + batch_size: usize, + + /// Optional soft limit on the number of `group_values` in a batch + /// If the number of `group_values` in a single batch exceeds this value, + /// the `GroupedHashAggregateStream` operation immediately switches to + /// output mode and emits all groups.
+ group_values_soft_limit: Option, + // ======================================================================== + // STATE FLAGS: + // These fields will be updated during the execution, and control the flow of + // the execution. + // ======================================================================== + /// Tracks if this stream is generating input or output + exec_state: ExecutionState, + + /// Have we seen the end of the input + input_done: bool, + + // ======================================================================== + // STATE BUFFERS: + // These fields will accumulate intermediate results during the execution. + // ======================================================================== /// An interning store of group keys group_values: Box, @@ -357,38 +396,41 @@ pub(crate) struct GroupedHashAggregateStream { /// processed. Reused across batches here to avoid reallocations current_group_indices: Vec, - /// Tracks if this stream is generating input or output - exec_state: ExecutionState, - - /// Execution metrics - baseline_metrics: BaselineMetrics, - - /// max rows in output RecordBatches - batch_size: usize, + /// Accumulators, one for each `AggregateExpr` in the query + /// + /// For example, if the query has aggregates, `SUM(x)`, + /// `COUNT(y)`, there will be two accumulators, each one + /// specialized for that particular aggregate and its input types + accumulators: Vec>, + // ======================================================================== + // TASK-SPECIFIC STATES: + // Inner states group together properties and states for a specific task.
+ // ======================================================================== /// Optional ordering information, that might allow groups to be /// emitted from the hash table prior to seeing the end of the /// input group_ordering: GroupOrdering, - /// Have we seen the end of the input - input_done: bool, - - /// The [`RuntimeEnv`] associated with the [`TaskContext`] argument - runtime: Arc, - /// The spill state object spill_state: SpillState, - /// Optional soft limit on the number of `group_values` in a batch - /// If the number of `group_values` in a single batch exceeds this value, - /// the `GroupedHashAggregateStream` operation immediately switches to - /// output mode and emits all groups. - group_values_soft_limit: Option, - /// Optional probe for skipping data aggregation, if supported by /// current stream. skip_aggregation_probe: Option, + + // ======================================================================== + // EXECUTION RESOURCES: + // Fields related to managing execution resources and monitoring performance. + // ======================================================================== + /// The memory reservation for this grouping + reservation: MemoryReservation, + + /// Execution metrics + baseline_metrics: BaselineMetrics, + + /// The [`RuntimeEnv`] associated with the [`TaskContext`] argument + runtime: Arc, } impl GroupedHashAggregateStream {