Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tarfile support #82

Merged
merged 3 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions bin/pygac-fdr-run
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,72 @@

import argparse
import logging
import re
import tarfile
from contextlib import closing

import satpy
from satpy.readers import FSFile

from pygac_fdr.config import read_config
from pygac_fdr.reader import read_gac
from pygac_fdr.utils import LOGGER_NAME, logging_on
from pygac_fdr.utils import LOGGER_NAME, logging_on, TarFileSystem
from pygac_fdr.writer import NetcdfWriter

LOG = logging.getLogger(LOGGER_NAME)


class HideGzFSFile(FSFile):
    """An ``FSFile`` whose file-system path is reported without ``.gz``.

    Only ``__fspath__`` is overridden; everything else is inherited from
    ``FSFile`` unchanged.
    """

    # Pattern matching a ".gz" extension at the very end of the path.
    _gz_suffix = re.compile(r'\.gz$')

    def __fspath__(self):
        """Return the parent's path with a trailing ``.gz`` removed.

        Paths that do not end in ``.gz`` are returned as-is.
        """
        return self._gz_suffix.sub('', super().__fspath__())


def process_file(filename, config):
    """Read a single GAC file and write the result to netCDF.

    Args:
        filename: The file to process; handed unchanged to ``read_gac``.
        config: Configuration mapping. The sections ``controls``,
            ``output`` and ``netcdf`` are accessed by key; the individual
            options inside them are optional and default to ``None``.

    Returns:
        bool: ``True`` if the file has been read and written, ``False``
        if an exception was caught and logged.

    Raises:
        Exception: Any processing error is re-raised unchanged when the
            ``debug`` option of the ``controls`` section is truthy.
    """
    LOG.info("Processing file %s", filename)
    try:
        scene = read_gac(
            filename, reader_kwargs=config["controls"].get("reader_kwargs")
        )
        writer = NetcdfWriter(
            global_attrs=config.get("global_attrs"),
            gac_header_attrs=config.get("gac_header_attrs"),
            fname_fmt=config["output"].get("fname_fmt"),
            encoding=config["netcdf"].get("encoding"),
            engine=config["netcdf"].get("engine"),
            debug=config["controls"].get("debug"),
        )
        writer.write(output_dir=config["output"].get("output_dir"), scene=scene)
    except Exception as err:
        # Read the flag with .get(), exactly as it is read for the writer
        # above: a missing "debug" option must not raise a KeyError here
        # and thereby hide the error that is actually being handled.
        if config["controls"].get("debug"):
            raise
        LOG.error("Error processing file %s: %s", filename, err)
        return False
    return True


def process_tarball(tarball, config):
    """Process every file contained in a TAR archive.

    The archive is opened as a ``TarFileSystem`` and each non-directory
    entry is wrapped in a ``HideGzFSFile`` and passed to ``process_file``.
    Files that could not be processed are collected and reported in a
    single error message after the archive has been closed.

    Args:
        tarball: Path of the TAR archive.
        config: Configuration mapping, forwarded to ``process_file``.
    """
    LOG.info("Processing tarball %s", tarball)
    failures = []
    with closing(TarFileSystem(tarball)) as archive:
        # The fsspec keyword is ``detail`` (singular). The former
        # ``details=False`` was not recognised by ``find`` and was merely
        # passed through to ``ls`` as an unused extra keyword.
        for filepath in archive.find('', detail=False, withdirs=False):
            filename = HideGzFSFile(filepath, fs=archive)
            if not process_file(filename, config):
                failures.append(filename)
    if failures:
        LOG.error('Could not process the following files: %s', failures)
    else:
        LOG.info('Successfully processed all files in %s', tarball)


