Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk control #3361

Merged
merged 19 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 93 additions & 24 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2017 - 2018, Met Office
# (C) British Crown Copyright 2017 - 2019, Met Office
#
# This file is part of Iris.
#
Expand All @@ -23,12 +23,14 @@
from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa

from collections import Iterable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pp-mo See #3320 for context.

Currently in Python3.7, you get the following DeprecationWarning:

>>> from collections import Iterable
__main__:1: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working

Could you adopt the following pattern:

try:  # Python 3
    from collections.abc import Iterable
except ImportError:  # Python 2
    from collections import Iterable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, will do !

from functools import wraps

import dask
import dask.array as da
import dask.context
from dask.local import get_sync as dget_sync
import dask.array.core
import dask.config

import numpy as np
import numpy.ma as ma

Expand Down Expand Up @@ -58,26 +60,90 @@ def is_lazy_data(data):
return result


# A magic value, chosen to minimise chunk creation time and chunk processing
# time within dask.
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2
def _optimum_chunksize(chunks, shape,
limit=None,
dtype=np.dtype('f4')):
"""
Reduce or increase an initial chunk shape to get close to a chosen ideal
size, while prioritising the splitting of the earlier (outer) dimensions
and keeping intact the later (inner) ones.

Args:

* chunks (tuple of int, or None):
Pre-existing chunk shape of the target data : None if unknown.
* shape (tuple of int):
The full array shape of the target data.
* limit (int):
The 'ideal' target chunk size, in bytes. Default from dask.config.
* dtype (np.dtype):
Numpy dtype of target data.

Returns:
* chunk (tuple of int):
The proposed shape of one full chunk.

.. note::
The purpose of this is very similar to
`dask.array.core.normalize_chunks`, when called as
`(chunks='auto', shape, dtype=dtype, previous_chunks=chunks, ...)`.
Except, the operation here is optimised specifically for a 'c-like'
dimension order, i.e. outer dimensions first, as for netcdf variables.
So if, in future, this policy can be implemented in dask, then we would
prefer to replace this function with a call to that one.
Accordingly, the arguments roughly match 'normalize_chunks', except
that we don't support the alternative argument forms of that routine.
The return value, however, is a single 'full chunk', rather than a
complete chunking scheme : so an equivalent code usage could be
"chunks = [c[0] for c in normalise_chunks('auto', ...)]".

"""
# Return chunks unchanged, for types of invocation we don't comprehend.
if (any(elem <= 0 for elem in shape) or
not isinstance(chunks, Iterable) or
len(chunks) != len(shape)):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have explicit tests for this check.
I'm not sure how thorough we want to be with testing this. It does seem like a bit of overkill to add tests for
_optimum_chunksize((200,300), (1,200,300))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an attempt to allow alternative, dask-type chunks arguments in 'as_lazy_data'.
Obviously we don't use anything like that at present. The only immediate need is to skip shapes with a zero in them (see comment).

I now see this is wrong anyway, as that initial test clause assumes that shape is iterable !
I will simplify to just what we need, and add a testcase.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... after some thought, I have removed this to the caller + documented the behaviour there.

# Don't modify chunks for special values like -1, (0,), 'auto',
# or if shape contains 0 or -1 (like raw landsea-mask data proxies).
return chunks

