Chunk control #3361

Merged · 19 commits · Aug 23, 2019
155 changes: 123 additions & 32 deletions lib/iris/_lazy_data.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2017 - 2018, Met Office
# (C) British Crown Copyright 2017 - 2019, Met Office
#
# This file is part of Iris.
#
@@ -27,8 +27,9 @@

import dask
import dask.array as da
import dask.context
from dask.local import get_sync as dget_sync
import dask.config
import dask.utils

import numpy as np
import numpy.ma as ma

@@ -58,26 +59,104 @@ def is_lazy_data(data):
    return result


# A magic value, chosen to minimise chunk creation time and chunk processing
# time within dask.
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2


def _limited_shape(shape):
    # Reduce a shape to less than a default overall number-of-points, reducing
    # earlier dimensions preferentially.
    # Note: this is only a heuristic, assuming that earlier dimensions are
    # 'outer' storage dimensions -- not *always* true, even for NetCDF data.
    shape = list(shape)
    i_reduce = 0
    while np.prod(shape) > _MAX_CHUNK_SIZE:
        factor = np.ceil(np.prod(shape) / _MAX_CHUNK_SIZE)
        new_dim = int(shape[i_reduce] / factor)
        if new_dim < 1:
            new_dim = 1
        shape[i_reduce] = new_dim
        i_reduce += 1
    return tuple(shape)


def _optimum_chunksize(chunks, shape,
                       limit=None,
                       dtype=np.dtype('f4')):
    """
    Reduce or increase an initial chunk shape to get close to a chosen ideal
    size, while prioritising the splitting of the earlier (outer) dimensions
    and keeping intact the later (inner) ones.

    Args:

    * chunks (tuple of int, or None):
        Pre-existing chunk shape of the target data : None if unknown.
    * shape (tuple of int):
        The full array shape of the target data.
    * limit (int):
        The 'ideal' target chunk size, in bytes.  Default from dask.config.
    * dtype (np.dtype):
        Numpy dtype of target data.

    Returns:
    * chunk (tuple of int):
        The proposed shape of one full chunk.

    .. note::
        The purpose of this is very similar to
        `dask.array.core.normalize_chunks`, when called as
        `(chunks='auto', shape, dtype=dtype, previous_chunks=chunks, ...)`.
        Except, the operation here is optimised specifically for a 'c-like'
        dimension order, i.e. outer dimensions first, as for netcdf variables.
        So if, in future, this policy can be implemented in dask, then we would
        prefer to replace this function with a call to that one.
        Accordingly, the arguments roughly match 'normalize_chunks', except
        that we don't support the alternative argument forms of that routine.
        The return value, however, is a single 'full chunk', rather than a
        complete chunking scheme : so an equivalent code usage could be
        "chunks = [c[0] for c in normalize_chunks('auto', ...)]".

    """
    # Set the chunksize limit.
    if limit is None:
        # Fetch the default 'optimal' chunksize from the dask config.
        limit = dask.config.get('array.chunk-size')
        # Convert to bytes
        limit = dask.utils.parse_bytes(limit)

    point_size_limit = limit / dtype.itemsize

    # Create result chunks, starting with a copy of the input.
    result = list(chunks)

    if np.prod(result) < point_size_limit:
        # If size is less than maximum, expand the chunks, multiplying later
        # (i.e. inner) dims first.
        i_expand = len(shape) - 1
        while np.prod(result) < point_size_limit and i_expand >= 0:
            factor = np.floor(point_size_limit * 1.0 / np.prod(result))
            new_dim = result[i_expand] * int(factor)
            if new_dim >= shape[i_expand]:
                # Clip to dim size : chunk dims must not exceed the full shape.
                new_dim = shape[i_expand]
            else:
                # 'new_dim' is less than the relevant dim of 'shape' -- but it
                # is also the largest possible multiple of the input-chunks,
                # within the size limit.
                # So : 'i_expand' is the outer (last) dimension over which we
                # will multiply the input chunks, and 'new_dim' is a value that
                # ensures the fewest possible chunks within that dim.

                # Now replace 'new_dim' with the value **closest to equal-size
                # chunks**, for the same (minimum) number of chunks.
                # More-equal chunks are practically better.
                # E.G. : "divide 8 into multiples of 2, with a limit of 7",
                # produces new_dim=6, which would mean chunks of sizes (6, 2).
                # But (4, 4) is clearly better for memory and time cost.

                # Calculate how many (expanded) chunks fit into this dimension.
                dim_chunks = np.ceil(shape[i_expand] * 1. / new_dim)
                # Get "ideal" (equal) size for that many chunks.
                ideal_equal_chunk_size = shape[i_expand] / dim_chunks
                # Use the nearest whole multiple of input chunks >= ideal.
                new_dim = int(result[i_expand] *
                              np.ceil(ideal_equal_chunk_size /
                                      result[i_expand]))

            result[i_expand] = new_dim
            i_expand -= 1
    else:
        # Similarly, reduce if too big, reducing earlier (outer) dims first.
        i_reduce = 0
        while np.prod(result) > point_size_limit:
            factor = np.ceil(np.prod(result) / point_size_limit)
            new_dim = int(result[i_reduce] / factor)
            if new_dim < 1:
                new_dim = 1
            result[i_reduce] = new_dim
            i_reduce += 1

    return tuple(result)
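
A rough illustration of the policy above, assuming `_optimum_chunksize` is importable from `iris._lazy_data`; the shapes and the artificially small limit are illustrative only, not taken from the test suite:

import numpy as np
import dask.array as da

from iris._lazy_data import _optimum_chunksize

# A (100, 100) source chunk in a (500, 500) float32 array, with a small
# 100KiB limit, i.e. 25600 points : the chunk is expanded inner-dims-first,
# in whole multiples of the source chunk, rebalanced towards equal sizes.
chunk = _optimum_chunksize(chunks=(100, 100), shape=(500, 500),
                           limit=100 * 1024, dtype=np.dtype('f4'))
# With the logic above this gives (100, 200) : two source chunks along the
# inner dim, splitting that dim into near-equal pieces of (200, 200, 100).

# The closest public dask equivalent, as per the docstring note -- though
# dask prefers even-sided chunks over a c-like dimension order.
dask_chunks = da.core.normalize_chunks('auto', shape=(500, 500),
                                       limit=100 * 1024,
                                       dtype=np.dtype('f4'),
                                       previous_chunks=(100, 100))
full_chunk = [c[0] for c in dask_chunks]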


@@ -86,29 +165,41 @@ def as_lazy_data(data, chunks=None, asarray=False):

    Args:

    * data:
        An array. This will be converted to a dask array.
    * data (array-like):
        An indexable object with 'shape', 'dtype' and 'ndim' properties.
        This will be converted to a dask array.

    Kwargs:

    * chunks:
        Describes how the created dask array should be split up. Defaults to a
        value first defined in biggus (being `8 * 1024 * 1024 * 2`).
        For more information see
        http://dask.pydata.org/en/latest/array-creation.html#chunks.
    * chunks (list of int):
        If present, a source chunk shape, e.g. for a chunked netcdf variable.

    * asarray:
    * asarray (bool):
        If True, then chunks will be converted to instances of `ndarray`.
        Set to False (default) to pass chunks through unchanged.

    Returns:
        The input array converted to a dask array.

    .. note::
        The result chunk size is a multiple of 'chunks', if given, up to the
        dask default chunksize, i.e. `dask.config.get('array.chunk-size')`,
        or the full data shape if that is smaller.
        If 'chunks' is not given, the result has chunks of the full data shape,
        but reduced by a factor if that exceeds the dask default chunksize.

    """
    if chunks is None:
        # Default to the shape of the wrapped array-like,
        # but reduce it if larger than a default maximum size.
        chunks = _limited_shape(data.shape)
        # No existing chunks : Make a chunk the shape of the entire input array
        # (but we will subdivide it if too big).
        chunks = list(data.shape)

    # Adjust chunk size for better dask performance,
    # NOTE: but only if no shape dimension is zero, so that we can handle the
    # PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
    if all(elem > 0 for elem in data.shape):
        # Expand or reduce the basic chunk shape to an optimum size.
        chunks = _optimum_chunksize(chunks, shape=data.shape,
                                    dtype=data.dtype)

    if isinstance(data, ma.core.MaskedConstant):
        data = ma.masked_array(data.data, mask=data.mask)
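
For a sense of the resulting default behaviour, a minimal sketch with illustrative shapes, assuming the dask default chunk-size of (typically) '128MiB':

import numpy as np
from iris._lazy_data import as_lazy_data

# Small array : the full shape is below the dask default chunk-size,
# so the result is a single full-shape chunk.
small = as_lazy_data(np.zeros((100, 100), dtype=np.float32))
print(small.chunks)  # ((100,), (100,))

# Large array : the full shape exceeds the default limit, so the outer
# dimension is subdivided first, e.g. giving (2000, 8000) chunks here.
big = as_lazy_data(np.zeros((8000, 8000), dtype=np.float64))
print(big.chunks[0][0], big.chunks[1][0])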
6 changes: 4 additions & 2 deletions lib/iris/fileformats/netcdf.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2010 - 2018, Met Office
# (C) British Crown Copyright 2010 - 2019, Met Office
#
# This file is part of Iris.
#
@@ -510,8 +510,10 @@ def _get_cf_var_data(cf_var, filename):
                            netCDF4.default_fillvals[cf_var.dtype.str[1:]])
    proxy = NetCDFDataProxy(cf_var.shape, dtype, filename, cf_var.cf_name,
                            fill_value)
    # Get the chunking specified for the variable : this is either a shape, or
    # maybe the string "contiguous".
    chunks = cf_var.cf_data.chunking()
    # Chunks can be an iterable, None, or `'contiguous'`.
    # In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
    if chunks == 'contiguous':
        chunks = None
    return as_lazy_data(proxy, chunks=chunks)
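
For context, a sketch of the underlying netCDF4 call being handled here; the filename and variable name are hypothetical, and Iris itself wraps the variable in a NetCDFDataProxy as above rather than passing it directly:

import netCDF4

from iris._lazy_data import as_lazy_data

ds = netCDF4.Dataset('example.nc')
var = ds.variables['air_temperature']

chunks = var.chunking()  # a list of ints, or the string 'contiguous'
if chunks == 'contiguous':
    # Contiguously-stored variables have no natural chunk shape : pass
    # None, and let as_lazy_data choose one from the full variable shape.
    chunks = None
lazy = as_lazy_data(var, chunks=chunks)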
27 changes: 9 additions & 18 deletions lib/iris/tests/unit/analysis/test_RMS.py
@@ -92,25 +92,22 @@ def test_masked_weighted(self):
class Test_lazy_aggregate(tests.IrisTest):
    def test_1d(self):
        # 1-dimensional input.
        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64),
                            chunks=-1)
        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64))
        rms = RMS.lazy_aggregate(data, 0)
        expected_rms = 4.5
        self.assertAlmostEqual(rms, expected_rms)

    def test_2d(self):
        # 2-dimensional input.
        data = as_lazy_data(np.array([[5, 2, 6, 4], [12, 4, 10, 8]],
                                     dtype=np.float64),
                            chunks=-1)
                                     dtype=np.float64))
        expected_rms = np.array([4.5, 9.0], dtype=np.float64)
        rms = RMS.lazy_aggregate(data, 1)
        self.assertArrayAlmostEqual(rms, expected_rms)

    def test_1d_weighted(self):
        # 1-dimensional input with weights.
        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64),
                            chunks=-1)
        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64))
        weights = np.array([1, 4, 3, 2], dtype=np.float64)
        expected_rms = 8.0
        # https://github.com/dask/dask/issues/3846.
@@ -120,10 +117,8 @@ def test_1d_weighted(self):

    def test_1d_lazy_weighted(self):
        # 1-dimensional input with lazy weights.
        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64),
                            chunks=-1)
        weights = as_lazy_data(np.array([1, 4, 3, 2], dtype=np.float64),
                               chunks=-1)
        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64))
        weights = as_lazy_data(np.array([1, 4, 3, 2], dtype=np.float64))
        expected_rms = 8.0
        # https://github.com/dask/dask/issues/3846.
        with self.assertRaisesRegexp(TypeError, 'unexpected keyword argument'):
@@ -133,8 +128,7 @@ def test_1d_lazy_weighted(self):
    def test_2d_weighted(self):
        # 2-dimensional input with weights.
        data = as_lazy_data(np.array([[4, 7, 10, 8], [14, 16, 20, 8]],
                                     dtype=np.float64),
                            chunks=-1)
                                     dtype=np.float64))
        weights = np.array([[1, 4, 3, 2], [2, 1, 1.5, 0.5]], dtype=np.float64)
        expected_rms = np.array([8.0, 16.0], dtype=np.float64)
        # https://github.com/dask/dask/issues/3846.
@@ -144,8 +138,7 @@ def test_2d_weighted(self):

    def test_unit_weighted(self):
        # Unit weights should be the same as no weights.
        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64),
                            chunks=-1)
        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64))
        weights = np.ones_like(data)
        expected_rms = 4.5
        # https://github.com/dask/dask/issues/3846.
@@ -157,8 +150,7 @@ def test_masked(self):
        # Masked entries should be completely ignored.
        data = as_lazy_data(ma.array([5, 10, 2, 11, 6, 4],
                                     mask=[False, True, False, True, False, False],
                                     dtype=np.float64),
                            chunks=-1)
                                     dtype=np.float64))
        expected_rms = 4.5
        rms = RMS.lazy_aggregate(data, 0)
        self.assertAlmostEqual(rms, expected_rms)
@@ -169,8 +161,7 @@ def test_masked_weighted(self):
        # For now, masked weights are simply not supported.
        data = as_lazy_data(ma.array([4, 7, 18, 10, 11, 8],
                                     mask=[False, False, True, False, True, False],
                                     dtype=np.float64),
                            chunks=-1)
                                     dtype=np.float64))
        weights = np.array([1, 4, 5, 3, 8, 2])
        expected_rms = 8.0
        with self.assertRaisesRegexp(TypeError, 'unexpected keyword argument'):
6 changes: 2 additions & 4 deletions lib/iris/tests/unit/cube/test_Cube.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2013 - 2018, Met Office
# (C) British Crown Copyright 2013 - 2019, Met Office
#
# This file is part of Iris.
#
@@ -1475,9 +1475,7 @@ def test__masked_scalar_arraymask(self):
        self._check_copy(cube, cube.copy())

    def test__lazy(self):
        # Note: multiple chunks added as a workaround suggested to dask#3751,
        # which is fixed in dask#3754.
        cube = Cube(as_lazy_data(np.array([1, 0]), chunks=(1, 1)))
        cube = Cube(as_lazy_data(np.array([1, 0])))
        self._check_copy(cube, cube.copy())


15 changes: 8 additions & 7 deletions lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2018, Met Office
# (C) British Crown Copyright 2019, Met Office
#
# This file is part of Iris.
#
@@ -26,7 +26,7 @@
from dask.array import Array as dask_array
import numpy as np

from iris._lazy_data import _limited_shape
from iris._lazy_data import _optimum_chunksize
import iris.fileformats.cf
from iris.fileformats.netcdf import _get_cf_var_data
from iris.tests import mock
@@ -35,8 +35,8 @@
class Test__get_cf_var_data(tests.IrisTest):
    def setUp(self):
        self.filename = 'DUMMY'
        self.shape = (3, 240, 200)
        self.expected_chunks = _limited_shape(self.shape)
        self.shape = (300000, 240, 200)
        self.expected_chunks = _optimum_chunksize(self.shape, self.shape)

    def _make(self, chunksizes):
        cf_data = mock.Mock(_FillValue=None)
@@ -55,15 +55,16 @@ def test_cf_data_type(self):
        self.assertIsInstance(lazy_data, dask_array)

    def test_cf_data_chunks(self):
        chunks = [1, 12, 100]
        chunks = [2500, 240, 200]
        cf_var = self._make(chunks)
        lazy_data = _get_cf_var_data(cf_var, self.filename)
        lazy_data_chunks = [c[0] for c in lazy_data.chunks]
        self.assertArrayEqual(chunks, lazy_data_chunks)
        expected_chunks = _optimum_chunksize(chunks, self.shape)
        self.assertArrayEqual(lazy_data_chunks, expected_chunks)

    def test_cf_data_no_chunks(self):
        # No chunks means chunks are calculated from the array's shape by
        # `iris._lazy_data._optimum_chunksize()`.
        chunks = None
        cf_var = self._make(chunks)
        lazy_data = _get_cf_var_data(cf_var, self.filename)