Fix MultiScene writer handling of multiple delayed objects #1639

Merged · 8 commits · May 17, 2021
5 changes: 3 additions & 2 deletions satpy/multiscene.py
@@ -29,7 +29,7 @@

from satpy.dataset import DataID, combine_metadata
from satpy.scene import Scene
from satpy.writers import get_enhanced_image
from satpy.writers import get_enhanced_image, split_results

try:
import imageio
@@ -345,7 +345,8 @@ def load_data(q):

for scene in scenes_iter:
delayed = scene.save_datasets(compute=False, **kwargs)
if isinstance(delayed, (list, tuple)) and len(delayed) == 2:
sources, targets, _ = split_results(delayed)
Member
Shouldn't the _ be overwriting delayed? That way, if the MultiScene later gains support for source/target combinations as well as delayed objects, this code won't have to be changed. Right?

Collaborator Author
@BENR0 · May 12, 2021

Good point. I thought about it, but I think it should not be overwritten, since the result of split_results is only needed for the check on sources. Currently delayed is allowed to be a target / list of targets (e.g. the simple image writer) or a delayed / list of delayeds (e.g. the CF writer). If delayed were overwritten, additional code would be needed to hand either targets or delayed to client.compute in line 355. Since only sources is needed for the check, targets could also be _.

One thing I noticed was that it is not well documented what each writer returns.

Member

Targets should never be returned without a source; they are source + target pairs. The simple image writer should return delayed objects. The geotiff writer returns a source (dask array) + target (output file-like object). If you have another example, I still propose that _ be replaced with delayed.

Collaborator Author

Ok, I see it now. I got fooled by the test, which returned targets without sources from split_results for the simple image writer, because the mock did not pass as a delayed object. I changed the code to overwrite delayed now and also fixed the test. Additionally, I added a test that writers returning source/target combinations fail.
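
For reference, a minimal sketch of the behaviour discussed above, assuming a dask distributed client and a hypothetical helper name (the exact PR code differs): split_results separates the writer's output into sources, targets, and plain delayed objects; (source, target) writers are rejected for now, while delayed-only writers are computed on the client.

```python
from satpy.writers import split_results


def _save_datasets_distributed(scene, client, **kwargs):
    # Hypothetical helper: save_datasets(compute=False) may return Delayed
    # objects or (source, target) pairs, depending on the writer.
    delayed = scene.save_datasets(compute=False, **kwargs)
    sources, targets, delayed = split_results(delayed)
    if len(sources) > 0:
        # (source, target) writers (e.g. geotiff) are not supported here yet.
        raise NotImplementedError(
            "Distributed save_datasets does not support writers "
            "that return (source, target) pairs")
    # Only plain delayed objects (e.g. from the CF writer) reach this point.
    return client.compute(delayed)
```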

if len(sources) > 0:
# TODO Make this work for (source, target) datasets
# given a target, source combination
raise NotImplementedError("Distributed save_datasets does not support writers "
48 changes: 27 additions & 21 deletions satpy/writers/cf_writer.py
@@ -658,18 +658,6 @@ def save_datasets(self, datasets, filename=None, groups=None, header_attrs=None,
"""
logger.info('Saving datasets to NetCDF4/CF.')

if groups is None:
# Write all datasets to the file root without creating a group
groups_ = {None: datasets}
else:
# User specified a group assignment using dataset names. Collect the corresponding datasets.
groups_ = defaultdict(list)
for dataset in datasets:
for group_name, group_members in groups.items():
if dataset.attrs['name'] in group_members:
groups_[group_name].append(dataset)
break

if compression is None:
compression = {'zlib': True}

@@ -689,11 +677,6 @@ def save_datasets(self, datasets, filename=None, groups=None, header_attrs=None,
else:
root.attrs['history'] = _history_create

if groups is None:
# Groups are not CF-1.7 compliant
if 'Conventions' not in root.attrs:
root.attrs['Conventions'] = CF_VERSION

# Remove satpy-specific kwargs
to_netcdf_kwargs = copy.deepcopy(to_netcdf_kwargs) # may contain dictionaries (encoding)
satpy_kwargs = ['overlay', 'decorate', 'config_files']
@@ -703,7 +686,23 @@ def save_datasets(self, datasets, filename=None, groups=None, header_attrs=None,
init_nc_kwargs = to_netcdf_kwargs.copy()
init_nc_kwargs.pop('encoding', None) # No variables to be encoded at this point
init_nc_kwargs.pop('unlimited_dims', None)
written = [root.to_netcdf(filename, engine=engine, mode='w', **init_nc_kwargs)]

if groups is None:
# Groups are not CF-1.7 compliant
if 'Conventions' not in root.attrs:
root.attrs['Conventions'] = CF_VERSION
# Write all datasets to the file root without creating a group
groups_ = {None: datasets}
else:
# User specified a group assignment using dataset names. Collect the corresponding datasets.
groups_ = defaultdict(list)
for dataset in datasets:
for group_name, group_members in groups.items():
if dataset.attrs['name'] in group_members:
groups_[group_name].append(dataset)
break

written = [root.to_netcdf(filename, engine=engine, mode='w', **init_nc_kwargs)]

# Write datasets to groups (appending to the file; group=None means no group)
for group_name, group_datasets in groups_.items():
@@ -722,7 +721,14 @@ def save_datasets(self, datasets, filename=None, groups=None, header_attrs=None,
logger.warning('No time dimension in datasets{}, skipping time bounds creation.'.format(grp_str))

encoding, other_to_netcdf_kwargs = update_encoding(dataset, to_netcdf_kwargs)
res = dataset.to_netcdf(filename, engine=engine, group=group_name, mode='a', encoding=encoding,
**other_to_netcdf_kwargs)
written.append(res)

if groups is None:
dataset = root.merge(dataset)
written = dataset.to_netcdf(filename, engine=engine, mode='w', encoding=encoding,
**other_to_netcdf_kwargs)
else:
res = dataset.to_netcdf(filename, engine=engine, group=group_name, mode='a', encoding=encoding,
**other_to_netcdf_kwargs)
written.append(res)

return written
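
As a usage illustration of the groups handling changed above (scn, the dataset names, and the filename are made-up examples), the CF writer either splits datasets into NetCDF groups by name or merges everything into the file root:

```python
# Hypothetical usage: assign datasets to NetCDF groups by dataset name.
scn.save_datasets(
    writer='cf',
    filename='seviri_output.nc',
    groups={'visible': ['VIS006', 'VIS008'], 'infrared': ['IR_108']},
)

# Without groups, all datasets are merged into the file root and written
# in a single call; the Conventions attribute is set, since groups are
# not CF-1.7 compliant.
scn.save_datasets(writer='cf', filename='seviri_output.nc')
```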