Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fsspec FileSystem support #1321

Closed
wants to merge 13 commits into from
30 changes: 24 additions & 6 deletions satpy/readers/abi_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,25 @@
class NC_ABI_BASE(BaseFileHandler):
"""Base reader for ABI L1B L2+ NetCDF4 files."""

def __init__(self, filename, filename_info, filetype_info):
# block_size is relevant for some file_system instances and should be
# passed when opening the file; 2**16 was determined as a reasonable tradeoff
# between data and speed by (limited) trial and error using
# CachingFileSystem from fsspec
block_size = 2**16

def __init__(self, filename, filename_info, filetype_info, file_system=None):
"""Open the NetCDF file with xarray and prepare the Dataset for reading."""
super(NC_ABI_BASE, self).__init__(filename, filename_info, filetype_info)
super().__init__(filename, filename_info, filetype_info,
file_system=file_system)
# xarray's default netcdf4 engine
self.of = self.file_system.open(self.filename, block_size=self.block_size)
try:
self.nc = xr.open_dataset(self.filename,
self.nc = xr.open_dataset(self.of,
decode_cf=True,
mask_and_scale=False,
chunks={'x': CHUNK_SIZE, 'y': CHUNK_SIZE}, )
except ValueError:
self.nc = xr.open_dataset(self.filename,
self.nc = xr.open_dataset(self.of,
decode_cf=True,
mask_and_scale=False,
chunks={'lon': CHUNK_SIZE, 'lat': CHUNK_SIZE}, )
Expand Down Expand Up @@ -104,7 +112,13 @@ def is_int(val):
new_fill = fill
else:
new_fill = np.nan
data = data.where(data != fill, new_fill)
# squeezing because some backends (h5netcdf) may return attributes
# as shape (1,) arrays rather than shape () scalars, which according
# to the netcdf documentation at <URL:https://www.unidata.ucar.edu
# /software/netcdf/docs/netcdf_data_set_components.html#attributes>
# is correct. Using np.squeeze because new_fill may be either a
# Python number or numpy array of shape (1,).
data = data.where(data != np.squeeze(fill), np.squeeze(new_fill))

if factor != 1 and item in ('x', 'y'):
# be more precise with x/y coordinates
Expand Down Expand Up @@ -262,8 +276,12 @@ def spatial_resolution_to_number(self):
return res

def __del__(self):
"""Close the NetCDF file that may still be open."""
"""Close the NetCDF file that may still be open; the underlying file object is intentionally left open."""
try:
# this probably doesn't do anything
self.nc.close()
# normally we should close the open file, but when resampling a
# multiscene this is called prematurely, see comments on #1321
# self.of.close()
except (IOError, OSError, AttributeError):
pass
7 changes: 4 additions & 3 deletions satpy/readers/avhrr_l1b_gaclac.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
class GACLACFile(BaseFileHandler):
"""Reader for GAC and LAC data."""

def __init__(self, filename, filename_info, filetype_info,
def __init__(self, filename, filename_info, filetype_info, file_system=None,
start_line=None, end_line=None, strip_invalid_coords=True,
interpolate_coords=True, **reader_kwargs):
"""Init the file handler.
Expand All @@ -71,7 +71,7 @@ def __init__(self, filename, filename_info, filetype_info,

"""
super(GACLACFile, self).__init__(
filename, filename_info, filetype_info)
filename, filename_info, filetype_info, file_system=file_system)

self.start_line = start_line
self.end_line = end_line
Expand Down Expand Up @@ -123,7 +123,8 @@ def read_raw_data(self):
interpolate_coords=self.interpolate_coords,
creation_site=self.creation_site,
**self.reader_kwargs)
self.reader.read(self.filename)
with self.open_file() as fileobj:
self.reader.read(self.filename, fileobj=fileobj)
if np.all(self.reader.mask):
raise ValueError('All data is masked out')

Expand Down
13 changes: 11 additions & 2 deletions satpy/readers/file_handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2019 Satpy developers
#
# This file is part of satpy.
Expand All @@ -22,14 +21,15 @@
import numpy as np
from pathlib import PurePath

from fsspec.implementations.local import LocalFileSystem
from pyresample.geometry import SwathDefinition
from satpy.dataset import combine_metadata


class BaseFileHandler(metaclass=ABCMeta):
"""Base file handler."""

def __init__(self, filename, filename_info, filetype_info):
def __init__(self, filename, filename_info, filetype_info, file_system=None):
"""Initialize file handler."""
if isinstance(filename, PurePath):
self.filename = str(filename)
Expand All @@ -39,6 +39,7 @@ def __init__(self, filename, filename_info, filetype_info):
self.filename_info = filename_info
self.filetype_info = filetype_info
self.metadata = filename_info.copy()
self.file_system = file_system or LocalFileSystem()

def __str__(self):
"""Customize __str__."""
Expand Down Expand Up @@ -258,3 +259,11 @@ def available_datasets(self, configured_datasets=None):
yield is_avail, ds_info
continue
yield self.file_type_matches(ds_info['file_type']), ds_info

def open_file(self, **kwargs):
"""Open filename from file_system.

Additional parameters may be given depending on the FileSystem and BufferedFile
implementations derived from the fsspec base classes, e.g. cache options.
"""
return self.file_system.open(self.filename, **kwargs)
10 changes: 9 additions & 1 deletion satpy/readers/yaml_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,15 @@ def _new_filehandler_instances(self, filetype_info, filename_items, fh_kwargs=No
warnings.warn(str(err) + ' for {}'.format(filename))
continue

yield filetype_cls(filename, filename_info, filetype_info, *req_fh, **fh_kwargs)
try:
yield filetype_cls(filename, filename_info, filetype_info,
*req_fh, **fh_kwargs)
except TypeError as exc:
if "file_system" in exc.args[0]:
raise NotImplementedError(
"File handler {} does not support file systems".format(filetype_cls.__name__))
else:
raise

def time_matches(self, fstart, fend):
"""Check that a file's start and end time match filter_parameters of this reader."""
Expand Down
27 changes: 19 additions & 8 deletions satpy/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ class Scene:

"""

_fs_sentinel = object()

def __init__(self, filenames=None, reader=None, filter_parameters=None, reader_kwargs=None,
ppp_config_dir=None,
base_dir=None,
sensor=None,
start_time=None,
end_time=None,
area=None):
area=None,
file_system=_fs_sentinel):
"""Initialize Scene with Reader and Compositor objects.

To load data `filenames` and preferably `reader` must be specified. If `filenames` is provided without `reader`
Expand All @@ -98,6 +101,7 @@ def __init__(self, filenames=None, reader=None, filter_parameters=None, reader_k
sub-dictionaries to pass different arguments to different
reader instances.
ppp_config_dir (str): The directory containing the configuration files for satpy.
file_system (FileSystem): An implementation of fsspec.AbstractFileSystem.
base_dir (str): (DEPRECATED) The directory to search for files containing the
data to load. If *filenames* is also provided,
this is ignored.
Expand Down Expand Up @@ -129,6 +133,7 @@ def __init__(self, filenames=None, reader=None, filter_parameters=None, reader_k
sensor=sensor,
ppp_config_dir=self._ppp_config_dir,
reader_kwargs=reader_kwargs,
fs=file_system
)
elif start_time or end_time or area:
import warnings
Expand All @@ -143,19 +148,25 @@ def __init__(self, filenames=None, reader=None, filter_parameters=None, reader_k
'area': area,
})
filter_parameters = fp
if reader_kwargs is None:
reader_kwargs = {}
if filter_parameters:
if reader_kwargs is None:
reader_kwargs = {}
else:
reader_kwargs = reader_kwargs.copy()
reader_kwargs = reader_kwargs.copy()
reader_kwargs.setdefault('filter_parameters', {}).update(filter_parameters)

if filenames and isinstance(filenames, str):
raise ValueError("'filenames' must be a list of files: Scene(filenames=[filename])")

self._readers = self._create_reader_instances(filenames=filenames,
reader=reader,
reader_kwargs=reader_kwargs)
if file_system is not self._fs_sentinel:
if "file_system" in reader_kwargs:
warnings.warn(
"'file_system' passed both directly and through "
"reader_kwargs, overwriting in reader_kwargs",
UserWarning)
reader_kwargs["file_system"] = file_system
self._readers = self._create_reader_instances(
filenames=filenames, reader=reader,
reader_kwargs=reader_kwargs)
self.attrs.update(self._compute_metadata_from_readers())
self._datasets = DatasetDict()
self._composite_loader = CompositorLoader(self._ppp_config_dir)
Expand Down
3 changes: 2 additions & 1 deletion satpy/tests/reader_tests/test_abi_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
class Test_NC_ABI_L1B_Base(unittest.TestCase):
"""Common setup for NC_ABI_L1B tests."""

@mock.patch("satpy.readers.file_handlers.LocalFileSystem")
@mock.patch('satpy.readers.abi_base.xr')
def setUp(self, xr_, rad=None):
def setUp(self, xr_, srfL, rad=None):
"""Create a fake dataset using the given radiance data."""
from satpy.readers.abi_l1b import NC_ABI_L1B

Expand Down
6 changes: 4 additions & 2 deletions satpy/tests/reader_tests/test_abi_l2_nc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
class Test_NC_ABI_L2_base(unittest.TestCase):
"""Test the NC_ABI_L2 reader."""

@mock.patch("satpy.readers.file_handlers.LocalFileSystem")
@mock.patch('satpy.readers.abi_base.xr')
def setUp(self, xr_):
def setUp(self, xr_, srfL):
"""Create fake data for the tests."""
from satpy.readers.abi_l2_nc import NC_ABI_L2

Expand Down Expand Up @@ -135,8 +136,9 @@ def test_get_area_def_fixedgrid(self, adef):
class Test_NC_ABI_L2_area_latlon(unittest.TestCase):
"""Test the NC_ABI_L2 reader."""

@mock.patch("satpy.readers.file_handlers.LocalFileSystem")
@mock.patch('satpy.readers.abi_base.xr')
def setUp(self, xr_):
def setUp(self, xr_, srfL):
"""Create fake data for the tests."""
from satpy.readers.abi_l2_nc import NC_ABI_L2
proj = xr.DataArray(
Expand Down
6 changes: 4 additions & 2 deletions satpy/tests/reader_tests/test_glm_l2.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ def setup_fake_dataset():
class TestGLML2FileHandler(unittest.TestCase):
"""Tests for the GLM L2 reader."""

@mock.patch("satpy.readers.file_handlers.LocalFileSystem")
@mock.patch('satpy.readers.abi_base.xr')
def setUp(self, xr_):
def setUp(self, xr_, srfL):
"""Create a fake file handler to test."""
from satpy.readers.glm_l2 import NCGriddedGLML2
fake_dataset = setup_fake_dataset()
Expand Down Expand Up @@ -142,8 +143,9 @@ class TestGLML2Reader(unittest.TestCase):

yaml_file = "glm_l2.yaml"

@mock.patch("satpy.readers.file_handlers.LocalFileSystem")
@mock.patch('satpy.readers.abi_base.xr')
def setUp(self, xr_):
def setUp(self, xr_, srfL):
"""Create a fake reader to test."""
from satpy.readers import load_reader
from satpy.config import config_search_paths
Expand Down
9 changes: 6 additions & 3 deletions satpy/tests/test_regressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ def generate_fake_abi_xr_dataset(filename, chunks=None, **kwargs):
return dataset


@patch("satpy.readers.file_handlers.LocalFileSystem")
@patch('xarray.open_dataset')
def test_1258(fake_open_dataset):
def test_1258(fake_open_dataset, srfL):
"""Save true_color from abi with radiance doesn't need two resamplings."""
from satpy import Scene
fake_open_dataset.side_effect = generate_fake_abi_xr_dataset
Expand All @@ -183,8 +184,9 @@ def test_1258(fake_open_dataset):
assert len(resampled_scene.keys()) == 2


@patch("satpy.readers.file_handlers.LocalFileSystem")
@patch('xarray.open_dataset')
def test_1088(fake_open_dataset):
def test_1088(fake_open_dataset, srfL):
"""Check that copied arrays gets resampled."""
from satpy import Scene
fake_open_dataset.side_effect = generate_fake_abi_xr_dataset
Expand All @@ -198,8 +200,9 @@ def test_1088(fake_open_dataset):
assert resampled[my_id].shape == (2048, 2560)


@patch("satpy.readers.file_handlers.LocalFileSystem")
@patch('xarray.open_dataset')
def test_no_enums(fake_open_dataset):
def test_no_enums(fake_open_dataset, srfL):
"""Check that no enums are inserted in the resulting attrs."""
from satpy import Scene
from enum import Enum
Expand Down
6 changes: 3 additions & 3 deletions satpy/tests/test_scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_init(self):
cri.return_value = {}
satpy.scene.Scene(filenames=['bla'], reader='blo')
cri.assert_called_once_with(filenames=['bla'], reader='blo',
reader_kwargs=None)
reader_kwargs={})

def test_init_str_filename(self):
"""Test initializing with a single string as filenames."""
Expand Down Expand Up @@ -133,7 +133,7 @@ def test_create_reader_instances_with_filenames(self):
findermock.assert_called_once_with(
filenames=filenames,
reader=reader_name,
reader_kwargs=None,
reader_kwargs={},
ppp_config_dir=mock.ANY
)

Expand Down Expand Up @@ -186,7 +186,7 @@ def test_create_reader_instances_with_reader(self):
findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
reader=reader,
filenames=filenames,
reader_kwargs=None,
reader_kwargs={},
)

def test_create_reader_instances_with_reader_kwargs(self):
Expand Down