Skip to content

Commit

Permalink
Categorical max_row_index, max_n_row_index and min equivalents (#1233)
Browse files Browse the repository at this point in the history
* Support by(_max_row_index), by(_max_n_row_index) and min equivalents

* Fix on cuda
  • Loading branch information
ianthomas23 authored Jun 13, 2023
1 parent 7672706 commit 3317542
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 69 deletions.
8 changes: 5 additions & 3 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,20 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
else:
subscript = None
prev_cuda_mutex = False
categorical_arg = None

for func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex in calls:
local_lk.update(zip(temps, (next(names) for i in temps)))
func_name = next(names)
namespace[func_name] = func
args = [arg_lk[i] for i in bases]
if categorical and isinstance(cols[0], category_codes):
pass
categorical_arg = arg_lk[cols[0]]
args.extend('{0}[{1}]'.format(arg_lk[col], subscript) for col in cols[1:])
elif ndims is None:
args.extend('{0}'.format(arg_lk[i]) for i in cols)
elif categorical:
categorical_arg = arg_lk[cols[0]]
args.extend('{0}[{1}][1]'.format(arg_lk[i], subscript)
for i in cols)
else:
Expand Down Expand Up @@ -264,8 +267,7 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
# Categorical aggregate arrays need to be unpacked
if categorical:
col_index = '' if isinstance(cols[0], category_codes) else '[0]'
signature_index = -2 if any_uses_cuda_mutex else -1
cat_var = 'cat = int({0}[{1}]{2})'.format(signature[signature_index], subscript, col_index)
cat_var = 'cat = int({0}[{1}]{2})'.format(categorical_arg, subscript, col_index)
aggs = ['{0} = {0}[:, :, cat]'.format(s) for s in signature[:len(calls)]]
body = [cat_var] + aggs + body

Expand Down
48 changes: 38 additions & 10 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,19 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned):

@property
def inputs(self):
return (self.preprocess, )
ret = (self.preprocess, )
# val_column of None here could mean None (e.g. by(any) or could be row index column.
if self.val_column is None and isinstance(self.reduction, (_max_or_min_row_index, _max_n_or_min_n_row_index)):
# Row index column
ret += self.reduction.inputs
return ret

def uses_cuda_mutex(self):
return self.reduction.uses_cuda_mutex()

def uses_row_index(self, cuda, partitioned):
return self.reduction.uses_row_index(cuda, partitioned)

def _antialias_requires_2_stages(self):
return self.reduction._antialias_requires_2_stages()

Expand Down Expand Up @@ -1975,6 +1983,7 @@ def _append_cuda(x, y, agg, field):
@staticmethod
def _combine(aggs):
# Maximum ignoring -1 values
# Works for CPU and GPU
if len(aggs) > 1:
# Works with numpy or cupy arrays
np.maximum(aggs[0], aggs[1], out=aggs[0])
Expand Down Expand Up @@ -2021,17 +2030,22 @@ def _build_combine(self, dshape, antialias, cuda, partitioned):
@staticmethod
def _combine(aggs):
# Minimum ignoring -1 values
ret = aggs[0]
if len(aggs) > 1:
# Can take 2d (ny, nx) or 3d (ny, nx, ncat) arrays.
row_min_in_place(aggs[0], aggs[1])
return aggs[0]
return ret

@staticmethod
def _combine_cuda(aggs):
ret = aggs[0]
if len(aggs) > 1:
kernel_args = cuda_args(ret.shape)
if ret.ndim == 2: # ndim is either 2 (ny, nx) or 3 (ny, nx, ncat)
# 3d view of each agg
aggs = [cp.expand_dims(agg, 2) for agg in aggs]
kernel_args = cuda_args(ret.shape[:3])
for i in range(1, len(aggs)):
cuda_row_min_in_place[kernel_args](ret, aggs[i])
cuda_row_min_in_place[kernel_args](aggs[0], aggs[i])
return ret


Expand Down Expand Up @@ -2115,17 +2129,24 @@ def _append_cuda(x, y, agg, field):

@staticmethod
def _combine(aggs):
ret = aggs[0]
if len(aggs) > 1:
if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
# 4d view of each agg
aggs = [np.expand_dims(agg, 2) for agg in aggs]
row_max_n_in_place(aggs[0], aggs[1])
return aggs[0]
return ret

@staticmethod
def _combine_cuda(aggs):
ret = aggs[0]
if len(aggs) > 1:
kernel_args = cuda_args(ret.shape[:2])
if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
# 4d view of each agg
aggs = [cp.expand_dims(agg, 2) for agg in aggs]
kernel_args = cuda_args(aggs[0].shape[:3])
for i in range(1, len(aggs)):
cuda_row_max_n_in_place[kernel_args](ret, aggs[i])
cuda_row_max_n_in_place[kernel_args](aggs[0], aggs[i])
return ret


