diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index 650c5e3338..01b095a6d1 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -23,7 +23,10 @@
 import warnings
 
 import cf_units
+import dask
 import dask.array as da
+from dask.utils import SerializableLock
+import filelock
 import netCDF4
 import numpy as np
 import numpy.ma as ma
@@ -490,10 +493,45 @@ def __setitem__(self, keys, arr):
 
 
 MESH_ELEMENTS = ("node", "edge", "face")
 
 
+class DeferredSaveWrapper:
+    """
+    An object which mimics the data access of a netCDF4.Variable, and can be written to.
+    It encapsulates the netcdf file and variable which are actually to be written to.
+    This opens the file each time, to enable writing the data chunk, then closes it.
+    TODO: could be improved with a caching scheme, but this just about works.
+    """
+
+    def __init__(self, filepath, cf_var, lockfile_path):
+        self.path = filepath
+        self.varname = cf_var.name
+        self.lockfile_path = lockfile_path
+
+    def __setitem__(self, keys, array_data):
+        # Write to the variable.
+        # First acquire a file-specific lock.
+        # Importantly, in working via the file-system, this is common to all workers,
+        # even when using processes or distributed.
+        lock = filelock.FileLock(self.lockfile_path)
+        lock.acquire()
+        # Now re-open the file for writing + write to the specific file variable.
+        dataset = None
+        try:
+            dataset = netCDF4.Dataset(self.path, "r+")
+            var = dataset.variables[self.varname]
+            var[keys] = array_data
+        finally:
+            if dataset:
+                dataset.close()
+            lock.release()
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} path={self.path!r} var={self.varname!r}>"
+
+
 class Saver:
     """A manager for saving netcdf files."""
 
-    def __init__(self, filename, netcdf_format):
+    def __init__(self, filename, netcdf_format, compute=True):
         """
         A manager for saving netcdf files.
 
@@ -506,6 +544,15 @@ def __init__(self, filename, netcdf_format):
             Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
             'NETCDF3_CLASSIC' or 'NETCDF3_64BIT'. Default is 'NETCDF4' format.
 
+        * compute (bool):
+            If True, the Saver performs normal 'synchronous' data writes, where data
+            is streamed directly into file variables during the save operation.
+            If False, the file is created as normal, but computation and streaming of
+            any lazy array content is instead deferred to :class:`dask.delayed` objects,
+            which are held in a list in the saver 'deferred_writes' property.
+            The relevant file variables are created empty, and the write can
+            subsequently be completed by computing the saver's 'deferred_writes'.
+
         Returns:
             None.
 
@@ -542,18 +589,27 @@ def __init__(self, filename, netcdf_format):
         self._mesh_dims = {}
         #: A dictionary, mapping formula terms to owner cf variable name
         self._formula_terms_cache = {}
+        #: Whether to use lazy (deferred) saving.
+        self.lazy_saves = not compute
+        #: A list of deferred writes for lazy saving : each is a (source, target) pair
+        self.deferred_writes = []
+        #: Target filepath
+        self.filepath = os.path.abspath(filename)
+        #: Target lockfile path
+        self._lockfile_path = self.filepath + ".lock"
         #: NetCDF dataset
+        self._dataset = None
         try:
             self._dataset = netCDF4.Dataset(
-                filename, mode="w", format=netcdf_format
+                self.filepath, mode="w", format=netcdf_format
             )
         except RuntimeError:
-            dir_name = os.path.dirname(filename)
+            dir_name = os.path.dirname(self.filepath)
             if not os.path.isdir(dir_name):
                 msg = "No such file or directory: {}".format(dir_name)
                 raise IOError(msg)
             if not os.access(dir_name, os.R_OK | os.W_OK):
-                msg = "Permission denied: {}".format(filename)
+                msg = "Permission denied: {}".format(self.filepath)
                 raise IOError(msg)
             else:
                 raise
@@ -2442,8 +2498,7 @@ def _increment_name(self, varname):
 
         return "{}_{}".format(varname, num)
 
-    @staticmethod
-    def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
+    def _lazy_stream_data(self, data, fill_value, fill_warn, cf_var):
         if hasattr(data, "shape") and data.shape == (1,) + cf_var.shape:
             # (Don't do this check for string data).
             # Reduce dimensionality where the data array has an extra dimension
@@ -2453,13 +2508,32 @@ def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
             data = data.squeeze(axis=0)
 
         if is_lazy_data(data):
+            if self.lazy_saves:
+                # deferred lazy streaming
+                def store(data, cf_var, fill_value):
+                    # Create a data-writeable object that we can stream into, which
+                    # encapsulates the file to be opened + variable to be written.
+                    writeable_var_wrapper = DeferredSaveWrapper(
+                        self.filepath, cf_var, self._lockfile_path
+                    )
+                    # Add to the list of deferred writes, used in _deferred_save().
+                    self.deferred_writes.append((data, writeable_var_wrapper))
+                    # NOTE: in this case, no checking of fill-value violations, so just
+                    # return dummy values for this.
+                    # TODO: just for now -- can probably make this work later
+                    is_masked, contains_value = False, False
+                    return is_masked, contains_value
 
-            def store(data, cf_var, fill_value):
-                # Store lazy data and check whether it is masked and contains
-                # the fill value
-                target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-                da.store([data], [target])
-                return target.is_masked, target.contains_value
+            else:
+                # Immediate streaming store : check mask+fill as we go.
+                def store(data, cf_var, fill_value):
+                    # Store lazy data and check whether it is masked and contains
+                    # the fill value
+                    target = _FillValueMaskCheckAndStoreTarget(
+                        cf_var, fill_value
+                    )
+                    da.store([data], [target])
+                    return target.is_masked, target.contains_value
 
         else:
 
@@ -2509,6 +2583,39 @@ def store(data, cf_var, fill_value):
                 )
             warnings.warn(msg.format(cf_var.name, fill_value))
 
+    def _deferred_save(self):
+        """
+        Create a 'delayed' to trigger file completion for lazy saves.
+
+        This contains all the deferred writes, which complete the file by filling out
+        the data of variables initially created empty.
+
+        """
+        # Create a lock to satisfy the da.store call.
+        # We need a serialisable lock for scheduling with processes or distributed.
+        # See : https://github.com/dask/distributed/issues/780
+        # However, this does *not* imply safe access for file writing in parallel.
+        # For that, DeferredSaveWrapper uses a filelock as well.
+        lock = SerializableLock()
+
+        # Create a single delayed da.store operation to complete the file.
+        sources, targets = zip(*self.deferred_writes)
+        result = da.store(sources, targets, compute=False, lock=lock)
+
+        # Wrap that in an extra operation that follows it by deleting the lockfile.
+        @dask.delayed
+        def postsave_remove_lockfile(store_op, lock_path):
+            if os.path.exists(lock_path):
+                try:
+                    os.unlink(lock_path)
+                except Exception as e:
+                    msg = f'Could not remove lockfile "{lock_path}". Error:\n{e}'
+                    raise Exception(msg)
+
+        result = postsave_remove_lockfile(result, self._lockfile_path)
+
+        return result
+
 
 def save(
     cube,
@@ -2526,6 +2633,7 @@ def save(
     least_significant_digit=None,
     packing=None,
     fill_value=None,
+    compute=True,
 ):
     """
     Save cube(s) to a netCDF file, given the cube and the filename.
@@ -2648,6 +2756,14 @@ def save(
         `:class:`iris.cube.CubeList`, or a single element, and each element of this
         argument will be applied to each cube separately.
 
+    * compute (bool):
+        When False, create the output file but defer writing any lazy array content to
+        its variables, such as (lazy) data and aux-coords points and bounds.
+        Instead return a :class:`dask.delayed` which, when computed, will compute all
+        the lazy content and stream it to complete the file.
+        Several such data saves can be performed in parallel, by passing a list of them
+        into a :func:`dask.compute` call.
+
     Returns:
         None.
 
@@ -2748,7 +2864,7 @@ def is_valid_packspec(p):
             raise ValueError(msg)
 
     # Initialise Manager for saving
-    with Saver(filename, netcdf_format) as sman:
+    with Saver(filename, netcdf_format, compute=compute) as sman:
         # Iterate through the cubelist.
         for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
             sman.write(
@@ -2793,3 +2909,10 @@ def is_valid_packspec(p):
 
         # Add conventions attribute.
        sman.update_global_attributes(Conventions=conventions)
+
+    if compute:
+        result = None
+    else:
+        result = sman._deferred_save()
+
+    return result
diff --git a/lib/iris/io/__init__.py b/lib/iris/io/__init__.py
index 4659f70ae3..f79252ebc7 100644
--- a/lib/iris/io/__init__.py
+++ b/lib/iris/io/__init__.py
@@ -444,7 +444,7 @@ def save(source, target, saver=None, **kwargs):
 
     # Single cube?
     if isinstance(source, Cube):
-        saver(source, target, **kwargs)
+        result = saver(source, target, **kwargs)
 
     # CubeList or sequence of cubes?
    elif isinstance(source, CubeList) or (
@@ -463,13 +463,18 @@ def save(source, target, saver=None, **kwargs):
             # Force append=True for the tail cubes. Don't modify the incoming
             # kwargs.
             kwargs = kwargs.copy()
+            result = []
             for i, cube in enumerate(source):
                 if i != 0:
                     kwargs["append"] = True
                 saver(cube, target, **kwargs)
+
+            result = None
         # Netcdf saver.
         else:
-            saver(source, target, **kwargs)
+            result = saver(source, target, **kwargs)
 
     else:
         raise ValueError("Cannot save; non Cube found in source")
+
+    return result
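
For reference: a minimal usage sketch of the new `compute` keyword, assuming the patch above is applied. The file names and input cubes below are placeholders, not part of the change.

import dask
import iris

# Placeholder inputs: any cubes with lazy data would do.
cube_a = iris.load_cube("input_a.nc")
cube_b = iris.load_cube("input_b.nc")

# With compute=False, each output file is created with empty data variables,
# and the call returns a dask delayed that will stream the lazy content.
delayed_a = iris.save(cube_a, "output_a.nc", compute=False)
delayed_b = iris.save(cube_b, "output_b.nc", compute=False)

# Completing several saves in one compute call lets dask schedule the writes
# together; each chunk write re-opens its target file under the per-file lock
# provided by DeferredSaveWrapper.
dask.compute(delayed_a, delayed_b)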