From 7d05e616309eab4c18d380a915dbe2aa8a180d97 Mon Sep 17 00:00:00 2001 From: Ian Thomas Date: Mon, 12 Jun 2023 11:14:56 +0100 Subject: [PATCH] Use enum for row index column rather than None --- datashader/core.py | 8 +++-- datashader/reductions.py | 75 ++++++++++++++++++++++------------------ 2 files changed, 48 insertions(+), 35 deletions(-) diff --git a/datashader/core.py b/datashader/core.py index 7d6a59ba2..f0914413a 100644 --- a/datashader/core.py +++ b/datashader/core.py @@ -1300,6 +1300,10 @@ def _bypixel_sanitise(source, glyph, agg): def _cols_to_keep(columns, glyph, agg): + """ + Return which columns from the supplied data source are kept as they are + needed by the specified agg. Excludes any SpecialColumn. + """ cols_to_keep = OrderedDict({col: False for col in columns}) for col in glyph.required_columns(): cols_to_keep[col] = True @@ -1310,9 +1314,9 @@ def recurse(cols_to_keep, agg): recurse(cols_to_keep, subagg) elif hasattr(agg, 'columns'): for column in agg.columns: - if column is not None: + if column not in (None, rd.SpecialColumn.RowIndex): cols_to_keep[column] = True - elif agg.column is not None: + elif agg.column not in (None, rd.SpecialColumn.RowIndex): cols_to_keep[agg.column] = True recurse(cols_to_keep, agg) diff --git a/datashader/reductions.py b/datashader/reductions.py index 9d07636c1..94f44f35e 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -1,5 +1,6 @@ from __future__ import annotations import copy +from enum import Enum from packaging.version import Version import numpy as np from datashape import dshape, isnumeric, Record, Option @@ -33,9 +34,21 @@ ) +class SpecialColumn(Enum): + """ + Internally datashader identifies the columns required by the user's + Reductions and extracts them from the supplied source (e.g. DataFrame) to + pass through the dynamically-generated append function in compiler.py and + end up as arguments to the Reduction._append* functions. Each column is + a string name or a SpecialColumn. A column of None is used in Reduction + classes to denote that no column is required. + """ + RowIndex = 1 + + class Preprocess(Expr): """Base clase for preprocessing steps.""" - def __init__(self, column): + def __init__(self, column: str | SpecialColumn | None): self.column = column @property @@ -50,13 +63,12 @@ def nan_check_column(self): class extract(Preprocess): """Extract a column from a dataframe as a numpy array of values.""" def apply(self, df): - if self.column is None: - # self.column of None means use virtual row index column. + if self.column is SpecialColumn.RowIndex: attrs = getattr(df, "attrs", None) row_offset = getattr(attrs or df, "_datashader_row_offset", 0) if cudf and isinstance(df, cudf.DataFrame): - if self.column is None: + if self.column is SpecialColumn.RowIndex: return cp.arange(row_offset, row_offset+len(df), dtype=np.int64) if df[self.column].dtype.kind == 'f': @@ -69,7 +81,7 @@ def apply(self, df): elif isinstance(df, xr.Dataset): # DataArray could be backed by numpy or cupy array return df[self.column].data - elif self.column is None: + elif self.column is SpecialColumn.RowIndex: return np.arange(row_offset, row_offset+len(df), dtype=np.int64) else: return df[self.column].values @@ -237,24 +249,31 @@ def apply(self, df): a = self.categorizer.apply(df) if cudf and isinstance(df, cudf.DataFrame): import cupy - if df[self.column].dtype.kind == 'f': + if self.column == SpecialColumn.RowIndex: + nullval = -1 + elif df[self.column].dtype.kind == 'f': nullval = np.nan else: nullval = 0 a = cupy.asarray(a) - if Version(cudf.__version__) >= Version("22.02"): + if self.column == SpecialColumn.RowIndex: + b = extract(SpecialColumn.RowIndex).apply(df) + elif Version(cudf.__version__) >= Version("22.02"): b = df[self.column].to_cupy(na_value=nullval) else: b = cupy.asarray(df[self.column].fillna(nullval)) return cupy.stack((a, b), axis=-1) else: - b = df[self.column].values + if self.column == SpecialColumn.RowIndex: + b = extract(SpecialColumn.RowIndex).apply(df) + else: + b = df[self.column].values return np.stack((a, b), axis=-1) class Reduction(Expr): """Base class for per-bin reductions.""" - def __init__(self, column=None): + def __init__(self, column: str | SpecialColumn | None=None): self.column = column self._nan_check_column = None @@ -293,6 +312,8 @@ def uses_row_index(self, cuda, partitioned): return False def validate(self, in_dshape): + if self.column == SpecialColumn.RowIndex: + return if not self.column in in_dshape.dict: raise ValueError("specified column not found") if not isnumeric(in_dshape.measure[self.column]): @@ -653,12 +674,7 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned): @property def inputs(self): - ret = (self.preprocess, ) - # val_column of None here could mean None (e.g. by(any) or could be row index column. - if self.val_column is None and isinstance(self.reduction, (_max_or_min_row_index, _max_n_or_min_n_row_index)): - # Row index column - ret += self.reduction.inputs - return ret + return (self.preprocess,) def uses_cuda_mutex(self): return self.reduction.uses_cuda_mutex() @@ -1641,7 +1657,7 @@ def __init__(self, selector: Reduction, lookup_column: str | None=None): raise TypeError( "selector can only be a first, first_n, last, last_n, " "max, max_n, min or min_n reduction") - super().__init__(lookup_column) + super().__init__(SpecialColumn.RowIndex if lookup_column is None else lookup_column) self.selector = selector # List of all column names that this reduction uses. self.columns = (selector.column, lookup_column) @@ -1650,7 +1666,7 @@ def __hash__(self): return hash((type(self), self._hashable_inputs(), self.selector)) def out_dshape(self, input_dshape, antialias, cuda, partitioned): - if self.column is None: + if self.column == SpecialColumn.RowIndex: return dshape(ct.int64) else: return dshape(ct.float64) @@ -1659,18 +1675,18 @@ def uses_cuda_mutex(self): return True def uses_row_index(self, cuda, partitioned): - return self.column is None or self.selector.uses_row_index(cuda, partitioned) + return self.column == SpecialColumn.RowIndex or self.selector.uses_row_index(cuda, partitioned) def validate(self, in_dshape): - if self.column is not None: + if self.column != SpecialColumn.RowIndex: super().validate(in_dshape) self.selector.validate(in_dshape) - if self.column is not None and self.column == self.selector.column: + if self.column != SpecialColumn.RowIndex and self.column == self.selector.column: raise ValueError("where and its contained reduction cannot use the same column") def _antialias_stage_2(self, self_intersect, array_module): ret = self.selector._antialias_stage_2(self_intersect, array_module) - if self.column is None: + if self.column == SpecialColumn.RowIndex: # Override antialiased zero value when returning integer row index. ret = ((ret[0][0], -1),) return ret @@ -1734,7 +1750,7 @@ def _append_cuda(x, y, agg, field, update_index): return update_index def _build_append(self, dshape, schema, cuda, antialias, self_intersect): - # If self.column is None then append function is still passed a + # If self.column is SpecialColumn.RowIndex then append function is passed a # 'field' argument which is the row index. if cuda: if antialias: @@ -1752,7 +1768,7 @@ def _build_bases(self, cuda, partitioned): if isinstance(selector, (_first_or_last, _first_n_or_last_n)) and selector.uses_row_index(cuda, partitioned): # Need to swap out the selector with an equivalent row index selector row_index_selector = selector._create_row_index_selector() - if self.column is None: + if self.column == SpecialColumn.RowIndex: # If selector uses a row index and this where returns the same row index, # can just swap out this where reduction with the row_index_selector. row_index_selector._nan_check_column = self.selector.column @@ -1934,9 +1950,8 @@ def inputs(self): class _max_or_min_row_index(OptionalFieldReduction): """Abstract base class of max and min row_index reductions. """ - @property - def inputs(self): - return (extract(None),) # row index column + def __init__(self): + super().__init__(column=SpecialColumn.RowIndex) def out_dshape(self, in_dshape, antialias, cuda, partitioned): return dshape(ct.int64) @@ -1945,7 +1960,6 @@ def uses_row_index(self, cuda, partitioned): return True def _build_append(self, dshape, schema, cuda, antialias, self_intersect): - # self.column is None but row index is passed as field argument to append functions # Doesn't yet support antialiasing if cuda: return self._append_cuda @@ -2053,13 +2067,9 @@ class _max_n_or_min_n_row_index(FloatingNReduction): """Abstract base class of max_n and min_n row_index reductions. """ def __init__(self, n=1): - super().__init__(column=None) + super().__init__(column=SpecialColumn.RowIndex) self.n = n if n >= 1 else 1 - @property - def inputs(self): - return (extract(None),) # row index column - def out_dshape(self, in_dshape, antialias, cuda, partitioned): return dshape(ct.int64) @@ -2070,7 +2080,6 @@ def uses_row_index(self, cuda, partitioned): return True def _build_append(self, dshape, schema, cuda, antialias, self_intersect): - # self.column is None but row index is passed as field argument to append functions # Doesn't yet support antialiasing if cuda: return self._append_cuda