Expand Down Expand Up @@ -2173,17 +2194,24 @@ def _append_cuda(x, y, agg, field):

@staticmethod
def _combine(aggs):
ret = aggs[0]
if len(aggs) > 1:
if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
# 4d view of each agg
aggs = [np.expand_dims(agg, 2) for agg in aggs]
row_min_n_in_place(aggs[0], aggs[1])
return aggs[0]
return ret

@staticmethod
def _combine_cuda(aggs):
ret = aggs[0]
if len(aggs) > 1:
kernel_args = cuda_args(ret.shape[:2])
if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
# 4d view of each agg
aggs = [cp.expand_dims(agg, 2) for agg in aggs]
kernel_args = cuda_args(aggs[0].shape[:3])
for i in range(1, len(aggs)):
cuda_row_min_n_in_place[kernel_args](ret, aggs[i])
cuda_row_min_n_in_place[kernel_args](aggs[0], aggs[i])
return ret


Expand Down
54 changes: 54 additions & 0 deletions datashader/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,60 @@ def test_categorical_max_n(ddf, npartitions):
assert_eq_ndarray(agg[..., 0].data, c.points(ddf, 'x', 'y', ds.by('cat2', ds.max('f32'))).data)


@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_categorical_min_row_index(ddf, npartitions):
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
solution = np.array([[[0, 1, 2, 3], [12, 13, 10, 11]], [[8, 5, 6, 7], [16, 17, 18, 15]]])
agg = c.points(ddf, 'x', 'y', ds.by('cat2', ds._min_row_index()))
assert_eq_ndarray(agg.data, solution)


@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_categorical_max_row_index(ddf, npartitions):
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
solution = np.array([[[4, 1, 2, 3], [12, 13, 14, 11]], [[8, 9, 6, 7], [16, 17, 18, 19]]])
agg = c.points(ddf, 'x', 'y', ds.by('cat2', ds._max_row_index()))
assert_eq_ndarray(agg.data, solution)


@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_categorical_min_n_row_index(ddf, npartitions):
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
solution = np.array([[[[0, 4, -1], [1, -1, -1], [2, -1, -1], [3, -1, -1]],
[[12, -1, -1], [13, -1, -1], [10, 14, -1], [11, -1, -1]]],
[[[8, -1, -1], [5, 9, -1], [6, -1, -1], [7, -1, -1]],
[[16, -1, -1], [17, -1, -1], [18, -1, -1], [15, 19, -1]]]])
for n in range(1, 3):
agg = c.points(ddf, 'x', 'y', ds.by('cat2', ds._min_n_row_index(n=n)))
out = solution[:, :, :, :n]
assert_eq_ndarray(agg.data, out)
if n == 1:
assert_eq_ndarray(agg[..., 0].data, c.points(ddf, 'x', 'y', ds.by('cat2', ds._min_row_index())).data)


@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_categorical_max_n_row_index(ddf, npartitions):
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
solution = np.array([[[[4, 0, -1], [1, -1, -1], [2, -1, -1], [3, -1, -1]],
[[12, -1, -1], [13, -1, -1], [14, 10, -1], [11, -1, -1]]],
[[[8, -1, -1], [9, 5, -1], [6, -1, -1], [7, -1, -1]],
[[16, -1, -1], [17, -1, -1], [18, -1, -1], [19, 15, -1]]]])
for n in range(1, 3):
agg = c.points(ddf, 'x', 'y', ds.by('cat2', ds._max_n_row_index(n=n)))
out = solution[:, :, :, :n]
assert_eq_ndarray(agg.data, out)
if n == 1:
assert_eq_ndarray(agg[..., 0].data, c.points(ddf, 'x', 'y', ds.by('cat2', ds._max_row_index())).data)


@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_where_max(ddf, npartitions):
Expand Down
42 changes: 42 additions & 0 deletions datashader/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,48 @@ def test_categorical_max_n(df):
assert_eq_ndarray(agg[..., 0].data, c.points(df, 'x', 'y', ds.by('cat2', ds.max('f32'))).data)


@pytest.mark.parametrize('df', dfs)
def test_categorical_min_row_index(df):
solution = np.array([[[0, 1, 2, 3], [12, 13, 10, 11]], [[8, 5, 6, 7], [16, 17, 18, 15]]])
agg = c.points(df, 'x', 'y', ds.by('cat2', ds._min_row_index()))
assert_eq_ndarray(agg.data, solution)


@pytest.mark.parametrize('df', dfs)
def test_categorical_max_row_index(df):
solution = np.array([[[4, 1, 2, 3], [12, 13, 14, 11]], [[8, 9, 6, 7], [16, 17, 18, 19]]])
agg = c.points(df, 'x', 'y', ds.by('cat2', ds._max_row_index()))
assert_eq_ndarray(agg.data, solution)


