Skip to content

Commit

Permalink
Improve performance 10%-100% in FIRST_VALUE / LAST_VALUE by not s…
Browse files Browse the repository at this point in the history
…ort rows in `FirstValueAccumulator` (apache#14402)

* Switch to `LexicographicalComparator`

* Take `ignore_nulls` out of filter
  • Loading branch information
blaginin authored Feb 5, 2025
1 parent b80080e commit e8d9b62
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 56 deletions.
33 changes: 33 additions & 0 deletions datafusion/core/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,39 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

c.bench_function("first_last_many_columns", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
last_value(u64_wide order by f64, u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
)
})
});

c.bench_function("first_last_ignore_nulls", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \
last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
)
})
});

c.bench_function("first_last_one_column", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT first_value(u64_wide order by f64), \
last_value(u64_wide order by f64) \
FROM t GROUP BY u64_narrow",
)
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
94 changes: 39 additions & 55 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::mem::size_of_val;
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn};
use arrow::compute::{self, LexicographicalComparator, SortColumn};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{
Expand Down Expand Up @@ -250,6 +250,7 @@ impl FirstValueAccumulator {
return Ok((!value.is_empty()).then_some(0));
}
}

let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
Expand All @@ -259,19 +260,17 @@ impl FirstValueAccumulator {
})
.collect::<Vec<_>>();

if self.ignore_nulls {
let indices = lexsort_to_indices(&sort_columns, None)?;
// If ignoring nulls, find the first non-null value.
for index in indices.iter().flatten() {
if !value.is_null(index as usize) {
return Ok(Some(index as usize));
}
}
Ok(None)
let comparator = LexicographicalComparator::try_new(&sort_columns)?;

let min_index = if self.ignore_nulls {
(0..value.len())
.filter(|&index| !value.is_null(index))
.min_by(|&a, &b| comparator.compare(a, b))
} else {
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
(0..value.len()).min_by(|&a, &b| comparator.compare(a, b))
};

Ok(min_index)
}
}

Expand Down Expand Up @@ -312,22 +311,19 @@ impl Accumulator for FirstValueAccumulator {
// last index contains is_set flag.
let is_set_idx = states.len() - 1;
let flags = states[is_set_idx].as_boolean();
let filtered_states = filter_states_according_to_is_set(states, flags)?;
let filtered_states =
filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
// 1..is_set_idx range corresponds to ordering section
let sort_cols = convert_to_sort_cols(
let sort_columns = convert_to_sort_cols(
&filtered_states[1..is_set_idx],
self.ordering_req.as_ref(),
);

let ordered_states = if sort_cols.is_empty() {
// When no ordering is given, use the existing state as is:
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
take_arrays(&filtered_states, &indices, None)?
};
if !ordered_states[0].is_empty() {
let first_row = get_row_at_idx(&ordered_states, 0)?;
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
let min = (0..filtered_states[0].len()).min_by(|&a, &b| comparator.compare(a, b));

if let Some(first_idx) = min {
let first_row = get_row_at_idx(&filtered_states, first_idx)?;
// When collecting orderings, we exclude the is_set flag from the state.
let first_ordering = &first_row[1..is_set_idx];
let sort_options = get_sort_options(self.ordering_req.as_ref());
Expand Down Expand Up @@ -559,29 +555,22 @@ impl LastValueAccumulator {
let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| {
// Take the reverse ordering requirement. This enables us to
// use "fetch = 1" to get the last value.
SortColumn {
values: Arc::clone(values),
options: Some(!req.options),
}
.map(|(values, req)| SortColumn {
values: Arc::clone(values),
options: Some(req.options),
})
.collect::<Vec<_>>();

if self.ignore_nulls {
let indices = lexsort_to_indices(&sort_columns, None)?;
// If ignoring nulls, find the last non-null value.
for index in indices.iter().flatten() {
if !value.is_null(index as usize) {
return Ok(Some(index as usize));
}
}
Ok(None)
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
let max_ind = if self.ignore_nulls {
(0..value.len())
.filter(|&index| !(value.is_null(index)))
.max_by(|&a, &b| comparator.compare(a, b))
} else {
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
(0..value.len()).max_by(|&a, &b| comparator.compare(a, b))
};

Ok(max_ind)
}

fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
Expand Down Expand Up @@ -627,24 +616,19 @@ impl Accumulator for LastValueAccumulator {
// last index contains is_set flag.
let is_set_idx = states.len() - 1;
let flags = states[is_set_idx].as_boolean();
let filtered_states = filter_states_according_to_is_set(states, flags)?;
let filtered_states =
filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
// 1..is_set_idx range corresponds to ordering section
let sort_cols = convert_to_sort_cols(
let sort_columns = convert_to_sort_cols(
&filtered_states[1..is_set_idx],
self.ordering_req.as_ref(),
);

let ordered_states = if sort_cols.is_empty() {
// When no ordering is given, use existing state as is:
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
take_arrays(&filtered_states, &indices, None)?
};
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
let max = (0..filtered_states[0].len()).max_by(|&a, &b| comparator.compare(a, b));

if !ordered_states[0].is_empty() {
let last_idx = ordered_states[0].len() - 1;
let last_row = get_row_at_idx(&ordered_states, last_idx)?;
if let Some(last_idx) = max {
let last_row = get_row_at_idx(&filtered_states, last_idx)?;
// When collecting orderings, we exclude the is_set flag from the state.
let last_ordering = &last_row[1..is_set_idx];
let sort_options = get_sort_options(self.ordering_req.as_ref());
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3003,7 +3003,7 @@ SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
LAST_VALUE(amount ORDER BY ts ASC) AS fv2
FROM sales_global
----
30 100
30 80

# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
# contradictory requirements should work in multi partitions.
Expand Down

0 comments on commit e8d9b62

Please sign in to comment.