Initial port of "NetCDF Zarr Sequential Recipe: CMIP6" notebook (pang…
derekocallaghan committed Feb 17, 2023
1 parent 046b450 commit 7bdbe99
Showing 1 changed file with 113 additions and 51 deletions.
164 changes: 113 additions & 51 deletions docs/pangeo_forge_recipes/tutorials/xarray_zarr/cmip6-recipe.ipynb
@@ -617,29 +617,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3: Define a pre-processing function\n",
"- This is an optional step which we want to apply to each chunk\n",
"- Here we change some data variables into coordinate variables, but you can define your own pre-processing step here"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# the netcdf lists some of the coordinate variables as data variables. This is a fix which we want to apply to each chunk.\n",
"def set_bnds_as_coords(ds):\n",
"    new_coords_vars = [var for var in ds.data_vars if 'bnds' in var or 'bounds' in var]\n",
"    ds = ds.set_coords(new_coords_vars)\n",
"    return ds"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 4: Create a recipe\n",
"## Step 3: Define the File Pattern\n",
"- A `FilePattern` is the starting place for all recipes. These Python objects are the \"raw ingredients\" upon which the recipe will act. They describe how the individual source files are organized logically as part of a larger dataset. To create a file pattern, the first step is to define a function which takes any variable components of the source file path as inputs, and returns full file path strings.\n",
"- Revisiting our input URLs, we see that the only variable components of these paths are the 13-character numerical strings that immediately precede the .nc file extension:"
]
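As an aside (a hypothetical sketch, not part of this diff): a `FilePattern` format function is just a plain Python function mapping the variable path component to a full file path string. Assuming the 13-character component is a date-range string such as `185001-201412`, it might look like the following; the base URL and file naming below are placeholders, not the tutorial's actual values.

```python
# Hypothetical sketch of a FilePattern format function.
# Base URL and naming scheme are assumptions for illustration only.
def make_url(time):
    base = "https://example.org/cmip6/cLeaf"  # assumed host, not the real source
    return f"{base}/cLeaf_Lmon_IPSL-CM6A-LR_historical_r1i1p1f1_gr_{time}.nc"

print(make_url("185001-201412"))
```

The key point is that only the varying component is a parameter; everything constant in the path is baked into the function body.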
@@ -854,24 +832,120 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"**Time to make the recipe!**\n",
"- In its most basic form, `XarrayZarrRecipe` can be instantiated using a file pattern as the only argument. Here we'll be using some of the optional arguments to specify a few additional preferences:"
"## Step 4: Write the Recipe\n",
"\n",
"Now that we have a `FilePattern`, we are ready to write our recipe. As described in {doc}`netcdf_zarr_sequential`, a recipe is defined as a pipeline of [Apache Beam transforms](https://beam.apache.org/documentation/programming-guide/#transforms) applied to the data collection associated with a `FilePattern`.\n",
"\n",
"First, we'll import the transforms provided by Pangeo Forge that may be used to transform a `FilePattern` collection into a Zarr store."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define a pre-processing function\n",
"\n",
"- This is an optional step that we want to apply to each dataset\n",
"- Here we change some data variables into coordinate variables, but you can define your own pre-processing step here\n",
"\n",
"We will write a Beam transform that applies this fix. This is achieved by creating a [composite transform](https://beam.apache.org/documentation/programming-guide/#composite-transform-creation): a subclass of the `apache_beam.PTransform` class that overrides the `expand()` method to specify the actual processing logic."
]
},
{
"cell_type": "code",
"execution_count": 21,
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe\n",
"\n",
"recipe = XarrayZarrRecipe(\n",
"    pattern,\n",
"    target_chunks=target_chunks,\n",
"    process_chunk=set_bnds_as_coords,\n",
"    xarray_concat_kwargs={'join': 'exact'},\n",
")"
"from pangeo_forge_recipes.transforms import Indexed, T\n",
"\n",
"class SetBndsAsCoords(beam.PTransform):\n",
"    \"\"\"\n",
"    Fix issues in retrieved data.\n",
"    \"\"\"\n",
"\n",
"    @staticmethod\n",
"    def _set_bnds_as_coords(item: Indexed[T]) -> Indexed[T]:\n",
"        \"\"\"\n",
"        The netcdf lists some of the coordinate variables as data variables.\n",
"        This is a fix which we want to apply to each dataset.\n",
"        \"\"\"\n",
"        index, ds = item\n",
"        new_coords_vars = [var for var in ds.data_vars if 'bnds' in var or 'bounds' in var]\n",
"        ds = ds.set_coords(new_coords_vars)\n",
"        return index, ds\n",
"\n",
"    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:\n",
"        return pcoll | beam.Map(self._set_bnds_as_coords)"
]
},
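The element-wise logic inside `_set_bnds_as_coords` is independent of Beam: each element is an `(index, dataset)` pair, and bounds-like variables are promoted from data variables to coordinates. A stdlib-only sketch of that step (a plain dict stands in for an `xarray.Dataset`, purely for illustration; the real transform uses xarray's `set_coords`):

```python
# Stand-in for the (index, dataset) mapping; lists of variable names
# simulate ds.data_vars / ds.coords (real code operates on xarray objects).
def set_bnds_as_coords(item):
    index, ds = item
    bounds = [v for v in ds["data_vars"] if "bnds" in v or "bounds" in v]
    return index, {
        "data_vars": [v for v in ds["data_vars"] if v not in bounds],
        "coords": ds["coords"] + bounds,
    }

item = (0, {"data_vars": ["cLeaf", "time_bnds", "lat_bounds"], "coords": ["time", "lat"]})
index, fixed = set_bnds_as_coords(item)
print(fixed["coords"])  # ['time', 'lat', 'time_bnds', 'lat_bounds']
```

Wrapping this function in `beam.Map` inside `expand()` is what lifts the per-dataset fix to the whole collection.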
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define the Recipe Object\n",
"The recipe pipeline of transforms applied to `pattern` is similar to that described in {doc}`netcdf_zarr_sequential`, with one modification: the new preprocessing transform `SetBndsAsCoords` is included in the pipeline."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we define a temporary location for the output Zarr store:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'/tmp/tmpe1ovhwnn/output.zarr'"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from tempfile import TemporaryDirectory\n",
"td = TemporaryDirectory()\n",
"target_path = td.name + \"/output.zarr\"\n",
"target_path"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"transforms = (\n",
"    beam.Create(pattern.items())\n",
"    | OpenURLWithFSSpec()\n",
"    | OpenWithXarray(file_type=pattern.file_type)\n",
"    | SetBndsAsCoords()  # New preprocessor\n",
"    | StoreToZarr(\n",
"        target=target_path,\n",
"        combine_dims=pattern.combine_dim_keys,\n",
"        target_chunks=target_chunks,\n",
"    )\n",
")\n",
"transforms"
]
},
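Conceptually (a stdlib-only analogy, not Apache Beam itself), the `|` chain above applies each stage to every element of the collection, left to right, much like this toy helper:

```python
from functools import reduce

# Toy pipeline runner: each stage maps every element of the collection;
# stages run left to right, mirroring Create | Open | Preprocess | Store
# in spirit only (Beam adds laziness, parallelism, and I/O handling).
def pipe(collection, *stages):
    return reduce(lambda coll, stage: [stage(x) for x in coll], stages, collection)

result = pipe([1, 2, 3], lambda x: x * 10, lambda x: x + 1)
print(result)  # [11, 21, 31]
```

In Beam, composing transforms with `|` builds a deferred execution graph rather than running eagerly, which is why nothing happens until the pipeline is executed in the next step.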
{
@@ -880,9 +954,7 @@
"source": [
"## Step 5: Execute the recipe\n",
"\n",
"Here we use the basic function executor. For details on all execution modes, see:\n",
"\n",
"https://pangeo-forge.readthedocs.io/en/latest/pangeo_forge_recipes/recipe_user_guide/execution.html"
"Execute the recipe pipeline using Beam."
]
},
{
@@ -1582,11 +1654,8 @@
}
],
"source": [
"from pangeo_forge_recipes.recipes import setup_logging\n",
"\n",
"setup_logging() # setup execution logs\n",
"\n",
"recipe.to_function()() # compile and execute recipe as single Python function"
"with beam.Pipeline() as p:\n",
"    p | transforms"
]
},
{
@@ -1637,8 +1706,8 @@
],
"source": [
"# Check to see if it worked:\n",
"ds = xr.open_zarr(recipe.target_mapper)\n",
"print(ds)"
"ds = xr.open_zarr(target_path)\n",
"ds"
]
},
{
@@ -1691,13 +1760,6 @@
"\n",
"dataset = 'IPSL-CM6A-LR.abrupt-4xCO2.r1i1p1f1.Lmon.cLeaf.gr'  # needs decode_coords=False in xr.open_dataset; even with xarray_open_kwargs = {'decode_coords': False}, it still throws an error when caching the input"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@@ -1716,7 +1778,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.10"
"version": "3.9.13"
}
},
"nbformat": 4,