diff --git a/datashader/compiler.py b/datashader/compiler.py index 78227f29f..dd3a9988a 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -6,7 +6,7 @@ import numpy as np import xarray as xr -from .reductions import by, category_codes, summary +from .reductions import by, category_codes, summary, where from .utils import ngjit @@ -76,18 +76,20 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False): self_intersect = False antialias_stage_2 = False - # List of tuples of (append, base, input columns, temps) + # List of tuples of (append, base, input columns, temps, combine temps) calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect) for (b, d) in zip(bases, dshapes)] + # List of unique column names needed cols = list(unique(concat(pluck(2, calls)))) # List of temps needed temps = list(pluck(3, calls)) + combine_temps = list(pluck(4, calls)) create = make_create(bases, dshapes, cuda) info = make_info(cols) append = make_append(bases, cols, calls, glyph, isinstance(agg, by), antialias) - combine = make_combine(bases, dshapes, temps, antialias) + combine = make_combine(bases, dshapes, temps, combine_temps, antialias) finalize = make_finalize(bases, agg, schema, cuda) return create, info, append, combine, finalize, antialias_stage_2 @@ -110,6 +112,7 @@ def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect): (base,), # bases base.inputs, # cols base._build_temps(cuda), # temps + base._build_combine_temps(cuda), # combine temps ) @@ -141,7 +144,7 @@ def make_append(bases, cols, calls, glyph, categorical, antialias): else: subscript = None - for func, bases, cols, temps in calls: + for func, bases, cols, temps, _ in calls: local_lk.update(zip(temps, (next(names) for i in temps))) func_name = next(names) namespace[func_name] = func @@ -163,6 +166,13 @@ def make_append(bases, cols, calls, glyph, categorical, antialias): body.append('{0}(x, y, {1})'.format(func_name, ', '.join(args))) + where_reduction = len(bases) == 1 and isinstance(bases[0], where) + if where_reduction: + # where reduction needs access to the return of the contained + # reduction, which is the preceding one here. + body[-2] = 'if ' + body[-2] + ':' + body[-1] = ' ' + body[-1] + body = ['{0} = {1}[y, x]'.format(name, arg_lk[agg]) for agg, name in local_lk.items()] + body @@ -187,14 +197,30 @@ def make_append(bases, cols, calls, glyph, categorical, antialias): return ngjit(namespace['append']) -def make_combine(bases, dshapes, temps, antialias): +def make_combine(bases, dshapes, temps, combine_temps, antialias): arg_lk = dict((k, v) for (v, k) in enumerate(bases)) - calls = [(b._build_combine(d, antialias), [arg_lk[i] for i in (b,) + t]) - for (b, d, t) in zip(bases, dshapes, temps)] + + # where._combine() deals with combine of preceding reduction so exclude + # it from explicit combine calls. + base_is_where = [isinstance(b, where) for b in bases] + next_base_is_where = base_is_where[1:] + [False] + calls = [(None if n else b._build_combine(d, antialias), [arg_lk[i] for i in (b,) + t + ct]) + for (b, d, t, ct, n) in zip(bases, dshapes, temps, combine_temps, next_base_is_where)] def combine(base_tuples): bases = tuple(np.stack(bs) for bs in zip(*base_tuples)) - return tuple(f(*get(inds, bases)) for (f, inds) in calls) + ret = [] + for is_where, (func, inds) in zip(base_is_where, calls): + if func is None: + continue + call = func(*get(inds, bases)) + if is_where: + # Separate aggs of where reduction and its selector, + # selector's goes first to match order of bases. + ret.extend(call[::-1]) + else: + ret.append(call) + return tuple(ret) return combine diff --git a/datashader/core.py b/datashader/core.py index 1a4459c66..060e61c31 100644 --- a/datashader/core.py +++ b/datashader/core.py @@ -442,7 +442,7 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None, if not isinstance(non_cat_agg, ( rd.any, rd.count, rd.max, rd.min, rd.sum, rd.summary, rd._sum_zero, - rd.first, rd.last, rd.mean + rd.first, rd.last, rd.mean, rd.where )): raise NotImplementedError( f"{type(non_cat_agg)} reduction not implemented for antialiased lines") @@ -1276,6 +1276,7 @@ def _bypixel_sanitise(source, glyph, agg): # by only retaining the necessary columns: # https://github.com/bokeh/datashader/issues/396 # Preserve column ordering without duplicates + cols_to_keep = _cols_to_keep(source.columns, glyph, agg) if len(cols_to_keep) < len(source.columns): # If _sindex is set, ensure it is not dropped diff --git a/datashader/reductions.py b/datashader/reductions.py index 91c18c23d..a1684aaae 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -255,7 +255,16 @@ def _antialias_stage_2(self, self_intersect, array_module): def _build_bases(self, cuda=False): return (self,) + def _build_combine_temps(self, cuda=False): + # Temporaries (i.e. not returned to user) that are reductions, the + # aggs of which are passed to the combine() function but not the + # append() functions, as opposed to _build_temps() which are passed + # to both append() and combine(). + return () + def _build_temps(self, cuda=False): + # Temporaries (i.e. not returned to user) that are reductions, the + # aggs of which are passed to both append() and combine() functions. return () def _build_create(self, required_dshape): @@ -340,7 +349,8 @@ def inputs(self): return (extract(self.column),) if self.column is not None else () def validate(self, in_dshape): - pass + if self.column is not None: + super().validate(in_dshape) @staticmethod def _finalize(bases, cuda=False, **kwargs): @@ -408,6 +418,8 @@ def _antialias_stage_2(self, self_intersect, array_module): def _append(x, y, agg, field): if not isnull(field): agg[y, x] += 1 + return True + return False @staticmethod @ngjit @@ -417,6 +429,8 @@ def _append_antialias(x, y, agg, field, aa_factor): agg[y, x] = aa_factor else: agg[y, x] += aa_factor + return True + return False @staticmethod @ngjit @@ -424,11 +438,14 @@ def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor): if not isnull(field): if isnull(agg[y, x]) or aa_factor > agg[y, x]: agg[y, x] = aa_factor + return True + return False @staticmethod @ngjit def _append_no_field(x, y, agg): agg[y, x] += 1 + return True @staticmethod @ngjit @@ -437,39 +454,46 @@ def _append_no_field_antialias(x, y, agg, aa_factor): agg[y, x] = aa_factor else: agg[y, x] += aa_factor + return True @staticmethod @ngjit def _append_no_field_antialias_not_self_intersect(x, y, agg, aa_factor): if isnull(agg[y, x]) or aa_factor > agg[y, x]: agg[y, x] = aa_factor + return True + return False # GPU append functions @staticmethod @nb_cuda.jit(device=True) def _append_antialias_cuda(x, y, agg, field, aa_factor): - cuda_atomic_nanmax(agg, (y, x), field*aa_factor) + value = field*aa_factor + return cuda_atomic_nanmax(agg, (y, x), value) != value @staticmethod @nb_cuda.jit(device=True) def _append_no_field_antialias_cuda_not_self_intersect(x, y, agg, aa_factor): - cuda_atomic_nanmax(agg, (y, x), aa_factor) + return cuda_atomic_nanmax(agg, (y, x), aa_factor) != aa_factor @staticmethod @nb_cuda.jit(device=True) def _append_cuda(x, y, agg, field): if not isnull(field): nb_cuda.atomic.add(agg, (y, x), 1) + return True + return False @staticmethod @nb_cuda.jit(device=True) def _append_no_field_antialias_cuda(x, y, agg, aa_factor): - cuda_atomic_nanmax(agg, (y, x), aa_factor) + return cuda_atomic_nanmax(agg, (y, x), aa_factor) != aa_factor @staticmethod @nb_cuda.jit(device=True) def _append_no_field_cuda(x, y, agg): nb_cuda.atomic.add(agg, (y, x), 1) + return True def _build_combine(self, dshape, antialias): if antialias: @@ -507,6 +531,11 @@ def __init__(self, cat_column, reduction=count()): self.categorizer = category_codes(cat_column) else: raise TypeError("first argument must be a column name or a CategoryPreprocess instance") + + if isinstance(reduction, where): + raise TypeError( + "'by' reduction does not support 'where' reduction for its first argument") + self.column = self.categorizer.column # for backwards compatibility with count_cat self.columns = (self.categorizer.column, getattr(reduction, 'column', None)) self.reduction = reduction @@ -596,6 +625,8 @@ def _antialias_stage_2(self, self_intersect, array_module): def _append(x, y, agg, field): if not isnull(field): agg[y, x] = True + return True + return False @staticmethod @ngjit @@ -603,17 +634,22 @@ def _append_antialias(x, y, agg, field, aa_factor): if not isnull(field): if isnull(agg[y, x]) or aa_factor > agg[y, x]: agg[y, x] = aa_factor + return True + return False @staticmethod @ngjit def _append_no_field(x, y, agg): agg[y, x] = True + return True @staticmethod @ngjit def _append_no_field_antialias(x, y, agg, aa_factor): if isnull(agg[y, x]) or aa_factor > agg[y, x]: agg[y, x] = aa_factor + return True + return False # GPU append functions _append_cuda =_append @@ -706,6 +742,8 @@ def _append(x, y, agg, field): if not isnull(field): # agg[y, x] cannot be null as initialised to zero. agg[y, x] += field + return True + return False @staticmethod @ngjit @@ -714,6 +752,8 @@ def _append_antialias(x, y, agg, field, aa_factor): if not isnull(value): # agg[y, x] cannot be null as initialised to zero. agg[y, x] += value + return True + return False @staticmethod @ngjit @@ -722,6 +762,8 @@ def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor): if not isnull(value) and value > agg[y, x]: # agg[y, x] cannot be null as initialised to zero. agg[y, x] = value + return True + return False # GPU append functions @staticmethod @@ -729,6 +771,8 @@ def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor): def _append_cuda(x, y, agg, field): if not isnull(field): nb_cuda.atomic.add(agg, (y, x), field) + return True + return False @staticmethod def _combine(aggs): @@ -798,6 +842,8 @@ def _append(x, y, agg, field): agg[y, x] = field else: agg[y, x] += field + return True + return False @staticmethod @ngjit @@ -808,6 +854,8 @@ def _append_antialias(x, y, agg, field, aa_factor): agg[y, x] = value else: agg[y, x] += value + return True + return False @staticmethod @ngjit @@ -816,6 +864,8 @@ def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor): if not isnull(value): if isnull(agg[y, x]) or value > agg[y, x]: agg[y, x] = value + return True + return False @staticmethod def _combine(aggs): @@ -865,6 +915,8 @@ def _append(x, y, m2, field, sum, count): u1 = np.float64(sum) / count u = np.float64(sum + field) / (count + 1) m2[y, x] += (field - u1) * (field - u) + return True + return False @staticmethod def _combine(Ms, sums, ns): @@ -892,10 +944,10 @@ def _antialias_stage_2(self, self_intersect, array_module): @staticmethod @ngjit def _append(x, y, agg, field): - if isnull(agg[y, x]): - agg[y, x] = field - elif agg[y, x] > field: + if isnull(agg[y, x]) or agg[y, x] > field: agg[y, x] = field + return True + return False @staticmethod @ngjit @@ -903,12 +955,14 @@ def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor if isnull(agg[y, x]) or value > agg[y, x]: agg[y, x] = value + return True + return False # GPU append functions @staticmethod @nb_cuda.jit(device=True) def _append_cuda(x, y, agg, field): - cuda_atomic_nanmin(agg, (y, x), field) + return cuda_atomic_nanmin(agg, (y, x), field) != field @staticmethod def _combine(aggs): @@ -931,10 +985,10 @@ def _antialias_stage_2(self, self_intersect, array_module): @staticmethod @ngjit def _append(x, y, agg, field): - if isnull(agg[y, x]): - agg[y, x] = field - elif agg[y, x] < field: + if isnull(agg[y, x]) or agg[y, x] < field: agg[y, x] = field + return True + return False @staticmethod @ngjit @@ -942,17 +996,20 @@ def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor if isnull(agg[y, x]) or value > agg[y, x]: agg[y, x] = value + return True + return False # GPU append functions @staticmethod @nb_cuda.jit(device=True) def _append_antialias_cuda(x, y, agg, field, aa_factor): - cuda_atomic_nanmax(agg, (y, x), field*aa_factor) + value = field*aa_factor + return cuda_atomic_nanmax(agg, (y, x), value) != value @staticmethod @nb_cuda.jit(device=True) def _append_cuda(x, y, agg, field): - cuda_atomic_nanmax(agg, (y, x), field) + return cuda_atomic_nanmax(agg, (y, x), field) != field @staticmethod def _combine(aggs): @@ -1062,6 +1119,8 @@ def _antialias_stage_2(self, self_intersect, array_module): def _append(x, y, agg, field): if not isnull(field) and isnull(agg[y, x]): agg[y, x] = field + return True + return False @staticmethod @ngjit @@ -1069,6 +1128,8 @@ def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor if isnull(agg[y, x]) or value > agg[y, x]: agg[y, x] = value + return True + return False @staticmethod def _combine(aggs): @@ -1108,6 +1169,8 @@ def _antialias_stage_2(self, self_intersect, array_module): def _append(x, y, agg, field): if not isnull(field): agg[y, x] = field + return True + return False @staticmethod @ngjit @@ -1115,6 +1178,8 @@ def _append_antialias(x, y, agg, field, aa_factor): value = field*aa_factor if isnull(agg[y, x]) or value > agg[y, x]: agg[y, x] = value + return True + return False @staticmethod def _combine(aggs): @@ -1159,6 +1224,96 @@ def _finalize(bases, **kwargs): raise NotImplementedError("mode is currently implemented only for rasters") +class where(FloatingReduction): + """ + Returns values from a ``lookup_column`` corresponding to a ``selector`` + reduction that is applied to some other column. + + Example + ------- + >>> canvas.line(df, 'x', 'y', agg=ds.where(ds.max("value"), "other")) # doctest: +SKIP + + This returns the values of the "other" column that correspond to the + maximum of the "value" column in each bin. + + Parameters + ---------- + selector: Reduction + Reduction used to select the values of the ``lookup_column`` which are + returned by this ``where`` reduction. + + lookup_column : str + Column containing values that are returned from this ``where`` + reduction. + """ + def __init__(self, selector: Reduction, lookup_column: str): + if not isinstance(selector, (max, min)): + raise TypeError("selector can only be a max or min reduction") + super().__init__(lookup_column) + self.selector = selector + # List of all column names that this reduction uses. + self.columns = (selector.column, lookup_column) + + def __hash__(self): + return hash((type(self), self._hashable_inputs(), self.selector)) + + def out_dshape(self, input_dshape, antialias): + return self.selector.out_dshape(input_dshape, antialias) + + def validate(self, in_dshape): + super().validate(in_dshape) + self.selector.validate(in_dshape) + if self.column is not None and self.column == self.selector.column: + raise ValueError("where and its contained reduction cannot use the same column") + + def _antialias_stage_2(self, self_intersect, array_module): + return self.selector._antialias_stage_2(self_intersect, array_module) + + # CPU append functions + @staticmethod + @ngjit + def _append(x, y, agg, field): + agg[y, x] = field + return True + + @staticmethod + @ngjit + def _append_antialias(x, y, agg, field, aa_factor): + agg[y, x] = field + return True + + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): + if cuda: + raise NotImplementedError("where reduction not supported on CUDA") + return super()._build_append(dshape, schema, cuda, antialias, self_intersect) + + def _build_bases(self, cuda=False): + return self.selector._build_bases(cuda=cuda) + super()._build_bases(cuda=cuda) + + def _build_combine(self, dshape, antialias): + # Does not support categorical reductions or CUDA. + append = self.selector._append + + @ngjit + def combine(aggs, selector_aggs): + if len(aggs) > 1: + ny, nx = aggs[0].shape + for y in range(ny): + for x in range(nx): + value = selector_aggs[1][y, x] + if not isnull(value) and append(x, y, selector_aggs[0], value): + aggs[0][y, x] = aggs[1][y, x] + return aggs[0], selector_aggs[0] + + return combine + + def _build_combine_temps(self, cuda=False): + return (self.selector,) + + @staticmethod + def _finalize(bases, cuda=False, **kwargs): + return xr.DataArray(bases[-1], **kwargs) + class summary(Expr): """A collection of named reductions. diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py index a2ec02f61..ce15f1dd0 100644 --- a/datashader/tests/test_dask.py +++ b/datashader/tests/test_dask.py @@ -37,6 +37,7 @@ 'i64': np.arange(20, dtype='i8'), 'f32': np.arange(20, dtype='f4'), 'f64': np.arange(20, dtype='f8'), + 'reverse': np.arange(20, 0, -1), 'empty_bin': np.array([0.] * 15 + [np.nan] * 5), 'cat': ['a']*5 + ['b']*5 + ['c']*5 + ['d']*5, 'cat_int': np.array([10]*5 + [11]*5 + [12]*5 + [13]*5)}) @@ -63,7 +64,8 @@ def dask_DataFrame(*args, **kwargs): # GPU testing disabled even though cudf/cupy are available raise ImportError - ddfs = [_ddf, dask_cudf.from_dask_dataframe(_ddf)] + cudf_ddf = dask_cudf.from_dask_dataframe(_ddf) + ddfs = [_ddf, cudf_ddf] def dask_cudf_DataFrame(*args, **kwargs): assert not kwargs.pop("geo", False) @@ -75,6 +77,7 @@ def dask_cudf_DataFrame(*args, **kwargs): DataFrames = [dask_DataFrame, dask_cudf_DataFrame] except ImportError: cudf = cupy = dask_cudf = None + cudf_ddf = None ddfs = [_ddf] DataFrames = [dask_DataFrame] dask_cudf_DataFrame = None @@ -174,6 +177,38 @@ def test_max(ddf): assert_eq_xr(c.points(ddf, 'x', 'y', ds.max('f64')), out) +@pytest.mark.parametrize('ddf', [_ddf]) +@pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) +def test_where_max(ddf, npartitions): + # Important to test with npartitions > 2 to have multiple combination stages. + # Identical results to equivalent pandas test. + ddf = ddf.repartition(npartitions) + out = xr.DataArray([[16, 6], [11, 1]], coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('i32'), 'reverse')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('i64'), 'reverse')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('f32'), 'reverse')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('f64'), 'reverse')), out) + + +@pytest.mark.parametrize('ddf',[_ddf]) +@pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) +def test_where_min(ddf, npartitions): + # Important to test with npartitions > 2 to have multiple combination stages. + # Identical results to equivalent pandas test. + ddf = ddf.repartition(npartitions) + out = xr.DataArray([[20, 10], [15, 5]], coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('i32'), 'reverse')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('i64'), 'reverse')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('f32'), 'reverse')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('f64'), 'reverse')), out) + + +@pytest.mark.skipif(not test_gpu, reason="DATASHADER_TEST_GPU not set") +def test_where_cuda(): + with pytest.raises(NotImplementedError, match="where reduction not supported on CUDA"): + c.points(cudf_ddf, 'x', 'y', ds.where(ds.min('i32'), 'reverse')) + + @pytest.mark.parametrize('ddf', ddfs) def test_mean(ddf): out = xr.DataArray( @@ -1470,3 +1505,54 @@ def test_combine_dtype(ddf, reduction, dtype, aa_dtype): def test_log_axis_not_positive(ddf, canvas): with pytest.raises(ValueError, match='Range values must be >0 for logarithmic axes'): canvas.line(ddf, 'x', 'y') + + +@pytest.mark.parametrize('npartitions', [1, 2, 3]) +def test_line_antialias_where(npartitions): + x = np.arange(3) + df = pd.DataFrame(dict( + y0 = [0.0, 0.5, 1.0], + y1 = [1.0, 0.0, 0.5], + y2 = [0.0, 1.0, 0.0], + value = [1.1, 2.2, 3.3], + other = [-9.0, -7.0, -5.0], + )) + ddf = dd.from_pandas(df, npartitions=npartitions) + if ddf.npartitions != npartitions: + pytest.skip("Dask partitioning not as expected") + + cvs = ds.Canvas(plot_width=7, plot_height=5) + + sol_where_max = np.array([ + [-9., -7., -7., -7., -7., -5., -5.], + [-7., -7., -7., -5., -5., -5., -9.], + [-7., -9., -5., -5., -5., -7., nan], + [-5., -5., -5., -5., -9., -7., -7.], + [-5., -5., -9., -9., -9., -7., -7.], + ]) + agg_where_max = cvs.line( + source=ddf, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, + agg=ds.where(ds.max("value"), "other"), + ) + assert_eq_ndarray(agg_where_max.data, sol_where_max) + + sol_where_min = np.array([ + [-9., -9., -7., -7., -7., -9., -9.], + [-9., -9., -7., -7., -7., -9., -9.], + [-7., -9., -9., -5., -9., -9., nan], + [-5., -9., -9., -9., -9., -9., -7.], + [-5., -5., -9., -9., -9., -7., -7.], + ]) + agg_where_min = cvs.line( + source=ddf, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, + agg=ds.where(ds.min("value"), "other"), + ) + # dask solution differs slightly depending on number of partitions. + # Exclude array elements that may differ from comparison. + if npartitions == 2: + sol_where_min[1, 6] = agg_where_min[1, 6] = nan + elif npartitions == 3: + for j, i in ((1, 5), (1, 6), (2, 1)): + sol_where_min[j, i] = agg_where_min[j, i] = nan + + assert_eq_ndarray(agg_where_min.data, sol_where_min) diff --git a/datashader/tests/test_pandas.py b/datashader/tests/test_pandas.py index b545412bc..fe8809d9c 100644 --- a/datashader/tests/test_pandas.py +++ b/datashader/tests/test_pandas.py @@ -21,6 +21,7 @@ 'i64': np.arange(20, dtype='i8'), 'f32': np.arange(20, dtype='f4'), 'f64': np.arange(20, dtype='f8'), + 'reverse': np.arange(20, 0, -1), 'empty_bin': np.array([0.] * 15 + [np.nan] * 5), 'cat': ['a']*5 + ['b']*5 + ['c']*5 + ['d']*5, 'cat_int': np.array([10]*5 + [11]*5 + [12]*5 + [13]*5)}) @@ -207,6 +208,30 @@ def test_max(df): assert_eq_xr(c.points(df, 'x', 'y', ds.max('f64')), out) +@pytest.mark.parametrize('df', dfs_pd) +def test_where_max(df): + out = xr.DataArray([[16, 6], [11, 1]], coords=coords, dims=dims) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('i32'), 'reverse')), out) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('i64'), 'reverse')), out) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('f32'), 'reverse')), out) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('f64'), 'reverse')), out) + + +@pytest.mark.parametrize('df', dfs_pd) +def test_where_min(df): + out = xr.DataArray([[20, 10], [15, 5]], coords=coords, dims=dims) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('i32'), 'reverse')), out) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('i64'), 'reverse')), out) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f32'), 'reverse')), out) + assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f64'), 'reverse')), out) + + +@pytest.mark.skipif(not test_gpu, reason="DATASHADER_TEST_GPU not set") +def test_where_cuda(): + with pytest.raises(NotImplementedError, match="where reduction not supported on CUDA"): + c.points(df_cuda, 'x', 'y', ds.where(ds.min('i32'), 'reverse')) + + @pytest.mark.parametrize('df', dfs) def test_mean(df): out = xr.DataArray(values(df.i32).reshape((2, 2, 5)).mean(axis=2, dtype='f8').T, @@ -2184,12 +2209,8 @@ def test_line_antialias_duplicate_points(self_intersect): @pytest.mark.parametrize('reduction', [ - #ds.mean('value'), ds.std('value'), ds.var('value'), - #ds.summary(c=ds.count()), ds.sum('value'), - #ds.first('value'), - #ds.last('value'), ]) def test_line_antialias_reduction_not_implemented(reduction): # Issue #1133, detect and report reductions that are not implemented. @@ -2200,6 +2221,47 @@ def test_line_antialias_reduction_not_implemented(reduction): cvs.line(df, 'x', 'y', line_width=1, agg=reduction) +def test_line_antialias_where(): + x = np.arange(3) + df = pd.DataFrame(dict( + y0 = [0.0, 0.5, 1.0], + y1 = [1.0, 0.0, 0.5], + y2 = [0.0, 1.0, 0.0], + value = [1.1, 2.2, 3.3], + other = [-9.0, -7.0, -5.0], + )) + + cvs = ds.Canvas(plot_width=7, plot_height=5) + + sol_where_max = np.array([ + [-9., -7., -7., -7., -7., -5., -5.], + [-7., -7., -7., -5., -5., -5., -9.], + [-7., -9., -5., -5., -5., -7., nan], + [-5., -5., -5., -5., -9., -7., -7.], + [-5., -5., -9., -9., -9., -7., -7.], + ]) + + agg_where_max = cvs.line( + source=df, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, + agg=ds.where(ds.max("value"), "other"), + ) + assert_eq_ndarray(agg_where_max.data, sol_where_max) + + sol_where_min = np.array([ + [-9., -9., -7., -7., -7., -9., -9.], + [-9., -9., -7., -7., -7., -9., -9.], + [-7., -9., -9., -5., -9., -9., nan], + [-5., -9., -9., -9., -9., -9., -7.], + [-5., -5., -9., -9., -9., -7., -7.], + ]) + + agg_where_min = cvs.line( + source=df, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, + agg=ds.where(ds.min("value"), "other"), + ) + assert_eq_ndarray(agg_where_min.data, sol_where_min) + + @pytest.mark.parametrize('reduction,dtype,aa_dtype', [ (ds.any(), bool, np.float32), (ds.count(), np.uint32, np.float32), @@ -2232,6 +2294,36 @@ def test_log_axis_not_positive(df, canvas): canvas.line(df, 'x', 'y') +@pytest.mark.parametrize('selector', [ + ds.any(), + ds.count(), + ds.first('value'), + ds.last('value'), + ds.mean('value'), + ds.std('value'), + ds.sum('value'), + ds.summary(any=ds.any()), + ds.var('value'), + ds.where(ds.max('value'), 'other'), +]) +def test_where_unsupported_selector(selector): + cvs = ds.Canvas(plot_width=10, plot_height=10) + df = pd.DataFrame(dict(x=[0, 1], y=[1, 2], value=[1, 2], )) + + with pytest.raises(TypeError, match='selector can only be a max or min reduction'): + cvs.line(df, 'x', 'y', agg=ds.where(selector, 'value')) + + +def test_by_cannot_use_where(): + cvs = ds.Canvas(plot_width=10, plot_height=10) + df = pd.DataFrame(dict(x=[0, 1], y=[1, 2], value=[1, 2], cat=['a', 'b'])) + df["cat"] = df["cat"].astype("category") + + msg = "'by' reduction does not support 'where' reduction for its first argument" + with pytest.raises(TypeError, match=msg): + cvs.line(df, 'x', 'y',agg=ds.by('cat', ds.where(ds.max('value'), 'other'))) + + def test_line_coordinate_lengths(): # Issue #1159. cvs = ds.Canvas(plot_width=10, plot_height=6)