if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(
description="Read & calibrate AVHRR GAC data and write " "results to netCDF"
description="Read & calibrate AVHRR GAC data and write results to netCDF"
)
parser.add_argument(
"--cfg", required=True, type=str, help="Path to pygac-fdr configuration file."
Expand Down Expand Up @@ -75,21 +126,7 @@ if __name__ == "__main__":
# Process files
satpy.CHUNK_SIZE = config["controls"].get("pytroll_chunk_size", 1024)
for filename in args.filenames:
LOG.info("Processing file {}".format(filename))
try:
scene = read_gac(
filename, reader_kwargs=config["controls"].get("reader_kwargs")
)
writer = NetcdfWriter(
global_attrs=config.get("global_attrs"),
gac_header_attrs=config.get("gac_header_attrs"),
fname_fmt=config["output"].get("fname_fmt"),
encoding=config["netcdf"].get("encoding"),
engine=config["netcdf"].get("engine"),
debug=config["controls"].get("debug"),
)
writer.write(output_dir=config["output"].get("output_dir"), scene=scene)
except Exception as err:
if config["controls"]["debug"]:
raise
LOG.error("Error processing file {}: {}".format(filename, err))
if tarfile.is_tarfile(filename):
process_tarball(filename, config)
else:
process_file(filename, config)
74 changes: 74 additions & 0 deletions pygac_fdr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

"""Miscellaneous utilities."""

import datetime
import logging
import tarfile

import fsspec
import satpy.utils


_is_logging_on = False
LOGGER_NAME = __package__

Expand Down Expand Up @@ -62,3 +66,73 @@ def logging_off(for_all=False):
"""
logger_name = "" if for_all else LOGGER_NAME
logging.getLogger(logger_name).handlers = [logging.NullHandler()]


class TarFileSystem(fsspec.AbstractFileSystem):
    """Read contents of TAR archive as a file-system.

    The archive is opened read-only on construction and stays open until
    ``close`` is called (or the instance is garbage collected). Member
    names of the archive are used as paths, without a leading slash.
    """

    root_marker = ""
    # NOTE(review): not referenced anywhere in this class; presumably meant
    # as a recursion limit for directory walks -- confirm before relying on it.
    max_depth = 20
    # Each instance owns an open tarfile handle. With fsspec's instance
    # cache enabled, constructing a second file system for the same tarball
    # would return the first instance, even if it has been closed already.
    cachable = False

    def __init__(self, tarball):
        """Open ``tarball`` for reading."""
        super().__init__()
        self.tarball = tarball
        self.tar = tarfile.open(tarball, mode='r')

    def __del__(self):
        self.close()

    def close(self):
        """Close archive"""
        # ``tar`` is missing if tarfile.open() failed in __init__; closing
        # (in particular from __del__) must not raise in that case.
        tar = getattr(self, "tar", None)
        if tar is not None:
            tar.close()

    @property
    def closed(self):
        """Whether the underlying archive has been closed."""
        return self.tar.closed

    @classmethod
    def _strip_protocol(cls, path):
        """Strip the protocol as the parent class does, plus leading slashes."""
        return super()._strip_protocol(path).lstrip("/")

    @staticmethod
    def _get_info(tarinfo):
        """Convert a ``tarfile.TarInfo`` into a name/size/type dictionary."""
        info = {
            "name": tarinfo.name,
            "size": tarinfo.size,
            "type": "directory" if tarinfo.isdir() else "file"
        }
        return info

    def _get_depth(self, path):
        """Return the number of path components; 0 for the empty path."""
        if path:
            depth = 1 + path.count('/')
        else:
            depth = 0
        return depth

    def ls(self, path, detail=True, **kwargs):
        """List objects at path.

        Args:
            path: Directory inside the archive; the empty string lists the
                top level. Trailing slashes are ignored.
            detail: If true, return one info dictionary per entry (see
                ``_get_info``), otherwise return only the entry names.
            **kwargs: Accepted for compatibility with fsspec; ignored.

        Returns:
            list: The direct children of ``path``, sorted by name.
        """
        path = path.rstrip("/")
        depth = self._get_depth(path) + 1
        # Match whole path components only, so that listing "a" does not
        # pick up members of a sibling directory such as "ab".
        prefix = path + "/" if path else ""
        children = [
            tarinfo
            for tarinfo in self.tar.getmembers()
            if (tarinfo.name.startswith(prefix)
                and self._get_depth(tarinfo.name) == depth)
        ]
        children.sort(key=lambda tarinfo: tarinfo.name)
        if detail:
            return [self._get_info(tarinfo) for tarinfo in children]
        return [tarinfo.name for tarinfo in children]

    def modified(self, path):
        """Return the modified timestamp of a file as a datetime.datetime

        The result is a naive datetime in local time.
        """
        tarinfo = self.tar.getmember(path)
        return datetime.datetime.fromtimestamp(tarinfo.mtime)

    def _open(self, path, mode='rb', **kwargs):
        """Open an archive member for reading.

        Returns the result of ``TarFile.extractfile``, which is ``None``
        for members that are neither regular files nor links.

        Raises:
            ValueError: If ``mode`` is anything other than ``"rb"``.
            KeyError: If ``path`` is not a member of the archive.
        """
        if mode != "rb":
            raise ValueError("Only mode 'rb' is allowed!")
        return self.tar.extractfile(path)