Optionally cache small data variables and file handles #981

Merged Dec 9, 2019 (20 commits)
4 changes: 3 additions & 1 deletion satpy/readers/fci_l1c_fdhsi.py
@@ -56,7 +56,9 @@ class using the :mod:`~satpy.Scene.load` method with the reader
def __init__(self, filename, filename_info, filetype_info):
"""Initialize file handler."""
super(FCIFDHSIFileHandler, self).__init__(filename, filename_info,
filetype_info)
filetype_info,
cache_var_size=10000,
cache_handle=True)
logger.debug('Reading: {}'.format(self.filename))
logger.debug('Start: {}'.format(self.start_time))
logger.debug('End: {}'.format(self.end_time))
119 changes: 101 additions & 18 deletions satpy/readers/netcdf_utils.py
@@ -19,7 +19,9 @@

import netCDF4
import logging
import numpy as np
import xarray as xr
import dask.array as da

from satpy import CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler
@@ -49,19 +51,44 @@ class NetCDF4FileHandler(BaseFileHandler):

wrapper["/attr/platform_short_name"]

Note that loading datasets requires reopening the original file, but to
get just the shape of the dataset append "/shape" to the item string:
Note that loading datasets requires reopening the original file
(unless those datasets are cached, see below), but to get just the
shape of the dataset append "/shape" to the item string:

wrapper["group/subgroup/var_name/shape"]

    If your file has many small data variables that are frequently accessed,
    you may choose to cache some of them.  You can do this by passing a
    number as ``cache_var_size``; any variable smaller than this number in
    bytes will be read into RAM.  Warning: this part of the API is
    provisional and subject to change.

You may get an additional speedup by passing ``cache_handle=True``. This
will keep the netCDF4 dataset handles open throughout the lifetime of the
object, and instead of using `xarray.open_dataset` to open every data
variable, a dask array will be created "manually". This may be useful if
you have a dataset distributed over many files, such as for FCI. Note
that the coordinates will be missing in this case. If you use this option,
``xarray_kwargs`` will have no effect.
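
    For example, to cache all variables smaller than 10000 bytes and to keep
    the file handles open (the threshold here is illustrative):

        handler = NetCDF4FileHandler(filename, filename_info, filetype_info,
                                     cache_var_size=10000,
                                     cache_handle=True)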

Args:
filename (str): File to read
filename_info (dict): Dictionary with filename information
filetype_info (dict): Dictionary with filetype information
auto_maskandscale (bool): Apply mask and scale factors
        xarray_kwargs (dict): Additional arguments to `xarray.open_dataset`
        cache_var_size (int): Cache variables smaller than this size (bytes).
cache_handle (bool): Keep files open for lifetime of filehandler.
"""

file_handle = None

def __init__(self, filename, filename_info, filetype_info,
auto_maskandscale=False, xarray_kwargs=None):
"""Initilize file handler."""
auto_maskandscale=False, xarray_kwargs=None,
cache_var_size=0, cache_handle=False):
super(NetCDF4FileHandler, self).__init__(
filename, filename_info, filetype_info)
self.file_content = {}
self.cached_file_content = {}
try:
file_handle = netCDF4.Dataset(self.filename, 'r')
except IOError:
@@ -75,11 +102,29 @@ def __init__(self, filename, filename_info, filetype_info,

self.collect_metadata("", file_handle)
self.collect_dimensions("", file_handle)
file_handle.close()
if cache_var_size > 0:
self.collect_cache_vars(
[varname for (varname, var)
in self.file_content.items()
if isinstance(var, netCDF4.Variable)
and isinstance(var.dtype, np.dtype) # vlen may be str
and var.size * var.dtype.itemsize < cache_var_size],
file_handle)
if cache_handle:
self.file_handle = file_handle
else:
file_handle.close()
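        # Defaults for the xarray fallback path; these have no effect when
        # a variable is cached or the file handle is kept open.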
self._xarray_kwargs = xarray_kwargs or {}
self._xarray_kwargs.setdefault('chunks', CHUNK_SIZE)
self._xarray_kwargs.setdefault('mask_and_scale', self.auto_maskandscale)

    def __del__(self):
        """Close the file handle, if still open."""
        if self.file_handle is not None:
            try:
                self.file_handle.close()
            except RuntimeError:  # presumably closed already
                pass

def _collect_attrs(self, name, obj):
"""Collect all the attributes for the provided file object."""
for key in obj.ncattrs():
@@ -113,10 +158,31 @@ def collect_dimensions(self, name, obj):
dim_name = "{}/dimension/{}".format(name, dim_name)
self.file_content[dim_name] = len(dim_obj)

def collect_cache_vars(self, cache_vars, obj):
"""Collect data variables for caching.

        This method will collect some data variables and store them in RAM.
        This may be useful when small variables are accessed frequently,
        as it avoids repeatedly opening and closing the file, which in the
        case of xarray carries some overhead.

        Should be called after `collect_metadata`.

Args:
cache_vars (List[str]): Names of data variables to be cached.
obj (netCDF4.Dataset): Dataset object from which to read them.
"""
for var_name in cache_vars:
v = self.file_content[var_name]
self.cached_file_content[var_name] = xr.DataArray(
v[:], dims=v.dimensions, attrs=v.__dict__, name=v.name)

def __getitem__(self, key):
"""Get item for given key."""
val = self.file_content[key]
if isinstance(val, netCDF4.Variable):
if key in self.cached_file_content:
return self.cached_file_content[key]
# these datasets are closed and inaccessible when the file is
# closed, need to reopen
# TODO: Handle HDF4 versus NetCDF3 versus NetCDF4
@@ -125,21 +191,38 @@ def __getitem__(self, key):
group, key = parts
else:
group = None
with xr.open_dataset(self.filename, group=group,
**self._xarray_kwargs) as nc:
val = nc[key]
# Even though `chunks` is specified in the kwargs, xarray
# uses dask.arrays only for data variables that have at least
# one dimension; for zero-dimensional data variables (scalar),
# it uses its own lazy loading for scalars. When those are
# accessed after file closure, xarray reopens the file without
# closing it again. This will leave potentially many open file
# objects (which may in turn trigger a Segmentation Fault:
# https://github.com/pydata/xarray/issues/2954#issuecomment-491221266
if not val.chunks:
val.load()
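            # Read through the persistent file handle if we have one
            # (cache_handle=True); otherwise reopen the file with xarray.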
if self.file_handle is not None:
val = self._get_var_from_filehandle(group, key)
else:
val = self._get_var_from_xr(group, key)
return val

def _get_var_from_xr(self, group, key):
with xr.open_dataset(self.filename, group=group,
**self._xarray_kwargs) as nc:
val = nc[key]
# Even though `chunks` is specified in the kwargs, xarray
# uses dask.arrays only for data variables that have at least
# one dimension; for zero-dimensional data variables (scalar),
# it uses its own lazy loading for scalars. When those are
# accessed after file closure, xarray reopens the file without
# closing it again. This will leave potentially many open file
# objects (which may in turn trigger a Segmentation Fault:
# https://github.com/pydata/xarray/issues/2954#issuecomment-491221266
if not val.chunks:
val.load()
return val

def _get_var_from_filehandle(self, group, key):
        # Not collecting coordinates here: that is extra work and therefore
        # extra overhead, and they are not used downstream.
g = self.file_handle[group]
v = g[key]
x = xr.DataArray(
da.from_array(v), dims=v.dimensions, attrs=v.__dict__,
name=v.name)
return x

def __contains__(self, item):
"""Get item from file content."""
return item in self.file_content
33 changes: 33 additions & 0 deletions satpy/tests/reader_tests/test_netcdf_utils.py
@@ -93,6 +93,11 @@ def setUp(self):
ds2_i = nc.createVariable('ds2_i', np.int32,
dimensions=('rows', 'cols'))
ds2_i[:] = np.arange(10 * 100).reshape((10, 100))
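        # Small variables, below typical cache_var_size thresholds, to
        # exercise the caching code paths.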
ds2_s = nc.createVariable("ds2_s", np.int8,
dimensions=("rows",))
ds2_s[:] = np.arange(10)
ds2_sc = nc.createVariable("ds2_sc", np.int8, dimensions=())
ds2_sc[:] = 42

# Add attributes
nc.test_attr_str = 'test_string'
@@ -138,6 +143,34 @@ def test_all_basic(self):

self.assertTrue('ds2_f' in file_handler)
self.assertFalse('fake_ds' in file_handler)
self.assertIsNone(file_handler.file_handle)
self.assertEqual(file_handler["ds2_sc"], 42)

    def test_caching(self):
        """Test that caching works as intended."""
from satpy.readers.netcdf_utils import NetCDF4FileHandler
h = NetCDF4FileHandler("test.nc", {}, {}, cache_var_size=1000,
cache_handle=True)
self.assertIsNotNone(h.file_handle)
self.assertTrue(h.file_handle.isopen())

self.assertEqual(sorted(h.cached_file_content.keys()),
["ds2_s", "ds2_sc"])
        # with caching, these reads exercise different code paths than without
np.testing.assert_array_equal(h["ds2_s"], np.arange(10))
np.testing.assert_array_equal(h["test_group/ds1_i"],
np.arange(10 * 100).reshape((10, 100)))
        h.__del__()  # invoke directly to test that the handle gets closed
self.assertFalse(h.file_handle.isopen())

    def test_filenotfound(self):
        """Test that an error is raised when the file is not found."""
from satpy.readers.netcdf_utils import NetCDF4FileHandler

with self.assertRaises(IOError):
NetCDF4FileHandler("/thisfiledoesnotexist.nc", {}, {})


def suite():