Monkey patch iris #68

Closed · wants to merge 40 commits

Commits (40)
358fce2  Reset scratch process script (znicholls, Jul 10, 2019)
2235793  Put parallel loop in (znicholls, Jul 10, 2019)
d9f5e1d  Add comparison of data/marble crunching (znicholls, Jul 10, 2019)
83d418f  Back to serial for testing purposes (znicholls, Jul 10, 2019)
78dd8f8  Make comparisons a fair test (znicholls, Jul 10, 2019)
d25b518  Make better scratch script (znicholls, Jul 10, 2019)
c90da9b  Print output in scratch (znicholls, Jul 10, 2019)
730f7e6  Monkey patch iris netCDF loading (Jul 11, 2019)
684fa53  Decide on dask tricks for crunching (znicholls, Jul 11, 2019)
640a63a  Update scratch for full IPSL crunching (znicholls, Jul 11, 2019)
a064e24  Optimise as far as I can without parallel (znicholls, Jul 11, 2019)
4e932f7  Format (znicholls, Jul 11, 2019)
9d95d4c  Add useful benchmarking script (znicholls, Jul 12, 2019)
c394ceb  Back to asynchronous reads (znicholls, Jul 12, 2019)
2b6a686  Turn off saving to do fair comparison with parallel loop (znicholls, Jul 12, 2019)
3bca515  Put parallel loop back in (znicholls, Jul 12, 2019)
473a0b7  Add tougher scratch processing script (znicholls, Jul 12, 2019)
ca87e2e  Back to serial processing (znicholls, Jul 12, 2019)
3dcbce4  Turn output writing back on (znicholls, Jul 12, 2019)
73fedfc  Update scripts (znicholls, Jul 12, 2019)
d24c18b  Rebase and format (znicholls, Jul 12, 2019)
88da7de  CHANGELOG and tidy up (znicholls, Jul 12, 2019)
5e419f2  Update scripts/scratch-process.sh (znicholls, Jul 12, 2019)
0828a6b  Add netCDF-SCM version output into processing speed script (znicholls, Jul 12, 2019)
484cff3  Turn save off (znicholls, Jul 12, 2019)
3a1186d  Try synchronous output again (znicholls, Jul 12, 2019)
272b48b  Back to previous setup (znicholls, Jul 12, 2019)
af262ae  Fix variable typo (znicholls, Jul 12, 2019)
bd9fd4a  Make processing use map (znicholls, Jul 13, 2019)
7403f3f  Ugly parallel crunching (znicholls, Jul 13, 2019)
1235322  Tidy up (znicholls, Jul 13, 2019)
1e628b0  Refactor out masking functionality (znicholls, Jul 13, 2019)
00bb3a7  Only get masks once (znicholls, Jul 13, 2019)
9044a87  Setup dynamic parallel crunching (znicholls, Jul 13, 2019)
636f935  Write speed test to specific folder (znicholls, Jul 14, 2019)
92024ad  Remove speed output files (znicholls, Jul 14, 2019)
ffb708e  Add upper crunching limit too (znicholls, Jul 14, 2019)
6f9c2b8  Go back to serial crunching (znicholls, Jul 15, 2019)
74955ea  Make crunching run in parallel (znicholls, Jul 15, 2019)
288c16d  Add tqdm progressbar (znicholls, Jul 15, 2019)
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -4,6 +4,7 @@ Changelog
 master
 ------
 
+- (`#68 <https://github.com/znicholls/netcdf-scm/pull/68>`_) Monkey patch iris netCDF4 loading to avoid slow chunking reads
 - (`#70 <https://github.com/znicholls/netcdf-scm/pull/70>`_) Dynamically decide whether to handle data lazily (fix regression tests in process)
 - (`#69 <https://github.com/znicholls/netcdf-scm/pull/69>`_) Add El Nino 3.4 mask
 - (`#67 <https://github.com/znicholls/netcdf-scm/pull/67>`_) Fix crunching filenaming, tidy up more and add catch for IPSL ``time_origin`` time variable attribute
6 changes: 3 additions & 3 deletions scripts/scratch-process.sh
@@ -1,14 +1,14 @@
 #!/bin/bash
-SRC_DIR="/media/research-nfs/cmip6/"
+SRC_DIR="/media/research-nfs/cmip6"
 CRUNCH_DIR="/data/marble/sandbox/share/cmip6-crunched-ipsl-sandbox"
 WRANGLE_DIR="/data/marble/sandbox/share/cmip6-wrangled-ipsl-sandbox"
 CONTACT='Zebedee Nicholls <zebedee.nicholls@climate-energy-college.org>, Jared Lewis <jared.lewis@climate-energy-college.org>, Malte Meinshausen <malte.meinshausen@unimelb.edu.au>'
 DRS="CMIP6Output"
-REGEXP="^(?!.*fx).*IPSL.*$"
+REGEXP="^(?!.*(fx|/ta/)).*(rlust|rsdt|rsut|tas|ts|fgco2|hfds|hfcorr|tos|co2|fco2fos|fco2nat|fco2antt|co2s).*$"
 # have to be super careful when crunching input files as the duplicate grids can cause
 # things to explode
 REGEXP_WRANGLE_IN_FILES=".*tas/.*gr/.*"
 
-netcdf-scm-crunch "${SRC_DIR}" "${CRUNCH_DIR}" "${CONTACT}" --drs "${DRS}" --regexp "${REGEXP}"
+netcdf-scm-crunch "${SRC_DIR}" "${CRUNCH_DIR}" "${CONTACT}" --drs "${DRS}" --regexp "${REGEXP}" --force
 netcdf-scm-wrangle "${CRUNCH_DIR}/netcdf-scm-crunched/CMIP6/" "${WRANGLE_DIR}" "${CONTACT}" --drs "${DRS}"
 netcdf-scm-wrangle "${CRUNCH_DIR}/netcdf-scm-crunched/CMIP6/" "${WRANGLE_DIR}" "${CONTACT}" --drs "${DRS}" --out-format magicc-input-files-point-end-of-year --regexp "${REGEXP_WRANGLE_IN_FILES}"
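
The tightened crunching REGEXP reads as two parts: a negative lookahead that rejects any path containing "fx" or "/ta/", followed by an alternation over the variable names to keep. A minimal sketch of its behaviour on hypothetical CMIP6-style paths (the paths are illustrative, not from this PR):

import re

# The crunching regexp from scratch-process.sh above
REGEXP = r"^(?!.*(fx|/ta/)).*(rlust|rsdt|rsut|tas|ts|fgco2|hfds|hfcorr|tos|co2|fco2fos|fco2nat|fco2antt|co2s).*$"

# Hypothetical directory paths, for illustration only
paths = [
    "CMIP6/CMIP/IPSL/IPSL-CM6A-LR/historical/r1i1p1f1/Amon/tas/gr/",  # kept: tas
    "CMIP6/CMIP/IPSL/IPSL-CM6A-LR/historical/r1i1p1f1/Amon/ta/gr/",  # dropped by the /ta/ lookahead
    "CMIP6/CMIP/IPSL/IPSL-CM6A-LR/historical/r1i1p1f1/fx/sftlf/gr/",  # dropped by the fx lookahead
]
for path in paths:
    print(path, "->", bool(re.match(REGEXP, path)))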
117 changes: 20 additions & 97 deletions scripts/scratch.py
@@ -1,98 +1,21 @@
-import datetime as dt
-import os
-import os.path
-
-import pymagicc
-from openscm.scmdataframe import ScmDataFrame
-from pymagicc.io import MAGICCData
-
-import netcdf_scm
-
-IN_DIR = "/data/marble/sandbox/share/cmip6-crunched/netcdf-scm-crunched/CMIP6"
-OUT_DIR = "/data/marble/sandbox/share/cmip6-wrangled-ipsl-sandbox"
-
-REGIONMODES = {
-    "FOURBOX": [
-        "World|Northern Hemisphere|Land",
-        "World|Northern Hemisphere|Ocean",
-        "World|Southern Hemisphere|Land",
-        "World|Southern Hemisphere|Ocean",
+from click.testing import CliRunner
+
+from netcdf_scm.cli import crunch_data
+
+INPUT_DIR = "tests/test-data/cmip6output/CMIP6/"
+OUTPUT_DIR = "output-examples/scratch-script-output"
+
+runner = CliRunner()
+result = runner.invoke(
+    crunch_data,
+    [
+        INPUT_DIR,
+        OUTPUT_DIR,
+        "cmip6output crunching scratch",
+        "--drs",
+        "CMIP6Output",
+        "-f",
     ],
-    "GLOBAL": ["World"],
-}
-
-VARIABLE_MAPPING = {"tas": "Surface Temperature"}
-
-ignored_vars = []
-
-for i, (path, folders, files) in enumerate(os.walk(IN_DIR)):
-    if "IPSL" not in path:
-        continue
-    if files:
-        assert len(files) == 1, files
-
-        pb = files[0].split("_")
-        variable = pb[1]
-        exp_id = pb[4]
-        source_id = pb[3]
-        variant = pb[5]
-        time_id = pb[7].replace(".csv", "")
-        base_name = "_".join([variable, exp_id, source_id, variant, time_id])
-
-        d = MAGICCData(ScmDataFrame(os.path.join(path, files[0]))).timeseries()
-        d = d.subtract(d.iloc[:, 0], axis="rows")
-
-        mag_name = "{}.MAG".format(base_name).upper()
-        print(mag_name)
-        mag_writer = MAGICCData(d)
-        mag_writer["todo"] = "SET"
-        mag_writer.metadata["timeseriestype"] = "MONTHLY"
-        mag_writer.metadata["header"] = (
-            "Date: {}\n"
-            "Crunched by: Zebedee Nicholls <zebedee.nicholls@climate-energy-college.org>, Jared Lewis <jared.lewis@climate-energy-college.org>, Malte Meinshausen <malte.meinshausen@unimelb.edu.au>\n"
-            "Writer: pymagicc v{} (available at github.com/openclimatedata/pymagicc)\n"
-            "Cruncher: netcdf-scm v{} (available at github.com/znicholls/netcdf-scm)\n"
-            "Affiliation: Climate & Energy College, The University of Melbourne, Australia".format(
-                dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-                pymagicc.__version__,
-                netcdf_scm.__version__,
-            )
-        )
-        mag_writer.metadata["other metadata"] = "will go here"
-        mag_writer["variable"] = variable  # TODO: fix cruncher so it uses cf names
-        try:
-            mag_writer.write(os.path.join(OUT_DIR, mag_name), magicc_version=7)
-        except TypeError:
-            print("Not happy: {}".format(files[0]))
-            continue
-
-        try:
-            VARIABLE_MAPPING[variable]
-        except KeyError:
-            if variable not in ignored_vars:
-                print("not writing in files for {}".format(variable))
-                ignored_vars.append(variable)
-            continue
-
-        for r, regions in REGIONMODES.items():
-            m = MAGICCData(d).filter(region=regions)
-
-            m["todo"] = "SET"
-            m["variable"] = VARIABLE_MAPPING[variable]
-
-            m.metadata = {"header": "testing file only"}
-            try:
-                m = m.filter(month=12)
-            except KeyError:
-                print("Not happy {}".format(files[0]))
-                continue
-
-            out_name = "{}_{}_SURFACE_TEMP.IN".format(base_name, r).upper()
-            print("wrote {}".format(out_name))
-            try:
-                m.write(os.path.join(OUT_DIR, out_name), magicc_version=7)
-            except (TypeError, IndexError):
-                print("Not happy {}".format(files[0]))
-
-
-print("did not write IN files for:\n{}".format("\n".join(ignored_vars)))
+)
+assert result.exit_code == 0, result.output
+print(result.output)
28 changes: 28 additions & 0 deletions scripts/test-processing-speed.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+SRC_DIR="/data/marble/cmip6/CMIP6/DCPP/IPSL/IPSL-CM6A-LR/dcppC-ipv-neg"
+CRUNCH_DIR="/data/marble/sandbox/share/cmip6-crunched-ipsl-sandbox"
+CONTACT='Zebedee Nicholls <zebedee.nicholls@climate-energy-college.org>, Jared Lewis <jared.lewis@climate-energy-college.org>, Malte Meinshausen <malte.meinshausen@unimelb.edu.au>'
+DRS="CMIP6Output"
+REGEXP="^(?!.*fx).*(tas/|fco2nat|rsut|rlut|rsdt).*$"
+
+exec 3>&1 4>&2
+SRC_DIR_TIME=$( { time netcdf-scm-crunch "${SRC_DIR}" "${CRUNCH_DIR}" "${CONTACT}" --drs "${DRS}" --regexp "${REGEXP}" --force 1>&3 2>&4; } 2>&1 )
+exec 3>&- 4>&-
+
+
+SRC_DIR_MEDIUM="/data/marble/cmip6/CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r1i1p1f1/Amon"
+
+exec 3>&1 4>&2
+SRC_DIR_MEDIUM_TIME=$( { time netcdf-scm-crunch "${SRC_DIR_MEDIUM}" "${CRUNCH_DIR}" "${CONTACT}" --drs "${DRS}" --regexp "${REGEXP}" --force 1>&3 2>&4; } 2>&1 )
+exec 3>&- 4>&-
+
+
+SRC_DIR_BIG="/data/marble/cmip6/CMIP6/CMIP/BCC/BCC-CSM2-MR/piControl/r1i1p1f1/Amon"
+
+exec 3>&1 4>&2
+SRC_DIR_BIG_TIME=$( { time netcdf-scm-crunch "${SRC_DIR_BIG}" "${CRUNCH_DIR}" "${CONTACT}" --drs "${DRS}" --regexp "${REGEXP}" --force 1>&3 2>&4; } 2>&1 )
+exec 3>&- 4>&-
+
+echo "Multiple small file (10 yrs) time: $SRC_DIR_TIME"
+echo "Multiple medium file (250 yrs) crunch time: $SRC_DIR_MEDIUM_TIME"
+echo "Multiple big file (750 yrs) crunch time: $SRC_DIR_BIG_TIME"
24 changes: 24 additions & 0 deletions src/netcdf_scm/iris_cube_wrappers.py
@@ -38,6 +38,7 @@
     import iris.analysis.cartography
     import iris.experimental.equalise_cubes
     from iris.exceptions import CoordinateNotFoundError, ConcatenateError
+    from iris.fileformats import netcdf
     import cftime
     import cf_units
 except ModuleNotFoundError:  # pragma: no cover # emergency valve
@@ -48,6 +49,28 @@
 logger = logging.getLogger(__name__)
 
 
+# monkey patch iris loading of netCDF4 to ignore file chunk specifications
+def _get_cf_var_data(cf_var, filename):
+    import netCDF4
+
+    # Get lazy chunked data out of a cf variable.
+    dtype = netcdf._get_actual_dtype(cf_var)  # pylint:disable=protected-access
+
+    # Create cube with deferred data, but no metadata
+    fill_value = getattr(
+        cf_var.cf_data,
+        "_FillValue",
+        netCDF4.default_fillvals[cf_var.dtype.str[1:]],  # pylint:disable=no-member
+    )
+    proxy = netcdf.NetCDFDataProxy(
+        cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
+    )
+    return netcdf.as_lazy_data(proxy, chunks=None)
+
+
+netcdf._get_cf_var_data = _get_cf_var_data  # pylint:disable=protected-access
+
+
 class SCMCube:  # pylint:disable=too-many-public-methods
     """
     Class for processing netCDF files for use in simple climate models.
@@ -741,6 +764,7 @@ def get_scm_cubes(self, sftlf_cube=None, land_mask_threshold=50, masks=None):
         scm_masks = self._get_scm_masks(
             sftlf_cube=sftlf_cube, land_mask_threshold=land_mask_threshold, masks=masks
         )
+
         # ensure data is realised so it's not read multiple times while applying
         # masks
         self._ensure_data_realised()
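
Because the rebinding of netcdf._get_cf_var_data happens at import time, any iris load performed after netcdf_scm.iris_cube_wrappers has been imported goes through the patched function. A minimal sketch of checking the effect, assuming a hypothetical local CMIP6 file:

import iris

import netcdf_scm.iris_cube_wrappers  # importing applies the monkey patch

# Hypothetical file name, for illustration only
cube = iris.load_cube("tas_Amon_IPSL-CM6A-LR_historical_r1i1p1f1_gr_185001-201412.nc")

# With chunks=None, dask chooses its own chunking instead of mirroring the
# netCDF file's internal chunk specification, avoiding many tiny reads
print(cube.lazy_data().chunks)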
2 changes: 2 additions & 0 deletions src/netcdf_scm/masks/__init__.py
@@ -7,6 +7,7 @@
 """
 import logging
 import os
+from functools import lru_cache
 
 import numpy as np
 
@@ -108,6 +109,7 @@ def f(masker, cube, **kwargs):
     return f
 
 
+@lru_cache(maxsize=32)
 def get_default_sftlf_cube():
     """Load NetCDF-SCM's default (last resort) surface land fraction cube"""
     return iris.load_cube(os.path.join(os.path.dirname(__file__), _DEFAULT_SFTLF_FILE))
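
With lru_cache(maxsize=32) the results are memoised by call arguments; since get_default_sftlf_cube takes none, the practical effect is that the default sftlf file is read from disk at most once per process, so repeated mask calculations do not re-load it. A quick sketch of the behaviour:

from netcdf_scm.masks import get_default_sftlf_cube

first = get_default_sftlf_cube()   # loads the cube from disk
second = get_default_sftlf_cube()  # served from the cache

assert first is second  # the very same object, no second file read

The usual caveat applies: every caller now receives the same cube object, so callers must not mutate it in place.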
1 change: 1 addition & 0 deletions src/netcdf_scm/utils.py
@@ -145,6 +145,7 @@ def apply_mask(in_scmcube, in_mask):
         new_data = da.ma.masked_array(data=in_scmcube.cube.lazy_data(), mask=in_mask)
     else:
         new_data = ma.masked_array(in_scmcube.cube.data, mask=in_mask)
+
     out_cube.cube = in_scmcube.cube.copy(data=new_data)
 
     return out_cube
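
The branch above mirrors the cube's own laziness: for lazy data, dask's da.ma.masked_array defers the masking until the data is realised, while for realised data a plain numpy masked array is built immediately. A toy sketch of the two paths on standalone arrays (not SCMCubes):

import dask.array as da
import numpy as np
import numpy.ma as ma

data = np.arange(6.0).reshape(2, 3)
mask = data > 3.0

# lazy branch: nothing is computed until .compute() is called
lazy = da.ma.masked_array(data=da.from_array(data, chunks=(1, 3)), mask=mask)

# realised branch: the masked array is built immediately
eager = ma.masked_array(data, mask=mask)

assert ma.allequal(lazy.compute(), eager)  # identical results either way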