Use enum for row index column rather than None #1234

Merged · 1 commit · Jun 15, 2023
8 changes: 6 additions & 2 deletions datashader/core.py
@@ -1300,6 +1300,10 @@ def _bypixel_sanitise(source, glyph, agg):


 def _cols_to_keep(columns, glyph, agg):
+    """
+    Return which columns from the supplied data source are kept as they are
+    needed by the specified agg. Excludes any SpecialColumn.
+    """
     cols_to_keep = OrderedDict({col: False for col in columns})
     for col in glyph.required_columns():
         cols_to_keep[col] = True
@@ -1310,9 +1314,9 @@ def recurse(cols_to_keep, agg):
                 recurse(cols_to_keep, subagg)
         elif hasattr(agg, 'columns'):
             for column in agg.columns:
-                if column is not None:
+                if column not in (None, rd.SpecialColumn.RowIndex):
                     cols_to_keep[column] = True
-        elif agg.column is not None:
+        elif agg.column not in (None, rd.SpecialColumn.RowIndex):
             cols_to_keep[agg.column] = True
 
     recurse(cols_to_keep, agg)
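
For intuition, here is a minimal standalone sketch of the membership test this hunk introduces. The toy Agg class is a hypothetical stand-in for a datashader reduction, not datashader API; the point is that None ("no column required") and SpecialColumn.RowIndex (a virtual column that never exists in the source) must both be excluded from the kept columns:

    from collections import OrderedDict
    from enum import Enum

    class SpecialColumn(Enum):
        RowIndex = 1

    class Agg:
        """Hypothetical stand-in for a datashader Reduction."""
        def __init__(self, column):
            self.column = column

    def cols_to_keep(columns, aggs):
        keep = OrderedDict((col, False) for col in columns)
        for agg in aggs:
            # Skip both sentinels: neither names a real source column.
            if agg.column not in (None, SpecialColumn.RowIndex):
                keep[agg.column] = True
        return [col for col, keepit in keep.items() if keepit]

    print(cols_to_keep(['x', 'y', 'value'],
                       [Agg('value'), Agg(None), Agg(SpecialColumn.RowIndex)]))
    # -> ['value']
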
75 changes: 42 additions & 33 deletions datashader/reductions.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 import copy
+from enum import Enum
 from packaging.version import Version
 import numpy as np
 from datashape import dshape, isnumeric, Record, Option
@@ -33,9 +34,21 @@
)


+class SpecialColumn(Enum):
+    """
+    Internally datashader identifies the columns required by the user's
+    Reductions and extracts them from the supplied source (e.g. DataFrame);
+    they are passed through the dynamically-generated append function in
+    compiler.py and end up as arguments to the Reduction._append* functions.
+    Each column is either a string name or a SpecialColumn. A column of None
+    is used in Reduction classes to denote that no column is required.
+    """
+    RowIndex = 1


 class Preprocess(Expr):
     """Base class for preprocessing steps."""
-    def __init__(self, column):
+    def __init__(self, column: str | SpecialColumn | None):
         self.column = column

@property
@@ -50,13 +63,12 @@ def nan_check_column(self):
 class extract(Preprocess):
     """Extract a column from a dataframe as a numpy array of values."""
     def apply(self, df):
-        if self.column is None:
-            # self.column of None means use virtual row index column.
+        if self.column is SpecialColumn.RowIndex:
             attrs = getattr(df, "attrs", None)
             row_offset = getattr(attrs or df, "_datashader_row_offset", 0)
 
         if cudf and isinstance(df, cudf.DataFrame):
-            if self.column is None:
+            if self.column is SpecialColumn.RowIndex:
                 return cp.arange(row_offset, row_offset+len(df), dtype=np.int64)
 
             if df[self.column].dtype.kind == 'f':
@@ -69,7 +81,7 @@ def apply(self, df):
         elif isinstance(df, xr.Dataset):
             # DataArray could be backed by numpy or cupy array
             return df[self.column].data
-        elif self.column is None:
+        elif self.column is SpecialColumn.RowIndex:
             return np.arange(row_offset, row_offset+len(df), dtype=np.int64)
         else:
             return df[self.column].values
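
To make the new dispatch concrete, here is a pandas-only sketch. The helper name is hypothetical (not the datashader class), and it simplifies the attrs lookup by assuming the offset is stored as a plain key in df.attrs:

    import numpy as np
    import pandas as pd
    from enum import Enum

    class SpecialColumn(Enum):
        RowIndex = 1

    def extract_values(df, column):
        # SpecialColumn.RowIndex yields a virtual int64 row-index column;
        # the offset supports dask partitions that start mid-frame.
        if column is SpecialColumn.RowIndex:
            row_offset = df.attrs.get("_datashader_row_offset", 0)
            return np.arange(row_offset, row_offset + len(df), dtype=np.int64)
        return df[column].values

    df = pd.DataFrame({"value": [3.0, 1.0, 2.0]})
    df.attrs["_datashader_row_offset"] = 10
    print(extract_values(df, SpecialColumn.RowIndex))  # [10 11 12]
    print(extract_values(df, "value"))                 # [3. 1. 2.]
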
@@ -237,24 +249,31 @@ def apply(self, df):
         a = self.categorizer.apply(df)
         if cudf and isinstance(df, cudf.DataFrame):
             import cupy
-            if df[self.column].dtype.kind == 'f':
+            if self.column == SpecialColumn.RowIndex:
+                nullval = -1
+            elif df[self.column].dtype.kind == 'f':
                 nullval = np.nan
             else:
                 nullval = 0
             a = cupy.asarray(a)
-            if Version(cudf.__version__) >= Version("22.02"):
+            if self.column == SpecialColumn.RowIndex:
+                b = extract(SpecialColumn.RowIndex).apply(df)
+            elif Version(cudf.__version__) >= Version("22.02"):
                 b = df[self.column].to_cupy(na_value=nullval)
             else:
                 b = cupy.asarray(df[self.column].fillna(nullval))
             return cupy.stack((a, b), axis=-1)
         else:
-            b = df[self.column].values
+            if self.column == SpecialColumn.RowIndex:
+                b = extract(SpecialColumn.RowIndex).apply(df)
+            else:
+                b = df[self.column].values
             return np.stack((a, b), axis=-1)


 class Reduction(Expr):
     """Base class for per-bin reductions."""
-    def __init__(self, column=None):
+    def __init__(self, column: str | SpecialColumn | None=None):
         self.column = column
         self._nan_check_column = None

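A note on the nullval = -1 branch above: a float column can mark missing values with NaN, but the virtual row-index column is int64, which cannot hold NaN, so -1 (never a valid row index) is the fill value instead. A short illustration:

    import numpy as np

    values = np.array([1.5, np.nan, 2.5])             # NaN marks a missing float
    row_index = np.array([0, -1, 2], dtype=np.int64)  # -1 marks a missing row index
    print(np.stack((values, row_index), axis=-1))     # mirrors the stack call above
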
@@ -293,6 +312,8 @@ def uses_row_index(self, cuda, partitioned):
         return False
 
     def validate(self, in_dshape):
+        if self.column == SpecialColumn.RowIndex:
+            return
         if not self.column in in_dshape.dict:
             raise ValueError("specified column not found")
         if not isnumeric(in_dshape.measure[self.column]):
@@ -653,12 +674,7 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned):

     @property
     def inputs(self):
-        ret = (self.preprocess, )
-        # val_column of None here could mean None (e.g. by(any) or could be row index column.
-        if self.val_column is None and isinstance(self.reduction, (_max_or_min_row_index, _max_n_or_min_n_row_index)):
-            # Row index column
-            ret += self.reduction.inputs
-        return ret
+        return (self.preprocess,)

def uses_cuda_mutex(self):
return self.reduction.uses_cuda_mutex()
@@ -1641,7 +1657,7 @@ def __init__(self, selector: Reduction, lookup_column: str | None=None):
             raise TypeError(
                 "selector can only be a first, first_n, last, last_n, "
                 "max, max_n, min or min_n reduction")
-        super().__init__(lookup_column)
+        super().__init__(SpecialColumn.RowIndex if lookup_column is None else lookup_column)
         self.selector = selector
         # List of all column names that this reduction uses.
         self.columns = (selector.column, lookup_column)
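
For the user-facing behavior behind this default: with no lookup_column, where returns the row index of the selected point, and that intent is now spelled SpecialColumn.RowIndex instead of None. A usage sketch, assuming a DataFrame df with numeric columns x, y, value and other:

    import datashader as ds

    canvas = ds.Canvas(plot_width=300, plot_height=200)

    # No lookup column: the aggregate holds int64 row indexes of the rows
    # selected by max('value') in each pixel (-1 where the pixel is empty).
    agg_idx = canvas.points(df, 'x', 'y', agg=ds.where(ds.max('value')))

    # With a lookup column: the aggregate holds float64 'other' values
    # taken from those same selected rows.
    agg_val = canvas.points(df, 'x', 'y', agg=ds.where(ds.max('value'), 'other'))
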
@@ -1650,7 +1666,7 @@ def __hash__(self):
return hash((type(self), self._hashable_inputs(), self.selector))

     def out_dshape(self, input_dshape, antialias, cuda, partitioned):
-        if self.column is None:
+        if self.column == SpecialColumn.RowIndex:
             return dshape(ct.int64)
         else:
             return dshape(ct.float64)
@@ -1659,18 +1675,18 @@ def uses_cuda_mutex(self):
         return True
 
     def uses_row_index(self, cuda, partitioned):
-        return self.column is None or self.selector.uses_row_index(cuda, partitioned)
+        return self.column == SpecialColumn.RowIndex or self.selector.uses_row_index(cuda, partitioned)

     def validate(self, in_dshape):
-        if self.column is not None:
+        if self.column != SpecialColumn.RowIndex:
             super().validate(in_dshape)
         self.selector.validate(in_dshape)
-        if self.column is not None and self.column == self.selector.column:
+        if self.column != SpecialColumn.RowIndex and self.column == self.selector.column:
             raise ValueError("where and its contained reduction cannot use the same column")

     def _antialias_stage_2(self, self_intersect, array_module):
         ret = self.selector._antialias_stage_2(self_intersect, array_module)
-        if self.column is None:
+        if self.column == SpecialColumn.RowIndex:
             # Override antialiased zero value when returning integer row index.
             ret = ((ret[0][0], -1),)
         return ret
@@ -1734,7 +1750,7 @@ def _append_cuda(x, y, agg, field, update_index):
         return update_index
 
     def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
-        # If self.column is None then append function is still passed a
+        # If self.column is SpecialColumn.RowIndex then append function is passed a
         # 'field' argument which is the row index.
         if cuda:
             if antialias:
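
The rewritten comment states the contract: when self.column is SpecialColumn.RowIndex, the generated append call receives the row index itself as the 'field' argument. A toy scalar sketch of a max-row-index append under that contract (hypothetical function, not the datashader kernel):

    import numpy as np

    def append_max_row_index(x, y, agg, field):
        # 'field' is the row index when the column is SpecialColumn.RowIndex.
        # Keep the largest row index seen per pixel; -1 means "empty so far".
        if field > agg[y, x]:
            agg[y, x] = field
            return True   # aggregate was updated
        return False

    agg = np.full((2, 2), -1, dtype=np.int64)
    append_max_row_index(0, 1, agg, field=7)
    print(agg)  # 7 recorded at pixel (y=1, x=0)
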
@@ -1752,7 +1768,7 @@ def _build_bases(self, cuda, partitioned):
         if isinstance(selector, (_first_or_last, _first_n_or_last_n)) and selector.uses_row_index(cuda, partitioned):
             # Need to swap out the selector with an equivalent row index selector
             row_index_selector = selector._create_row_index_selector()
-            if self.column is None:
+            if self.column == SpecialColumn.RowIndex:
                 # If selector uses a row index and this where returns the same row index,
                 # can just swap out this where reduction with the row_index_selector.
                 row_index_selector._nan_check_column = self.selector.column
@@ -1934,9 +1950,8 @@ def inputs(self):
 class _max_or_min_row_index(OptionalFieldReduction):
     """Abstract base class of max and min row_index reductions.
     """
-    @property
-    def inputs(self):
-        return (extract(None),)  # row index column
+    def __init__(self):
+        super().__init__(column=SpecialColumn.RowIndex)
 
     def out_dshape(self, in_dshape, antialias, cuda, partitioned):
         return dshape(ct.int64)
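
Since a row-index reduction always aggregates the virtual int64 index, its output dshape is fixed rather than derived from an input column, and empty bins use the -1 sentinel rather than NaN. A hypothetical helper showing that pairing:

    import numpy as np

    def make_empty_agg(shape, returns_row_index):
        # Mirrors the out_dshape choice above: int64 with -1 fill for row
        # indexes, float64 with NaN fill for ordinary value columns.
        if returns_row_index:
            return np.full(shape, -1, dtype=np.int64)
        return np.full(shape, np.nan, dtype=np.float64)

    print(make_empty_agg((2, 3), returns_row_index=True))
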
@@ -1945,7 +1960,6 @@ def uses_row_index(self, cuda, partitioned):
         return True
 
     def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
-        # self.column is None but row index is passed as field argument to append functions
         # Doesn't yet support antialiasing
         if cuda:
             return self._append_cuda
@@ -2053,13 +2067,9 @@ class _max_n_or_min_n_row_index(FloatingNReduction):
"""Abstract base class of max_n and min_n row_index reductions.
"""
def __init__(self, n=1):
super().__init__(column=None)
super().__init__(column=SpecialColumn.RowIndex)
self.n = n if n >= 1 else 1

-    @property
-    def inputs(self):
-        return (extract(None),)  # row index column
-
     def out_dshape(self, in_dshape, antialias, cuda, partitioned):
         return dshape(ct.int64)

@@ -2070,7 +2080,6 @@ def uses_row_index(self, cuda, partitioned):
         return True
 
     def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
-        # self.column is None but row index is passed as field argument to append functions
        # Doesn't yet support antialiasing
         if cuda:
             return self._append_cuda