Skip to content

Commit

Permalink
Merge pull request #197 from Qrlew/implement_distinct
Browse files Browse the repository at this point in the history
Implement distinct
  • Loading branch information
ngrislain authored Nov 28, 2023
2 parents 28688c2 + 351438f commit 8e5e1b5
Show file tree
Hide file tree
Showing 11 changed files with 780 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
## Added
- Implemented `DISTINCT` in aggregations [#197](https://github.com/Qrlew/qrlew/issues/197)
- Implemented math functions: `PI`, `DEGREES`, `TAN`, `RANDOM`, `LOG10`, `LOG2`, `SQUARE` [#196](https://github.com/Qrlew/qrlew/issues/196)

## [0.5.2] - 2023-11-19
Expand Down
126 changes: 125 additions & 1 deletion src/data_type/function.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
borrow::BorrowMut,
cell::RefCell,
cmp, collections,
cmp, collections::{self, HashSet},
convert::{Infallible, TryFrom, TryInto},
error, fmt,
hash::Hasher,
Expand Down Expand Up @@ -1906,6 +1906,21 @@ pub fn mean() -> impl Function {
)
}

/// Mean over the distinct input values.
///
/// Duplicates are dropped by collecting the inputs into a `HashSet` before
/// averaging. An empty input evaluates `0.0 / 0.0`, i.e. NaN.
pub fn mean_distinct() -> impl Function {
    // Only works on types that can be converted to floats
    Aggregate::from(
        data_type::Float::full(),
        |values| {
            // Deduplicate first, then accumulate (cardinality, total) in one pass.
            let distinct: HashSet<_> = values.into_iter().collect();
            let (n, total) = distinct
                .into_iter()
                .fold((0.0, 0.0), |(n, total), v| (n + 1.0, total + f64::from(v)));
            (total / n).into()
        },
        // The image depends on the input intervals only; the group size is ignored.
        |(intervals, _size)| Ok(intervals.into_interval()),
    )
}

/// Aggregate as a list
pub fn list() -> impl Function {
null()
Expand Down Expand Up @@ -1935,6 +1950,30 @@ pub fn count() -> impl Function {
))
}

/// Count distinct aggregation.
///
/// Returns the number of distinct values in the group. Two implementations
/// are provided:
/// * `Any`: every value takes part in the count;
/// * `Optional`: null (`None`) entries are ignored and only distinct non-null
///   values are counted.
pub fn count_distinct() -> impl Function {
    Polymorphic::from((
        // Any implementation
        Aggregate::from(
            DataType::Any,
            // Hashing references is enough to measure cardinality; no clone needed.
            |values| (values.iter().collect::<HashSet<_>>().len() as i64).into(),
            |(_dt, size)| Ok(data_type::Integer::from_interval(1, *size.max().unwrap())),
        ),
        // Optional implementation
        Aggregate::from(
            data_type::Optional::from(DataType::Any),
            |values| {
                // Drop nulls, then deduplicate what is left. Counting each
                // non-null entry once (as plain `count` does) would ignore DISTINCT.
                let distinct = values
                    .iter()
                    .filter_map(|value| value.as_ref())
                    .collect::<HashSet<_>>();
                (distinct.len() as i64).into()
            },
            // Lower bound is 0 because every entry may be null.
            |(_dt, size)| Ok(data_type::Integer::from_interval(0, *size.max().unwrap())),
        ),
    ))
}

