
Commit

Merge branch 'main' into dashboard
raunakab committed Feb 11, 2025
2 parents 8f931fa + e5ff39f commit 3580862
Showing 29 changed files with 1,019 additions and 50 deletions.
3 changes: 3 additions & 0 deletions daft/daft/__init__.pyi
@@ -951,6 +951,7 @@ class PyExpr:
def bool_or(self) -> PyExpr: ...
def any_value(self, ignore_nulls: bool) -> PyExpr: ...
def agg_list(self) -> PyExpr: ...
def agg_set(self) -> PyExpr: ...
def agg_concat(self) -> PyExpr: ...
def __add__(self, other: PyExpr) -> PyExpr: ...
def __sub__(self, other: PyExpr) -> PyExpr: ...
@@ -1165,6 +1166,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ...
# ---
def explode(expr: PyExpr) -> PyExpr: ...
def list_sort(expr: PyExpr, desc: PyExpr, nulls_first: PyExpr) -> PyExpr: ...
def list_distinct(expr: PyExpr) -> PyExpr: ...
def list_value_counts(expr: PyExpr) -> PyExpr: ...
def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ...
def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ...
@@ -1277,6 +1279,7 @@ class PySeries:
def min(self) -> PySeries: ...
def max(self) -> PySeries: ...
def agg_list(self) -> PySeries: ...
def agg_set(self) -> PySeries: ...
def cast(self, dtype: PyDataType) -> PySeries: ...
def ceil(self) -> PySeries: ...
def floor(self) -> PySeries: ...
25 changes: 25 additions & 0 deletions daft/dataframe/dataframe.py
@@ -2377,6 +2377,8 @@ def _map_agg_string_to_expr(self, expr: Expression, op: str) -> Expression:
return expr.any_value()
elif op == "list":
return expr.agg_list()
elif op == "set":
return expr.agg_set()
elif op == "concat":
return expr.agg_concat()

@@ -2560,6 +2562,18 @@ def agg_list(self, *cols: ColumnInputType) -> "DataFrame":
"""
return self._apply_agg_fn(Expression.agg_list, cols)

@DataframePublicAPI
def agg_set(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs a global set agg on the DataFrame (ignoring nulls).
Args:
*cols (Union[str, Expression]): columns to form into a set
Returns:
DataFrame: Globally aggregated set. Should be a single row.
"""
return self._apply_agg_fn(Expression.agg_set, cols)
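For reference, a minimal usage sketch of the new DataFrame.agg_set (assuming a daft build that includes this commit; the column name "values" is illustrative):

import daft

df = daft.from_pydict({"values": [1, 1, None, 2, 2, None]})

# Global set aggregation: the whole column collapses to a single row
# holding the unique non-null values, e.g. [1, 2].
df.agg_set("values").show()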

@DataframePublicAPI
def agg_concat(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs a global list concatenation agg on the DataFrame.
@@ -3385,6 +3399,17 @@ def agg_list(self, *cols: ColumnInputType) -> "DataFrame":
"""
return self.df._apply_agg_fn(Expression.agg_list, cols, self.group_by)

def agg_set(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs grouped set on this GroupedDataFrame (ignoring nulls).
Args:
*cols (Union[str, Expression]): columns to form into a set
Returns:
DataFrame: DataFrame with grouped set per column.
"""
return self.df._apply_agg_fn(Expression.agg_set, cols, self.group_by)
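And a grouped counterpart, a sketch assuming the existing DataFrame.groupby API (group and column names are illustrative):

import daft

df = daft.from_pydict({"group": ["a", "a", "b", "b"], "values": [1, 1, None, 2]})

# Grouped set aggregation: one list of unique non-null values per group,
# e.g. group "a" -> [1] and group "b" -> [2].
df.groupby("group").agg_set("values").show()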

def agg_concat(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs grouped concat on this GroupedDataFrame.
84 changes: 84 additions & 0 deletions daft/expressions/expressions.py
@@ -25,6 +25,7 @@
from daft.daft import date_lit as _date_lit
from daft.daft import decimal_lit as _decimal_lit
from daft.daft import duration_lit as _duration_lit
from daft.daft import list_distinct as _list_distinct
from daft.daft import list_sort as _list_sort
from daft.daft import lit as _lit
from daft.daft import series_lit as _series_lit
@@ -1088,6 +1089,43 @@ def agg_list(self) -> Expression:
expr = self._expr.agg_list()
return Expression._from_pyexpr(expr)

def agg_set(self) -> Expression:
"""Aggregates the values in the expression into a set (ignoring nulls).
Example:
>>> import daft
>>> df = daft.from_pydict({"values": [1, 1, None, 2, 2, None]})
>>> df.agg(df["values"].agg_set().alias("unique_values")).show()
╭───────────────╮
│ unique_values │
│ --- │
│ List[Int64] │
╞═══════════════╡
│ [1, 2] │
╰───────────────╯
<BLANKLINE>
(Showing first 1 of 1 rows)
Note that null values are ignored by default:
>>> df = daft.from_pydict({"values": [None, None, None]})
>>> df.agg(df["values"].agg_set().alias("unique_values")).show()
╭───────────────╮
│ unique_values │
│ --- │
│ List[Null] │
╞═══════════════╡
│ [] │
╰───────────────╯
<BLANKLINE>
(Showing first 1 of 1 rows)
Returns:
Expression: A List expression containing the unique values from the input
"""
expr = self._expr.agg_set()
return Expression._from_pyexpr(expr)

def agg_concat(self) -> Expression:
"""Aggregates the values in the expression into a single string by concatenating them."""
expr = self._expr.agg_concat()
@@ -3397,6 +3435,52 @@ def sort(self, desc: bool | Expression = False, nulls_first: bool | Expression |
nulls_first = Expression._to_expression(nulls_first)
return Expression._from_pyexpr(_list_sort(self._expr, desc._expr, nulls_first._expr))

def distinct(self) -> Expression:
"""Returns a list of unique elements in each list, preserving order of first occurrence and ignoring nulls.
Example:
>>> import daft
>>> df = daft.from_pydict({"a": [[1, 2, 2, 3], [4, 4, 6, 2], [6, 7, 1], [None, 1, None, 1]]})
>>> df.select(df["a"].list.distinct()).show()
╭─────────────╮
│ a │
│ --- │
│ List[Int64] │
╞═════════════╡
│ [1, 2, 3] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [4, 6, 2] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [6, 7, 1] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [1] │
╰─────────────╯
<BLANKLINE>
(Showing first 4 of 4 rows)
Note that null values are ignored:
>>> df = daft.from_pydict({"a": [[None, None], [1, None, 1], [None]]})
>>> df.select(df["a"].list.distinct()).show()
╭─────────────╮
│ a │
│ --- │
│ List[Int64] │
╞═════════════╡
│ [] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [1] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [] │
╰─────────────╯
<BLANKLINE>
(Showing first 3 of 3 rows)
Returns:
Expression: An expression with lists containing only unique elements
"""
return Expression._from_pyexpr(_list_distinct(self._expr))


class ExpressionStructNamespace(ExpressionNamespace):
def get(self, name: str) -> Expression:
3 changes: 3 additions & 0 deletions docs/sphinx/source/expressions.rst
@@ -116,13 +116,15 @@ The following can be used with DataFrame.agg or GroupedDataFrame.agg
Expression.bool_and
Expression.bool_or
Expression.count
Expression.count_distinct
Expression.sum
Expression.mean
Expression.stddev
Expression.min
Expression.max
Expression.any_value
Expression.agg_list
Expression.agg_set
Expression.agg_concat
Expression.approx_percentiles
Expression.approx_count_distinct
@@ -248,6 +250,7 @@ List
Expression.list.slice
Expression.list.sort
Expression.list.sum
Expression.list.distinct
Expression.list.value_counts

Struct
6 changes: 6 additions & 0 deletions src/daft-core/src/array/ops/mod.rs
@@ -242,6 +242,12 @@ pub trait DaftHllMergeAggable {
fn grouped_hll_merge(&self, groups: &GroupIndices) -> Self::Output;
}

pub trait DaftSetAggable {
type Output;
fn set(&self) -> Self::Output;
fn grouped_set(&self, groups: &GroupIndices) -> Self::Output;
}

pub trait DaftBoolAggable {
type Output;
fn bool_and(&self) -> Self::Output;
4 changes: 4 additions & 0 deletions src/daft-core/src/python/series.rs
@@ -430,6 +430,10 @@ impl PySeries {
Ok((self.series).agg_list(None)?.into())
}

pub fn agg_set(&self) -> PyResult<Self> {
Ok((self.series).agg_set(None)?.into())
}

pub fn cast(&self, dtype: PyDataType) -> PyResult<Self> {
Ok(self.series.cast(&dtype.into())?.into())
}
14 changes: 13 additions & 1 deletion src/daft-core/src/series/array_impl/data_array.rs
@@ -7,7 +7,7 @@ use super::{ArrayWrapper, IntoSeries, Series};
use crate::datatypes::PythonArray;
use crate::{
array::{
ops::{broadcast::Broadcastable, DaftListAggable, GroupIndices},
ops::{broadcast::Broadcastable, DaftListAggable, DaftSetAggable, GroupIndices},
prelude::*,
DataArray,
},
@@ -159,6 +159,18 @@ macro_rules! impl_series_like_for_data_array {
None => Ok(self.0.list()?.into_series()),
}
}

fn agg_set(&self, groups: Option<&GroupIndices>) -> DaftResult<Series> {
match groups {
Some(groups) => self
.0
.clone()
.into_series()
.grouped_set(groups)
.map(|x| x.into_series()),
None => self.0.clone().into_series().set().map(|x| x.into_series()),
}
}
}
};
}
16 changes: 16 additions & 0 deletions src/daft-core/src/series/array_impl/logical_array.rs
@@ -165,6 +165,22 @@ macro_rules! impl_series_like_for_logical_array {
)
.into_series())
}

fn agg_set(&self, groups: Option<&GroupIndices>) -> DaftResult<Series> {
use crate::array::{ops::DaftSetAggable, ListArray};
let data_array = match groups {
Some(groups) => self.0.physical.clone().into_series().grouped_set(groups)?,
None => self.0.physical.clone().into_series().set()?,
};
let new_field = self.field().to_list_field()?;
Ok(ListArray::new(
new_field,
data_array.flat_child.cast(self.data_type())?,
data_array.offsets().clone(),
data_array.validity().cloned(),
)
.into_series())
}
}
};
}
14 changes: 13 additions & 1 deletion src/daft-core/src/series/array_impl/nested_array.rs
@@ -5,7 +5,7 @@ use common_error::{DaftError, DaftResult};
use super::ArrayWrapper;
use crate::{
array::{
ops::{broadcast::Broadcastable, DaftIsNull, DaftNotNull, GroupIndices},
ops::{broadcast::Broadcastable, DaftIsNull, DaftNotNull, DaftSetAggable, GroupIndices},
FixedSizeListArray, ListArray, StructArray,
},
datatypes::{BooleanArray, DataType, Field},
@@ -70,6 +70,18 @@ macro_rules! impl_series_like_for_nested_arrays {
}
}

fn agg_set(&self, groups: Option<&GroupIndices>) -> DaftResult<Series> {
match groups {
Some(groups) => self
.0
.clone()
.into_series()
.grouped_set(groups)
.map(|x| x.into_series()),
None => self.0.clone().into_series().set().map(|x| x.into_series()),
}
}

fn broadcast(&self, num: usize) -> DaftResult<Series> {
Ok(self.0.broadcast(num)?.into_series())
}
24 changes: 10 additions & 14 deletions src/daft-core/src/series/mod.rs
@@ -4,16 +4,13 @@ mod ops;
mod serdes;
mod series_like;
mod utils;
use std::{
collections::{hash_map::RawEntryMut, HashMap},
ops::Sub,
sync::Arc,
};
use std::{ops::Sub, sync::Arc};

pub use array_impl::IntoSeries;
use common_display::table_display::{make_comfy_table, StrValue};
use common_error::DaftResult;
use derive_more::Display;
use indexmap::{map::RawEntryApiV1, IndexMap};
pub use ops::cast_series_to_supertype;

pub(crate) use self::series_like::SeriesLike;
@@ -52,18 +49,18 @@ impl Series {
/// Its length can also be used to determine the *exact* number of unique elements in this [`Series`].
///
/// # Note
/// 1. This function returns a `HashMap<X, ()>` rather than a `HashSet<X>`. These two types are functionally equivalent.
/// 1. This function returns an `IndexMap<X, ()>` rather than a `HashSet<X>`. These two types are functionally equivalent.
///
/// 2. `NULL`s are *not* inserted into the returned hashset. They won't be counted towards the final number of unique elements.
pub fn build_probe_table_without_nulls(
&self,
) -> DaftResult<HashMap<IndexHash, (), IdentityBuildHasher>> {
) -> DaftResult<IndexMap<IndexHash, (), IdentityBuildHasher>> {
// Building a comparator function over a series of type `NULL` will result in a failure.
// (I.e., `let comparator = build_is_equal(..)` will fail).
//
// Therefore, exit early with an empty hashmap.
if matches!(self.data_type(), DataType::Null) {
return Ok(HashMap::default());
return Ok(IndexMap::default());
};

const DEFAULT_SIZE: usize = 20;
@@ -72,7 +69,7 @@
let comparator = build_is_equal(&*array, &*array, true, false)?;

let mut probe_table =
HashMap::<IndexHash, (), IdentityBuildHasher>::with_capacity_and_hasher(
IndexMap::<IndexHash, (), IdentityBuildHasher>::with_capacity_and_hasher(
DEFAULT_SIZE,
Default::default(),
);
@@ -82,19 +79,18 @@
Some(&hash) => hash,
None => continue,
};
let entry = probe_table.raw_entry_mut().from_hash(hash, |other| {
let entry = probe_table.raw_entry_v1().from_hash(hash, |other| {
(hash == other.hash) && comparator(idx, other.idx as _)
});
if let RawEntryMut::Vacant(entry) = entry {
entry.insert_hashed_nocheck(
hash,
if entry.is_none() {
probe_table.insert(
IndexHash {
idx: idx as u64,
hash,
},
(),
);
};
}
}

Ok(probe_table)
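The swap from HashMap/RawEntryMut to IndexMap makes the probe table remember insertion order, which supports the order-of-first-occurrence behavior documented for list.distinct above while still skipping NULLs. As a rough conceptual analogue only (not the actual Rust code; a plain Python dict stands in for the hash-plus-comparator IndexMap):

def distinct_ignoring_nulls(values):
    # Python dicts preserve insertion order, mirroring the IndexMap used as
    # an order-preserving probe table; None stands in for NULL.
    seen = {}
    for v in values:
        if v is None:
            continue  # NULLs are never inserted into the probe table
        seen.setdefault(v, None)
    return list(seen)

assert distinct_ignoring_nulls([1, 2, 2, 3]) == [1, 2, 3]
assert distinct_ignoring_nulls([None, 1, None, 1]) == [1]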
(Diffs for the remaining 19 changed files are not shown.)