@pytest.mark.parametrize('df', dfs)
def test_categorical_min_n_row_index(df):
solution = np.array([[[[0, 4, -1], [1, -1, -1], [2, -1, -1], [3, -1, -1]],
[[12, -1, -1], [13, -1, -1], [10, 14, -1], [11, -1, -1]]],
[[[8, -1, -1], [5, 9, -1], [6, -1, -1], [7, -1, -1]],
[[16, -1, -1], [17, -1, -1], [18, -1, -1], [15, 19, -1]]]])
for n in range(1, 3):
agg = c.points(df, 'x', 'y', ds.by('cat2', ds._min_n_row_index(n=n)))
out = solution[:, :, :, :n]
assert_eq_ndarray(agg.data, out)
if n == 1:
assert_eq_ndarray(agg[..., 0].data, c.points(df, 'x', 'y', ds.by('cat2', ds._min_row_index())).data)


@pytest.mark.parametrize('df', dfs)
def test_categorical_max_n_row_index(df):
solution = np.array([[[[4, 0, -1], [1, -1, -1], [2, -1, -1], [3, -1, -1]],
[[12, -1, -1], [13, -1, -1], [14, 10, -1], [11, -1, -1]]],
[[[8, -1, -1], [9, 5, -1], [6, -1, -1], [7, -1, -1]],
[[16, -1, -1], [17, -1, -1], [18, -1, -1], [19, 15, -1]]]])
for n in range(1, 3):
agg = c.points(df, 'x', 'y', ds.by('cat2', ds._max_n_row_index(n=n)))
out = solution[:, :, :, :n]
assert_eq_ndarray(agg.data, out)
if n == 1:
assert_eq_ndarray(agg[..., 0].data, c.points(df, 'x', 'y', ds.by('cat2', ds._max_row_index())).data)


@pytest.mark.parametrize('df', dfs)
def test_where_min_row_index(df):
out = xr.DataArray([[0, 10], [-5, -15]], coords=coords, dims=dims)
Expand Down
32 changes: 16 additions & 16 deletions datashader/transfer_functions/_cuda_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,22 +258,22 @@ def cuda_nanmin_n_in_place(ret, other):
def cuda_row_min_in_place(ret, other):
"""CUDA equivalent of row_min_in_place.
"""
ny, nx = ret.shape
x, y = cuda.grid(2)
if x < nx and y < ny:
if other[y, x] > -1 and (ret[y, x] == -1 or other[y, x] < ret[y, x]):
ret[y, x] = other[y, x]
ny, nx, ncat = ret.shape
x, y, cat = cuda.grid(3)
if x < nx and y < ny and cat < ncat:
if other[y, x, cat] > -1 and (ret[y, x, cat] == -1 or other[y, x, cat] < ret[y, x, cat]):
ret[y, x, cat] = other[y, x, cat]


@cuda.jit
def cuda_row_max_n_in_place(ret, other):
"""CUDA equivalent of row_max_n_in_place.
"""
ny, nx, n = ret.shape
x, y = cuda.grid(2)
if x < nx and y < ny:
ret_pixel = ret[y, x] # 1D array of n values for single pixel
other_pixel = other[y, x] # ditto
ny, nx, ncat, n = ret.shape
x, y, cat = cuda.grid(3)
if x < nx and y < ny and cat < ncat:
ret_pixel = ret[y, x, cat] # 1D array of n values for single pixel
other_pixel = other[y, x, cat] # ditto
# Walk along other_pixel array a value at a time, find insertion
# index in ret_pixel and bump values along to insert. Next
# other_pixel value is inserted at a higher index, so this walks
Expand All @@ -286,7 +286,7 @@ def cuda_row_max_n_in_place(ret, other):
for i in range(istart, n):
if ret_pixel[i] == -1 or other_value > ret_pixel[i]:
# Bump values along then insert.
for j in range(n-1, i, -1): # fails
for j in range(n-1, i, -1):
ret_pixel[j] = ret_pixel[j-1]
ret_pixel[i] = other_value
istart = i+1
Expand All @@ -297,11 +297,11 @@ def cuda_row_max_n_in_place(ret, other):
def cuda_row_min_n_in_place(ret, other):
"""CUDA equivalent of row_min_n_in_place.
"""
ny, nx, n = ret.shape
x, y = cuda.grid(2)
if x < nx and y < ny:
ret_pixel = ret[y, x] # 1D array of n values for single pixel
other_pixel = other[y, x] # ditto
ny, nx, ncat, n = ret.shape
x, y, cat = cuda.grid(3)
if x < nx and y < ny and cat < ncat:
ret_pixel = ret[y, x, cat] # 1D array of n values for single pixel
other_pixel = other[y, x, cat] # ditto
# Walk along other_pixel array a value at a time, find insertion
# index in ret_pixel and bump values along to insert. Next
# other_pixel value is inserted at a higher index, so this walks
Expand Down
Loading

0 comments on commit 3317542

Please sign in to comment.