/// Min aggregation
pub fn min() -> impl Function {
Polymorphic::from((
Expand Down Expand Up @@ -2035,6 +2074,32 @@ pub fn sum() -> impl Function {
))
}

/// Sum over the distinct input values.
///
/// Each implementation deduplicates its inputs through a `HashSet` and adds up
/// what remains. The image is obtained by multiplying the value intervals by
/// the group size.
pub fn sum_distinct() -> impl Function {
    Polymorphic::from((
        // Integer implementation
        Aggregate::from(
            data_type::Integer::full(),
            |values| {
                let distinct: HashSet<_> = values.iter().cloned().collect();
                distinct.into_iter().map(|v| *v).sum::<i64>().into()
            },
            |(intervals, size)| {
                let operands =
                    DataType::structured_from_data_types([intervals.into(), size.into()]);
                let image = multiply().super_image(&operands)?;
                Ok(data_type::Integer::try_from(image)?)
            },
        ),
        // Float implementation
        Aggregate::from(
            data_type::Float::full(),
            |values| {
                let distinct: HashSet<_> = values.iter().cloned().collect();
                distinct.into_iter().map(|v| *v).sum::<f64>().into()
            },
            |(intervals, size)| {
                let operands =
                    DataType::structured_from_data_types([intervals.into(), size.into()]);
                let image = multiply().super_image(&operands)?;
                Ok(data_type::Float::try_from(image)?)
            },
        ),
    ))
}

/// Agg groups aggregation
pub fn agg_groups() -> impl Function {
null()
Expand Down Expand Up @@ -2066,6 +2131,34 @@ pub fn std() -> impl Function {
)
}

/// Standard deviation distinct aggregation.
///
/// Computes the sample standard deviation (denominator `count - 1`) of the
/// distinct input values. With fewer than two distinct values the division
/// is `0 / 0` or involves an empty accumulator, so the result is NaN.
pub fn std_distinct() -> impl Function {
    // Only works on types that can be converted to floats
    Aggregate::from(
        data_type::Float::full(),
        |values| {
            // One pass over the deduplicated values: cardinality, sum, sum of squares.
            let (count, sum, sum_2) = values
                .into_iter()
                .collect::<HashSet<_>>()
                .into_iter()
                .fold((0.0, 0.0, 0.0), |(count, sum, sum_2), value| {
                    let value: f64 = value.into();
                    (count + 1.0, sum + value, sum_2 + value * value)
                });
            ((sum_2 - sum * sum / count) / (count - 1.)).sqrt().into()
        },
        |(intervals, _size)| match (intervals.min(), intervals.max()) {
            // The sample variance is n / (n - 1) times the population variance,
            // which is at most (max - min)^2 / 4. The factor is largest for
            // n = 2, where the two values {min, max} give a sample standard
            // deviation of (max - min) / sqrt(2). A bound of (max - min) / 2
            // would exclude that reachable value.
            (Some(&min), Some(&max)) => Ok(data_type::Float::from_interval(
                0.,
                (max - min) / f64::sqrt(2.),
            )),
            _ => Ok(data_type::Float::from_min(0.)),
        },
    )
}

/// Variance aggregation
pub fn var() -> impl Function {
// Only works on types that can be converted to floats
Expand Down Expand Up @@ -2095,6 +2188,37 @@ pub fn var() -> impl Function {
)
}

/// Variance distinct aggregation.
///
/// Computes the sample variance (denominator `count - 1`) of the distinct
/// input values. With fewer than two distinct values the division is `0 / 0`
/// or involves an empty accumulator, so the result is NaN.
pub fn var_distinct() -> impl Function {
    // Only works on types that can be converted to floats
    Aggregate::from(
        data_type::Float::full(),
        |values| {
            // One pass over the deduplicated values: cardinality, sum, sum of squares.
            let (count, sum, sum_2) = values
                .into_iter()
                .collect::<HashSet<_>>()
                .into_iter()
                .fold((0.0, 0.0, 0.0), |(count, sum, sum_2), value| {
                    let value: f64 = value.into();
                    (count + 1.0, sum + value, sum_2 + value * value)
                });
            ((sum_2 - sum * sum / count) / (count - 1.)).into()
        },
        |(intervals, _size)| match (intervals.min(), intervals.max()) {
            // The sample variance is n / (n - 1) times the population variance,
            // which is at most (max - min)^2 / 4. The factor is largest for
            // n = 2, where the two values {min, max} give (max - min)^2 / 2.
            // A bound of ((max - min) / 2)^2 would exclude that reachable value.
            (Some(&min), Some(&max)) => Ok(data_type::Float::from_interval(
                0.,
                (max - min).powi(2) / 2.,
            )),
            _ => Ok(data_type::Float::from_min(0.)),
        },
    )
}

#[cfg(test)]
mod tests {
use super::{
Expand Down
4 changes: 3 additions & 1 deletion src/data_type/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl Variant for Boolean {
impl_variant_conversions!(Boolean);

/// Integer value: a newtype wrapping a single `i64`.
///
/// Derives `Hash` and (in the second derive line) `Eq` in addition to
/// `PartialEq`; presumably so values can be used as `HashSet` elements by the
/// distinct aggregations — confirm against callers.
#[derive(Clone, Hash, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
#[derive(Clone, Hash, PartialEq, PartialOrd, Debug, Deserialize, Serialize, Eq)]
pub struct Integer(i64);

impl DataTyped for Integer {
Expand Down Expand Up @@ -333,6 +333,8 @@ impl_variant_conversions!(Enum);
#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
pub struct Float(f64);

// Marker impl: asserts that the derived `PartialEq` on `Float` is a full
// equivalence relation, which `HashSet` requires of its elements.
// NOTE(review): `Float` wraps an `f64` and its `PartialEq` is derived, so
// `NaN != NaN`; the reflexivity that `Eq` promises does not hold for NaN.
// Confirm NaN cannot reach a hashed collection, or that this is acceptable.
impl Eq for Float {}

#[allow(clippy::derive_hash_xor_eq)]
impl hash::Hash for Float {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
Expand Down
Loading

0 comments on commit 8e5e1b5

Please sign in to comment.