# Set the chunksize limit.
if limit is None:
# Fetch the default 'optimal' chunksize from the dask config.
limit = dask.config.get('array.chunk-size')
# Convert to bytes
limit = da.core.parse_bytes(limit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you chose to get parse_bytes from da.core rather than from dask.utils?

Copy link
Member Author

@pp-mo pp-mo Aug 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I copied this from the dask sourcecode somewhere.
I will fix it.


point_size_limit = limit / dtype.itemsize

# Create result chunks, starting with a copy of the input.
result = list(chunks)
if shape is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would shape be None? I don't think we should be allowing for shape=None. you also iterate through shape on line 105

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I was trying to mimic the API of dask.array.core.normalize_chunks, in case we can use that in the future.
Actually I haven't achieved that, and we never use this option, so I will remove it.

shape = result[:]

if np.prod(result) < point_size_limit:
# If size is less than maximum, expand the chunks, multiplying later
# (i.e. inner) dims first.
i_expand = len(shape) - 1
while np.prod(result) < point_size_limit and i_expand >= 0:
factor = np.floor(point_size_limit * 1.0 / np.prod(result))
new_dim = result[i_expand] * int(factor)
# Clip to dim size : N.B. means it cannot exceed the original dims.
if new_dim > shape[i_expand]:
new_dim = shape[i_expand]
result[i_expand] = new_dim
i_expand -= 1
else:
# Similarly, reduce if too big, reducing earlier (outer) dims first.
i_reduce = 0
while np.prod(result) > point_size_limit:
factor = np.ceil(np.prod(result) / point_size_limit)
new_dim = int(result[i_reduce] / factor)
if new_dim < 1:
new_dim = 1
result[i_reduce] = new_dim
i_reduce += 1

def _limited_shape(shape):
# Reduce a shape to less than a default overall number-of-points, reducing
# earlier dimensions preferentially.
# Note: this is only a heuristic, assuming that earlier dimensions are
# 'outer' storage dimensions -- not *always* true, even for NetCDF data.
shape = list(shape)
i_reduce = 0
while np.prod(shape) > _MAX_CHUNK_SIZE:
factor = np.ceil(np.prod(shape) / _MAX_CHUNK_SIZE)
new_dim = int(shape[i_reduce] / factor)
if new_dim < 1:
new_dim = 1
shape[i_reduce] = new_dim
i_reduce += 1
return tuple(shape)
return tuple(result)


def as_lazy_data(data, chunks=None, asarray=False):
Expand Down Expand Up @@ -106,9 +172,12 @@ def as_lazy_data(data, chunks=None, asarray=False):

"""
if chunks is None:
# Default to the shape of the wrapped array-like,
# but reduce it if larger than a default maximum size.
chunks = _limited_shape(data.shape)
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
chunks = list(data.shape)

# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype)

if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)
Expand Down
4 changes: 2 additions & 2 deletions lib/iris/fileformats/netcdf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2010 - 2018, Met Office
# (C) British Crown Copyright 2010 - 2019, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -511,7 +511,7 @@ def _get_cf_var_data(cf_var, filename):
proxy = NetCDFDataProxy(cf_var.shape, dtype, filename, cf_var.cf_name,
fill_value)
chunks = cf_var.cf_data.chunking()
# Chunks can be an iterable, None, or `'contiguous'`.
# Chunks can be an iterable, or `'contiguous'`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this change. You have removed None and yet two lines down it sets

chunks = None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two different sets of conventions here.

  • The 'chunks' value that nc.Variable.data_chunking returns can not be None, I believe.
    • I don't know quite why it ever said that : it just seems wrong to me.
  • The 'chunks' keyword we pass to as_lazy_data can be None. And it absolutely can't be 'contiguous', which is why we are converting here.

I will try to clarify in the comments.

if chunks == 'contiguous':
chunks = None
return as_lazy_data(proxy, chunks=chunks)
Expand Down
15 changes: 8 additions & 7 deletions lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2018, Met Office
# (C) British Crown Copyright 2019, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -26,7 +26,7 @@
from dask.array import Array as dask_array
import numpy as np

from iris._lazy_data import _limited_shape
from iris._lazy_data import _optimum_chunksize
import iris.fileformats.cf
from iris.fileformats.netcdf import _get_cf_var_data
from iris.tests import mock
Expand All @@ -35,8 +35,8 @@
class Test__get_cf_var_data(tests.IrisTest):
def setUp(self):
self.filename = 'DUMMY'
self.shape = (3, 240, 200)
self.expected_chunks = _limited_shape(self.shape)
self.shape = (300000, 240, 200)
self.expected_chunks = _optimum_chunksize(self.shape, self.shape)

def _make(self, chunksizes):
cf_data = mock.Mock(_FillValue=None)
Expand All @@ -55,15 +55,16 @@ def test_cf_data_type(self):
self.assertIsInstance(lazy_data, dask_array)

def test_cf_data_chunks(self):
chunks = [1, 12, 100]
chunks = [2500, 240, 200]
cf_var = self._make(chunks)
lazy_data = _get_cf_var_data(cf_var, self.filename)
lazy_data_chunks = [c[0] for c in lazy_data.chunks]
self.assertArrayEqual(chunks, lazy_data_chunks)
expected_chunks = _optimum_chunksize(chunks, self.shape)
self.assertArrayEqual(lazy_data_chunks, expected_chunks)

def test_cf_data_no_chunks(self):
# No chunks means chunks are calculated from the array's shape by
# `iris._lazy_data._limited_shape()`.
# `iris._lazy_data._optimum_chunksize()`.
chunks = None
cf_var = self._make(chunks)
lazy_data = _get_cf_var_data(cf_var, self.filename)
Expand Down
71 changes: 50 additions & 21 deletions lib/iris/tests/unit/lazy_data/test_as_lazy_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2017 - 2018, Met Office
# (C) British Crown Copyright 2017 - 2019, Met Office
#
# This file is part of Iris.
#
Expand All @@ -24,17 +24,17 @@
import iris.tests as tests

import dask.array as da
import dask.config
import numpy as np
import numpy.ma as ma

from iris._lazy_data import as_lazy_data, _MAX_CHUNK_SIZE, _limited_shape
from iris._lazy_data import as_lazy_data, _optimum_chunksize
from iris.tests import mock


class Test_as_lazy_data(tests.IrisTest):
def test_lazy(self):
data = da.from_array(np.arange(24).reshape((2, 3, 4)),
chunks=_MAX_CHUNK_SIZE)
data = da.from_array(np.arange(24).reshape((2, 3, 4)), chunks='auto')
result = as_lazy_data(data)
self.assertIsInstance(result, da.core.Array)

Expand All @@ -50,32 +50,30 @@ def test_masked(self):

def test_non_default_chunks(self):
data = np.arange(24)
chunks = 12
chunks = (12,)
lazy_data = as_lazy_data(data, chunks=chunks)
result, = np.unique(lazy_data.chunks)
self.assertEqual(result, chunks)

def test_non_default_chunks__chunks_already_set(self):
chunks = 12
data = da.from_array(np.arange(24), chunks=chunks)
lazy_data = as_lazy_data(data)
result, = np.unique(lazy_data.chunks)
self.assertEqual(result, chunks)
self.assertEqual(result, 24)

def test_with_masked_constant(self):
masked_data = ma.masked_array([8], mask=True)
masked_constant = masked_data[0]
result = as_lazy_data(masked_constant)
self.assertIsInstance(result, da.core.Array)


class Test__optimised_chunks(tests.IrisTest):
# Stable, known chunksize for testing.
FIXED_CHUNKSIZE_LIMIT = 1024 * 1024 * 64

@staticmethod
def _dummydata(shape):
return mock.Mock(spec=da.core.Array,
dtype=np.dtype('f4'),
shape=shape)

def test_chunk_size_limiting(self):
# Check the default chunksizes for large data.
# Check default chunksizes for large data (with a known size limit).
given_shapes_and_resulting_chunks = [
((16, 1024, 1024), (16, 1024, 1024)), # largest unmodified
((17, 1011, 1022), (8, 1011, 1022)),
Expand All @@ -84,28 +82,59 @@ def test_chunk_size_limiting(self):
((17, 1, 1011, 1022), (8, 1, 1011, 1022)),
((11, 2, 1011, 1022), (5, 2, 1011, 1022))
]
err_fmt = 'Result of reducing shape {} was {}, expected {}'
err_fmt = 'Result of optimising chunks {} was {}, expected {}'
for (shape, expected) in given_shapes_and_resulting_chunks:
chunks = _limited_shape(shape)
chunks = _optimum_chunksize(shape, shape,
limit=self.FIXED_CHUNKSIZE_LIMIT)
msg = err_fmt.format(shape, chunks, expected)
self.assertEqual(chunks, expected, msg)

def test_chunk_size_expanding(self):
# Check the expansion of small chunks, (with a known size limit).
given_shapes_and_resulting_chunks = [
((1, 100, 100), (16, 100, 100), (16, 100, 100)),
((1, 100, 100), (5000, 100, 100), (1677, 100, 100)),
((3, 300, 200), (10000, 3000, 2000), (3, 2700, 2000)),
((3, 300, 200), (10000, 300, 2000), (27, 300, 2000)),
((3, 300, 200), (8, 300, 2000), (8, 300, 2000)),
]
err_fmt = 'Result of optimising shape={};chunks={} was {}, expected {}'
for (shape, fullshape, expected) in given_shapes_and_resulting_chunks:
chunks = _optimum_chunksize(chunks=shape, shape=fullshape,
limit=self.FIXED_CHUNKSIZE_LIMIT)
msg = err_fmt.format(fullshape, shape, chunks, expected)
self.assertEqual(chunks, expected, msg)

def test_default_chunksize(self):
# Check that the "ideal" chunksize is taken from the dask config.
with dask.config.set({'array.chunk-size': '20b'}):
chunks = _optimum_chunksize((1, 8),
shape=(400, 20),
dtype=np.dtype('f4'))
self.assertEqual(chunks, (1, 4))

def test_default_chunks_limiting(self):
# Check that chunking is limited when no specific 'chunks' given.
limitcall_patch = self.patch('iris._lazy_data._limited_shape')
# Check that chunking is still controlled when no specific 'chunks'
# is passed.
limitcall_patch = self.patch('iris._lazy_data._optimum_chunksize')
test_shape = (3, 2, 4)
data = self._dummydata(test_shape)
as_lazy_data(data)
self.assertEqual(limitcall_patch.call_args_list,
[mock.call(test_shape)])
[mock.call(list(test_shape),
shape=test_shape,
dtype=np.dtype('f4'))])

def test_large_specific_chunk_passthrough(self):
# Check that even a too-large specific 'chunks' arg is honoured.
limitcall_patch = self.patch('iris._lazy_data._limited_shape')
limitcall_patch = self.patch('iris._lazy_data._optimum_chunksize')
huge_test_shape = (1001, 1002, 1003, 1004)
data = self._dummydata(huge_test_shape)
result = as_lazy_data(data, chunks=huge_test_shape)
self.assertEqual(limitcall_patch.call_args_list, [])
self.assertEqual(limitcall_patch.call_args_list,
[mock.call(huge_test_shape,
shape=huge_test_shape,
dtype=np.dtype('f4'))])
self.assertEqual(result.shape, huge_test_shape)


Expand Down
6 changes: 3 additions & 3 deletions lib/iris/tests/unit/lazy_data/test_is_lazy_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2017, Met Office
# (C) British Crown Copyright 2017 - 2019, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -26,13 +26,13 @@
import dask.array as da
import numpy as np

from iris._lazy_data import is_lazy_data, _MAX_CHUNK_SIZE
from iris._lazy_data import is_lazy_data


class Test_is_lazy_data(tests.IrisTest):
def test_lazy(self):
values = np.arange(30).reshape((2, 5, 3))
lazy_array = da.from_array(values, chunks=_MAX_CHUNK_SIZE)
lazy_array = da.from_array(values, chunks='auto')
self.assertTrue(is_lazy_data(lazy_array))

def test_real(self):
Expand Down