diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py
index f5312b7d3a..2de93824d8 100644
--- a/lib/iris/_lazy_data.py
+++ b/lib/iris/_lazy_data.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2017 - 2018, Met Office
+# (C) British Crown Copyright 2017 - 2019, Met Office
 #
 # This file is part of Iris.
 #
@@ -27,8 +27,9 @@
 import dask
 import dask.array as da
-import dask.context
-from dask.local import get_sync as dget_sync
+import dask.config
+import dask.utils
+
 import numpy as np
 import numpy.ma as ma
@@ -58,26 +59,104 @@ def is_lazy_data(data):
     return result
 
 
-# A magic value, chosen to minimise chunk creation time and chunk processing
-# time within dask.
-_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2
+def _optimum_chunksize(chunks, shape,
+                       limit=None,
+                       dtype=np.dtype('f4')):
+    """
+    Reduce or increase an initial chunk shape to get close to a chosen ideal
+    size, while prioritising the splitting of the earlier (outer) dimensions
+    and keeping intact the later (inner) ones.
 
+    Args:
 
-def _limited_shape(shape):
-    # Reduce a shape to less than a default overall number-of-points, reducing
-    # earlier dimensions preferentially.
-    # Note: this is only a heuristic, assuming that earlier dimensions are
-    # 'outer' storage dimensions -- not *always* true, even for NetCDF data.
-    shape = list(shape)
-    i_reduce = 0
-    while np.prod(shape) > _MAX_CHUNK_SIZE:
-        factor = np.ceil(np.prod(shape) / _MAX_CHUNK_SIZE)
-        new_dim = int(shape[i_reduce] / factor)
-        if new_dim < 1:
-            new_dim = 1
-        shape[i_reduce] = new_dim
-        i_reduce += 1
-    return tuple(shape)
+    * chunks (tuple of int, or None):
+        Pre-existing chunk shape of the target data : None if unknown.
+    * shape (tuple of int):
+        The full array shape of the target data.
+    * limit (int):
+        The 'ideal' target chunk size, in bytes. Default from dask.config.
+    * dtype (np.dtype):
+        Numpy dtype of target data.
+
+    Returns:
+    * chunk (tuple of int):
+        The proposed shape of one full chunk.
+
+    .. note::
+        The purpose of this is very similar to
+        `dask.array.core.normalize_chunks`, when called as
+        `normalize_chunks('auto', shape, dtype=dtype, previous_chunks=chunks)`.
+        Except, the operation here is optimised specifically for a 'c-like'
+        dimension order, i.e. outer dimensions first, as for netcdf variables.
+        So if, in future, this policy can be implemented in dask, then we
+        would prefer to replace this function with a call to that one.
+        Accordingly, the arguments roughly match 'normalize_chunks', except
+        that we don't support the alternative argument forms of that routine.
+        The return value, however, is a single 'full chunk', rather than a
+        complete chunking scheme : so an equivalent code usage could be
+        "chunks = [c[0] for c in normalize_chunks('auto', ...)]".
+
+    """
+    # Set the chunksize limit.
+    if limit is None:
+        # Fetch the default 'optimal' chunksize from the dask config.
+        limit = dask.config.get('array.chunk-size')
+        # Convert to bytes
+        limit = dask.utils.parse_bytes(limit)
+
+    point_size_limit = limit / dtype.itemsize
+
+    # Create result chunks, starting with a copy of the input.
+    result = list(chunks)
+
+    if np.prod(result) < point_size_limit:
+        # If size is less than maximum, expand the chunks, multiplying later
+        # (i.e. inner) dims first.
+        i_expand = len(shape) - 1
+        while np.prod(result) < point_size_limit and i_expand >= 0:
+            factor = np.floor(point_size_limit * 1.0 / np.prod(result))
+            new_dim = result[i_expand] * int(factor)
+            if new_dim >= shape[i_expand]:
+                # Clip to dim size : chunk dims must not exceed the full shape.
+                new_dim = shape[i_expand]
+            else:
+                # 'new_dim' is less than the relevant dim of 'shape' -- but it
+                # is also the largest possible multiple of the input-chunks,
+                # within the size limit.
+                # So : 'i_expand' is the outer (last) dimension over which we
+                # will multiply the input chunks, and 'new_dim' is a value that
+                # ensures the fewest possible chunks within that dim.
+
+                # Now replace 'new_dim' with the value **closest to equal-size
+                # chunks**, for the same (minimum) number of chunks.
+                # More-equal chunks are practically better.
+                # E.G. : "divide 8 into multiples of 2, with a limit of 7",
+                # produces new_dim=6, which would mean chunks of sizes (6, 2).
+                # But (4, 4) is clearly better for memory and time cost.
+
+                # Calculate how many (expanded) chunks fit into this dimension.
+                dim_chunks = np.ceil(shape[i_expand] * 1. / new_dim)
+                # Get "ideal" (equal) size for that many chunks.
+                ideal_equal_chunk_size = shape[i_expand] / dim_chunks
+                # Use the nearest whole multiple of input chunks >= ideal.
+                new_dim = int(result[i_expand] *
+                              np.ceil(ideal_equal_chunk_size /
+                                      result[i_expand]))
+
+            result[i_expand] = new_dim
+            i_expand -= 1
+    else:
+        # Similarly, reduce if too big, reducing earlier (outer) dims first.
+        i_reduce = 0
+        while np.prod(result) > point_size_limit:
+            factor = np.ceil(np.prod(result) / point_size_limit)
+            new_dim = int(result[i_reduce] / factor)
+            if new_dim < 1:
+                new_dim = 1
+            result[i_reduce] = new_dim
+            i_reduce += 1
+
+    return tuple(result)
 
 
 def as_lazy_data(data, chunks=None, asarray=False):
@@ -86,29 +165,41 @@
 
     Args:
 
-    * data:
-        An array. This will be converted to a dask array.
+    * data (array-like):
+        An indexable object with 'shape', 'dtype' and 'ndim' properties.
+        This will be converted to a dask array.
 
     Kwargs:
 
-    * chunks:
-        Describes how the created dask array should be split up. Defaults to a
-        value first defined in biggus (being `8 * 1024 * 1024 * 2`).
-        For more information see
-        http://dask.pydata.org/en/latest/array-creation.html#chunks.
+    * chunks (list of int):
+        If present, a source chunk shape, e.g. for a chunked netcdf variable.
 
-    * asarray:
+    * asarray (bool):
        If True, then chunks will be converted to instances of `ndarray`.
        Set to False (default) to pass passed chunks through unchanged.
 
     Returns:
         The input array converted to a dask array.
 
+    .. note::
+        The result chunk size is a multiple of 'chunks', if given, up to the
+        dask default chunksize, i.e. `dask.config.get('array.chunk-size')`,
+        or the full data shape if that is smaller.
+        If 'chunks' is not given, the result has chunks of the full data shape,
+        but reduced by a factor if that exceeds the dask default chunksize.
+
     """
     if chunks is None:
-        # Default to the shape of the wrapped array-like,
-        # but reduce it if larger than a default maximum size.
-        chunks = _limited_shape(data.shape)
+        # No existing chunks : Make a chunk the shape of the entire input array
+        # (but we will subdivide it if too big).
+        chunks = list(data.shape)
+
+    # Adjust chunk size for better dask performance,
+    # NOTE: but only if no shape dimension is zero, so that we can handle the
+    # PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
+    if all(elem > 0 for elem in data.shape):
+        # Expand or reduce the basic chunk shape to an optimum size.
+        chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype)
 
     if isinstance(data, ma.core.MaskedConstant):
         data = ma.masked_array(data.data, mask=data.mask)
diff --git a/lib/iris/fileformats/netcdf.py b/lib/iris/fileformats/netcdf.py
index 8c965ef1a5..1fc71c5f36 100644
--- a/lib/iris/fileformats/netcdf.py
+++ b/lib/iris/fileformats/netcdf.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2010 - 2018, Met Office
+# (C) British Crown Copyright 2010 - 2019, Met Office
 #
 # This file is part of Iris.
 #
@@ -510,8 +510,10 @@ def _get_cf_var_data(cf_var, filename):
                          netCDF4.default_fillvals[cf_var.dtype.str[1:]])
     proxy = NetCDFDataProxy(cf_var.shape, dtype, filename, cf_var.cf_name,
                             fill_value)
+    # Get the chunking specified for the variable : this is either a shape, or
+    # maybe the string "contiguous".
     chunks = cf_var.cf_data.chunking()
-    # Chunks can be an iterable, None, or `'contiguous'`.
+    # In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
     if chunks == 'contiguous':
         chunks = None
     return as_lazy_data(proxy, chunks=chunks)
diff --git a/lib/iris/tests/unit/analysis/test_RMS.py b/lib/iris/tests/unit/analysis/test_RMS.py
index 150e0f9040..2ad7d75ed3 100644
--- a/lib/iris/tests/unit/analysis/test_RMS.py
+++ b/lib/iris/tests/unit/analysis/test_RMS.py
@@ -92,8 +92,7 @@ def test_masked_weighted(self):
 class Test_lazy_aggregate(tests.IrisTest):
     def test_1d(self):
         # 1-dimensional input.
-        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64),
-                            chunks=-1)
+        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64))
         rms = RMS.lazy_aggregate(data, 0)
         expected_rms = 4.5
         self.assertAlmostEqual(rms, expected_rms)
 
@@ -101,16 +100,14 @@ def test_1d(self):
     def test_2d(self):
         # 2-dimensional input.
         data = as_lazy_data(np.array([[5, 2, 6, 4], [12, 4, 10, 8]],
-                                     dtype=np.float64),
-                            chunks=-1)
+                                     dtype=np.float64))
         expected_rms = np.array([4.5, 9.0], dtype=np.float64)
         rms = RMS.lazy_aggregate(data, 1)
         self.assertArrayAlmostEqual(rms, expected_rms)
 
     def test_1d_weighted(self):
         # 1-dimensional input with weights.
-        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64),
-                            chunks=-1)
+        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64))
         weights = np.array([1, 4, 3, 2], dtype=np.float64)
         expected_rms = 8.0
         # https://github.com/dask/dask/issues/3846.
@@ -120,10 +117,8 @@ def test_1d_weighted(self):
 
     def test_1d_lazy_weighted(self):
         # 1-dimensional input with lazy weights.
-        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64),
-                            chunks=-1)
-        weights = as_lazy_data(np.array([1, 4, 3, 2], dtype=np.float64),
-                               chunks=-1)
+        data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64))
+        weights = as_lazy_data(np.array([1, 4, 3, 2], dtype=np.float64))
         expected_rms = 8.0
         # https://github.com/dask/dask/issues/3846.
         with self.assertRaisesRegexp(TypeError, 'unexpected keyword argument'):
@@ -133,8 +128,7 @@ def test_1d_lazy_weighted(self):
     def test_2d_weighted(self):
         # 2-dimensional input with weights.
         data = as_lazy_data(np.array([[4, 7, 10, 8], [14, 16, 20, 8]],
-                                     dtype=np.float64),
-                            chunks=-1)
+                                     dtype=np.float64))
         weights = np.array([[1, 4, 3, 2], [2, 1, 1.5, 0.5]], dtype=np.float64)
         expected_rms = np.array([8.0, 16.0], dtype=np.float64)
         # https://github.com/dask/dask/issues/3846.
@@ -144,8 +138,7 @@ def test_2d_weighted(self):
 
     def test_unit_weighted(self):
         # Unit weights should be the same as no weights.
-        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64),
-                            chunks=-1)
+        data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64))
         weights = np.ones_like(data)
         expected_rms = 4.5
         # https://github.com/dask/dask/issues/3846.
@@ -157,8 +150,7 @@ def test_masked(self):
         # Masked entries should be completely ignored.
         data = as_lazy_data(ma.array([5, 10, 2, 11, 6, 4],
                                      mask=[False, True, False, True, False, False],
-                                     dtype=np.float64),
-                            chunks=-1)
+                                     dtype=np.float64))
         expected_rms = 4.5
         rms = RMS.lazy_aggregate(data, 0)
         self.assertAlmostEqual(rms, expected_rms)
@@ -169,8 +161,7 @@ def test_masked_weighted(self):
         # For now, masked weights are simply not supported.
         data = as_lazy_data(ma.array([4, 7, 18, 10, 11, 8],
                                      mask=[False, False, True, False, True, False],
-                                     dtype=np.float64),
-                            chunks=-1)
+                                     dtype=np.float64))
         weights = np.array([1, 4, 5, 3, 8, 2])
         expected_rms = 8.0
         with self.assertRaisesRegexp(TypeError, 'unexpected keyword argument'):
diff --git a/lib/iris/tests/unit/cube/test_Cube.py b/lib/iris/tests/unit/cube/test_Cube.py
index 13fbc5660f..59745193ce 100644
--- a/lib/iris/tests/unit/cube/test_Cube.py
+++ b/lib/iris/tests/unit/cube/test_Cube.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2013 - 2018, Met Office
+# (C) British Crown Copyright 2013 - 2019, Met Office
 #
 # This file is part of Iris.
 #
@@ -1475,9 +1475,7 @@ def test__masked_scalar_arraymask(self):
         self._check_copy(cube, cube.copy())
 
     def test__lazy(self):
-        # Note: multiple chunks added as a workaround suggested to dask#3751,
-        # which is fixed in dask#3754.
-        cube = Cube(as_lazy_data(np.array([1, 0]), chunks=(1, 1)))
+        cube = Cube(as_lazy_data(np.array([1, 0])))
         self._check_copy(cube, cube.copy())
 
 
diff --git a/lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py b/lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py
index 6a1aeb9d61..ed951c4bf9 100644
--- a/lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py
+++ b/lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2018, Met Office
+# (C) British Crown Copyright 2019, Met Office
 #
 # This file is part of Iris.
 #
@@ -26,7 +26,7 @@
 from dask.array import Array as dask_array
 import numpy as np
 
-from iris._lazy_data import _limited_shape
+from iris._lazy_data import _optimum_chunksize
 import iris.fileformats.cf
 from iris.fileformats.netcdf import _get_cf_var_data
 from iris.tests import mock
@@ -35,8 +35,8 @@
 class Test__get_cf_var_data(tests.IrisTest):
     def setUp(self):
         self.filename = 'DUMMY'
-        self.shape = (3, 240, 200)
-        self.expected_chunks = _limited_shape(self.shape)
+        self.shape = (300000, 240, 200)
+        self.expected_chunks = _optimum_chunksize(self.shape, self.shape)
 
     def _make(self, chunksizes):
         cf_data = mock.Mock(_FillValue=None)
@@ -55,15 +55,16 @@ def test_cf_data_type(self):
         self.assertIsInstance(lazy_data, dask_array)
 
     def test_cf_data_chunks(self):
-        chunks = [1, 12, 100]
+        chunks = [2500, 240, 200]
         cf_var = self._make(chunks)
         lazy_data = _get_cf_var_data(cf_var, self.filename)
         lazy_data_chunks = [c[0] for c in lazy_data.chunks]
-        self.assertArrayEqual(chunks, lazy_data_chunks)
+        expected_chunks = _optimum_chunksize(chunks, self.shape)
+        self.assertArrayEqual(lazy_data_chunks, expected_chunks)
 
     def test_cf_data_no_chunks(self):
         # No chunks means chunks are calculated from the array's shape by
-        # `iris._lazy_data._limited_shape()`.
+        # `iris._lazy_data._optimum_chunksize()`.
         chunks = None
         cf_var = self._make(chunks)
         lazy_data = _get_cf_var_data(cf_var, self.filename)
diff --git a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py
index 7acb0b6284..641283ef0d 100644
--- a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py
+++ b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2017 - 2018, Met Office
+# (C) British Crown Copyright 2017 - 2019, Met Office
 #
 # This file is part of Iris.
 #
@@ -24,17 +24,17 @@
 import iris.tests as tests
 
 import dask.array as da
+import dask.config
 import numpy as np
 import numpy.ma as ma
 
-from iris._lazy_data import as_lazy_data, _MAX_CHUNK_SIZE, _limited_shape
+from iris._lazy_data import as_lazy_data, _optimum_chunksize
 from iris.tests import mock
 
 
 class Test_as_lazy_data(tests.IrisTest):
     def test_lazy(self):
-        data = da.from_array(np.arange(24).reshape((2, 3, 4)),
-                             chunks=_MAX_CHUNK_SIZE)
+        data = da.from_array(np.arange(24).reshape((2, 3, 4)), chunks='auto')
         result = as_lazy_data(data)
         self.assertIsInstance(result, da.core.Array)
 
@@ -50,17 +50,10 @@ def test_masked(self):
     def test_non_default_chunks(self):
         data = np.arange(24)
-        chunks = 12
+        chunks = (12,)
         lazy_data = as_lazy_data(data, chunks=chunks)
         result, = np.unique(lazy_data.chunks)
-        self.assertEqual(result, chunks)
-
-    def test_non_default_chunks__chunks_already_set(self):
-        chunks = 12
-        data = da.from_array(np.arange(24), chunks=chunks)
-        lazy_data = as_lazy_data(data)
-        result, = np.unique(lazy_data.chunks)
-        self.assertEqual(result, chunks)
+        self.assertEqual(result, 24)
 
     def test_with_masked_constant(self):
         masked_data = ma.masked_array([8], mask=True)
@@ -68,6 +61,11 @@ def test_with_masked_constant(self):
         result = as_lazy_data(masked_constant)
         self.assertIsInstance(result, da.core.Array)
 
+
+class Test__optimised_chunks(tests.IrisTest):
+    # Stable, known chunksize for testing.
+    FIXED_CHUNKSIZE_LIMIT = 1024 * 1024 * 64
+
     @staticmethod
     def _dummydata(shape):
         return mock.Mock(spec=da.core.Array,
@@ -75,7 +73,7 @@ def _dummydata(shape):
                          shape=shape)
 
     def test_chunk_size_limiting(self):
-        # Check the default chunksizes for large data.
+        # Check default chunksizes for large data (with a known size limit).
         given_shapes_and_resulting_chunks = [
             ((16, 1024, 1024), (16, 1024, 1024)),  # largest unmodified
             ((17, 1011, 1022), (8, 1011, 1022)),
             ((16, 1024, 1025), (8, 1024, 1025)),
             ((1, 17, 1011, 1022), (1, 8, 1011, 1022)),
             ((17, 1, 1011, 1022), (8, 1, 1011, 1022)),
             ((11, 2, 1011, 1022), (5, 2, 1011, 1022))
         ]
-        err_fmt = 'Result of reducing shape {} was {}, expected {}'
+        err_fmt = 'Result of optimising chunks {} was {}, expected {}'
         for (shape, expected) in given_shapes_and_resulting_chunks:
-            chunks = _limited_shape(shape)
+            chunks = _optimum_chunksize(shape, shape,
+                                        limit=self.FIXED_CHUNKSIZE_LIMIT)
             msg = err_fmt.format(shape, chunks, expected)
             self.assertEqual(chunks, expected, msg)
 
+    def test_chunk_size_expanding(self):
+        # Check the expansion of small chunks (with a known size limit).
+        given_shapes_and_resulting_chunks = [
+            ((1, 100, 100), (16, 100, 100), (16, 100, 100)),
+            ((1, 100, 100), (5000, 100, 100), (1667, 100, 100)),
+            ((3, 300, 200), (10000, 3000, 2000), (3, 1500, 2000)),
+            ((3, 300, 200), (10000, 300, 2000), (27, 300, 2000)),
+            ((3, 300, 200), (8, 300, 2000), (8, 300, 2000)),
+            ((3, 300, 200), (117, 300, 1000), (39, 300, 1000)),
+        ]
+        err_fmt = 'Result of optimising shape={};chunks={} was {}, expected {}'
+        for (shape, fullshape, expected) in given_shapes_and_resulting_chunks:
+            chunks = _optimum_chunksize(chunks=shape, shape=fullshape,
+                                        limit=self.FIXED_CHUNKSIZE_LIMIT)
+            msg = err_fmt.format(fullshape, shape, chunks, expected)
+            self.assertEqual(chunks, expected, msg)
+
+    def test_chunk_expanding_equal_division(self):
+        # Check that expansion chooses equal chunk sizes as far as possible.
+
+        # Table of test cases:
+        #   (input-chunkshape, full-shape, size-limit, result-chunkshape)
+        testcases_chunksin_fullshape_limit_result = [
+            ((4,), (12,), 15, (12,)),  # gives a single chunk, of size 12
+            ((4,), (13,), 15, (8,)),  # chooses chunks of 8+5, better than 12+1
+            ((4,), (16,), 15, (8,)),  # 8+8 is better than 12+4; 16 is too big.
+            ((4,), (96,), 15, (12,)),  # 12 is largest 'allowed'
+            ((4,), (96,), 31, (24,)),  # 28 doesn't divide 96 so neatly,
+            # A multi-dimensional case, where trailing dims are 'filled'.
+            ((4, 5, 100), (25, 10, 200), 16*2000, (16, 10, 200)),
+            # Equivalent case with additional initial dimensions.
+            ((1, 1, 4, 5, 100), (3, 5, 25, 10, 200), 16*2000,
+             (1, 1, 16, 10, 200)),  # effectively the same as the previous.
+        ]
+        err_fmt_main = ('Main chunks result of optimising '
+                        'chunks={},shape={},limit={} '
+                        'was {}, expected {}')
+        for (chunks, shape, limit, expected_result) in \
+                testcases_chunksin_fullshape_limit_result:
+            result = _optimum_chunksize(chunks=chunks,
+                                        shape=shape,
+                                        limit=limit,
+                                        dtype=np.dtype('b1'))
+            msg = err_fmt_main.format(chunks, shape, limit,
+                                      result, expected_result)
+            self.assertEqual(result, expected_result, msg)
+
+    def test_default_chunksize(self):
+        # Check that the "ideal" chunksize is taken from the dask config.
+        with dask.config.set({'array.chunk-size': '20b'}):
+            chunks = _optimum_chunksize((1, 8),
+                                        shape=(400, 20),
+                                        dtype=np.dtype('f4'))
+            self.assertEqual(chunks, (1, 4))
+
     def test_default_chunks_limiting(self):
-        # Check that chunking is limited when no specific 'chunks' given.
-        limitcall_patch = self.patch('iris._lazy_data._limited_shape')
+        # Check that chunking is still controlled when no specific 'chunks'
+        # is passed.
+        limitcall_patch = self.patch('iris._lazy_data._optimum_chunksize')
         test_shape = (3, 2, 4)
         data = self._dummydata(test_shape)
         as_lazy_data(data)
         self.assertEqual(limitcall_patch.call_args_list,
-                         [mock.call(test_shape)])
-
-    def test_large_specific_chunk_passthrough(self):
-        # Check that even a too-large specific 'chunks' arg is honoured.
-        limitcall_patch = self.patch('iris._lazy_data._limited_shape')
-        huge_test_shape = (1001, 1002, 1003, 1004)
-        data = self._dummydata(huge_test_shape)
-        result = as_lazy_data(data, chunks=huge_test_shape)
-        self.assertEqual(limitcall_patch.call_args_list, [])
-        self.assertEqual(result.shape, huge_test_shape)
+                         [mock.call(list(test_shape),
+                                    shape=test_shape,
+                                    dtype=np.dtype('f4'))])
+
+    def test_shapeless_data(self):
+        # Check that chunk optimisation is skipped if shape contains a zero.
+        limitcall_patch = self.patch('iris._lazy_data._optimum_chunksize')
+        test_shape = (2, 1, 0, 2)
+        data = self._dummydata(test_shape)
+        as_lazy_data(data, chunks=test_shape)
+        self.assertFalse(limitcall_patch.called)
 
 
 if __name__ == '__main__':
diff --git a/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py
index 40ef6213ff..14148cfb5a 100644
--- a/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py
+++ b/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2017, Met Office
+# (C) British Crown Copyright 2017 - 2019, Met Office
 #
 # This file is part of Iris.
 #
@@ -26,13 +26,13 @@
 import dask.array as da
 import numpy as np
 
-from iris._lazy_data import is_lazy_data, _MAX_CHUNK_SIZE
+from iris._lazy_data import is_lazy_data
 
 
 class Test_is_lazy_data(tests.IrisTest):
     def test_lazy(self):
         values = np.arange(30).reshape((2, 5, 3))
-        lazy_array = da.from_array(values, chunks=_MAX_CHUNK_SIZE)
+        lazy_array = da.from_array(values, chunks='auto')
         self.assertTrue(is_lazy_data(lazy_array))
 
     def test_real(self):
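For reference, the near-equivalent dask operation mentioned in the `_optimum_chunksize` docstring can be exercised directly. The sketch below is illustrative only and not part of the change: the shapes are arbitrary, and the resulting chunk shape depends on the dask version and the configured `array.chunk-size`.

```python
import dask.array as da
import numpy as np

# An example full array shape, with the chunking a netcdf variable might declare.
shape = (10000, 300, 2000)
source_chunks = (3, 300, 200)

# Dask's own 'auto' policy respects the source chunks and the configured
# 'array.chunk-size' limit, and returns a complete chunking scheme ...
scheme = da.core.normalize_chunks('auto', shape=shape, dtype=np.dtype('f4'),
                                  previous_chunks=source_chunks)
# ... whereas _optimum_chunksize returns a single 'full chunk', roughly:
one_full_chunk = tuple(c[0] for c in scheme)
print(one_full_chunk)
```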
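Because `as_lazy_data` now takes its default size limit from the dask config, the chunking of lazily loaded data can be tuned from user code without touching Iris. A minimal sketch, assuming this branch is installed; the filename and the '100MiB' setting are hypothetical values chosen for illustration:

```python
import dask.config
import iris

# Ask for roughly 100 MiB chunks while the netcdf variables are wrapped as
# dask arrays (i.e. at load time).
with dask.config.set({'array.chunk-size': '100MiB'}):
    cube = iris.load_cube('my_big_file.nc')  # hypothetical file

# The chosen chunking is visible on the cube's lazy data.
print(cube.lazy_data().chunks)
```

Note that the limit is read when the data proxy is wrapped, so the config setting needs to be active around the load call rather than around later computations.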