diff --git a/README.md b/README.md
index 473b07a6..4757f74c 100644
--- a/README.md
+++ b/README.md
@@ -120,7 +120,7 @@
 you will also need to activate this environment by name.)
 
 We install the `environment_w_jupyter.yml` to provide all known dependencies
-including those for running the eample notebooks. (The `environment.yml`
+including those for running the example notebooks. (The `environment.yml`
 does not contain jupyter or jupyterlab because this interferes with
 installation on WholeTale, see Example Notebooks seection below.)
 
diff --git a/doc/index.rst b/doc/index.rst
index 326faa48..2060e4a1 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -20,6 +20,12 @@ modeling techniques and data sources. Pywatershed is a place for experimentation
 with software design, process representation, and data fusion in the context of
 well established hydrologic process modeling.
 
+The Python language was chosen because it is accessible to a wide audience of
+potential contributors, which helps foster community development and
+experimentation. A large number of advanced libraries available for Python can
+be applied to hydrologic modeling, including libraries for parallelism, data
+access and manipulation, and machine learning.
+
 Following the conceptual design of PRMS, pywatershed calculates explicit solutions
 of spatially distributed hydrologic process representations including evaporation,
 transpiration, runoff, infiltration, interflow, snowpack, soil moisture, conceptual
@@ -29,11 +35,13 @@ land use change at temporal scales ranging from days to centuries.
 
 Pywatershed enhances PRMS with a new software design that is object-oriented and highly
 flexible, allowing users to easily run "sub-models", replace process representations, and
-incorporate new data. The Python language is accessible to a wide audience of
-potential contributors which will help foster community development and experimentation.
-A large number of advanced libraries available for Python can be applied to
-hdyrologic modeling, including libraries for parallelism, data access and manipulation,
-and machine learning.
+incorporate new data. Base classes manage mass and energy conservation, and the
+concrete process classes follow a self-describing design that allows the Model
+class to properly connect hydrologic processes based on their own descriptions
+of themselves. A variety of input data sources is managed by the Adapter class,
+which implements subclasses for different sources. The design of pywatershed is
+documented in these docs and also demonstrated by the numbered Jupyter notebooks
+in the `examples/` directory.
 
 The flexible structure of pywatershed helps
 it to couple with other hydrologic models. We can easily one-way couple pywatershed to
diff --git a/examples/05_parameter_ensemble.ipynb b/examples/05_parameter_ensemble.ipynb
new file mode 100644
index 00000000..f2a713df
--- /dev/null
+++ b/examples/05_parameter_ensemble.ipynb
@@ -0,0 +1,481 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "f932e1c0-24ac-4be8-a886-8554cd1e4412",
+   "metadata": {},
+   "source": [
+    "# Parameter Ensemble\n",
+    "This notebook shows how to edit and work with parameters in pywatershed. First we look at the data model used by the `Parameter` class to build a small ensemble of parameters for the `PRMSChannel` hydrologic process. Then we do a little bit of (embarrassingly) parallel programming using Python's [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) to run this ensemble in parallel (in addition to serial). This provides a skeleton recipe for how to do calibration or sensitivity analysis with pywatershed. \n",
+    "\n",
+    "It is a design feature that the `Parameter` class is read-only. This is because we don't want the code and developers modifying parameters opaquely under the hood. While this practice is commonplace, it undermines the idea of reproducible research and causes more headaches than it solves. So we guard against this with software design. The trick is that we need to make the `Parameter` object editable, but that means we have to change it to another class first. \n",
+    "\n",
+    "Let's get started. \n",
+    "\n",
+    "Note this notebook needs notebooks 0-1 to have been run in advance."
+   ]
+  },
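+  {
+   "cell_type": "markdown",
+   "id": "0b1c2d3e-4f5a-4b6c-8d7e-9f0a1b2c3d4e",
+   "metadata": {},
+   "source": [
+    "In outline, the editing pattern this notebook develops is the following minimal sketch (every call here is demonstrated and explained below; the 1.1 multiplier is just an arbitrary example value):\n",
+    "\n",
+    "```python\n",
+    "import pywatershed as pws\n",
+    "\n",
+    "domain_dir = pws.constants.__pywatershed_root__ / \"data/drb_2yr\"\n",
+    "params = pws.parameters.PrmsParameters.load(domain_dir / \"myparam.param\")\n",
+    "\n",
+    "param_dict = params.to_dd()  # read-only Parameters -> editable DatasetDict\n",
+    "param_dict.data_vars[\"K_coef\"] *= 1.1  # edit the parameter values\n",
+    "params_new = pws.Parameters(**param_dict.data)  # back to read-only\n",
+    "```"
+   ]
+  },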
+  {
+   "cell_type": "markdown",
+   "id": "d2458336-67f7-4a2d-b486-8dbb99389854",
+   "metadata": {},
+   "source": [
+    "## Preliminaries"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "73380b42-3336-4c6c-99cb-f797841a132a",
+   "metadata": {
+    "editable": true,
+    "slideshow": {
+     "slide_type": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# auto-format the code in this notebook\n",
+    "%load_ext jupyter_black"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "769aa543-dfa5-43e9-84f6-0830ad0cb1d0",
+   "metadata": {
+    "editable": true,
+    "slideshow": {
+     "slide_type": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import pathlib as pl\n",
+    "from pprint import pprint\n",
+    "import shutil\n",
+    "\n",
+    "import numpy as np\n",
+    "import pywatershed as pws\n",
+    "import xarray as xr"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c73e4353-26c7-40c1-a952-9739f52c8198",
+   "metadata": {},
+   "source": [
+    "We'll use a PRMS-native parameter file from one of the domains supplied with pywatershed on install."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "01da8dca-622f-44c2-9041-8a43826d5d6f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "domain_dir = pws.constants.__pywatershed_root__ / \"data/drb_2yr\"\n",
+    "nb_output_dir = pl.Path(\"./05_parameter_ensemble\")\n",
+    "nb_output_dir.mkdir(exist_ok=True)\n",
+    "(nb_output_dir / \"params\").mkdir(exist_ok=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8b08b96b-9bcc-4b1b-a2e9-231949b2dd0c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "params = pws.parameters.PrmsParameters.load(domain_dir / \"myparam.param\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c7dd6bc3-18aa-4358-9426-1515c1d0f884",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(params)\n",
+    "isinstance(params, pws.Parameters)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "21bae789-3c33-4ff7-a0c2-e60afccaefbb",
+   "metadata": {},
+   "source": [
+    "## Create an ensemble of parameters\n",
+    "Now that we have the PRMS parameters as a `pws.Parameters` object, actually as its subclass `PrmsParameters`, we'll conduct a simple demonstration of how to generate an ensemble of values for `K_coef`, the Muskingum storage coefficient which affects the travel time of waves in the `PRMSChannel` representation.\n",
+    "\n",
+    "As mentioned above, we have to get the parameter data into a different class to be able to edit it. Here we have two options: 1) an `xarray.Dataset` or 2) a `pywatershed.DatasetDict`. These two options have invertible mappings provided by `pywatershed.DatasetDict`, as the quick check in the next two cells illustrates."
+   ]
+  },
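+  {
+   "cell_type": "markdown",
+   "id": "1f2e3d4c-5b6a-4978-8695-a4b3c2d1e0f9",
+   "metadata": {},
+   "source": [
+    "As a quick sanity check of that invertibility (a minimal sketch using only calls shown in this notebook), the next cell converts the read-only `Parameters` to an editable `DatasetDict`, reconstructs a `Parameters` object from its `data`, and verifies that `K_coef` survives the round trip unchanged."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2e3d4c5b-6a79-4867-9584-b3c2d1e0f9a8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Round trip: Parameters -> DatasetDict -> Parameters\n",
+    "param_dict_check = params.to_dd()  # copies by default\n",
+    "params_roundtrip = pws.Parameters(**param_dict_check.data)\n",
+    "assert (\n",
+    "    params_roundtrip.data_vars[\"K_coef\"] == params.data_vars[\"K_coef\"]\n",
+    ").all()"
+   ]
+  },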
+  {
+   "cell_type": "markdown",
+   "id": "5c6d7e8f-9a0b-4c1d-8e2f-3a4b5c6d7e8f",
+   "metadata": {},
+   "source": [
+    "First we'll demonstrate the approach with `xarray.Dataset`. We'll create an ensemble with 11 members and write the new parameter datasets, including all the variables, out to disk as separate NetCDF files. Note we could do this in memory and not write to disk, but generally it is preferable to have a record of inputs. This also demonstrates how one can quite easily convert a native PRMS parameter file to a NetCDF file. \n",
+    "\n",
+    "We'll just multiply the `K_coef` coefficient by the 11 numbers 0.75, 0.8, ..., 1.2, 1.25 to get our 11 realizations."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7abef9f9-7675-4c7d-98ef-5dd0bc60ce3e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "param_files = []  # get a list of written NetCDF files back at the end\n",
+    "n_members = 11\n",
+    "for ii in range(n_members):\n",
+    "    param_ds = params.to_xr_ds()  # copies by default\n",
+    "    multiplier = ii * 0.05 + 0.75\n",
+    "    print(\"multiplier = \", multiplier)\n",
+    "    param_ds[\"K_coef\"] *= multiplier\n",
+    "    param_file_name = (\n",
+    "        nb_output_dir / f\"params/perturbed_params_xr_{str(ii).zfill(3)}.nc\"\n",
+    "    )\n",
+    "    param_files += [param_file_name]\n",
+    "    param_ds.to_netcdf(param_file_name)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d326af89-56b8-4ec6-8c53-71de58e630fb",
+   "metadata": {},
+   "source": [
+    "For the final `param_ds` still in memory, we can take a look... it has 144 variables, so you'll need to click the triangle to see the list. The little paper-with-a-bent-corner icon shows metadata and the stacked-disks icon gives a Python `repr`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7cc16ad5-1858-4f42-b902-69138d4458a2",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "param_ds"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1089e6fb-d39c-4d86-8044-b10c213af5da",
+   "metadata": {},
+   "source": [
+    "Do a check that the values in the file divided by the original values reproduce the multipliers, in order."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "146516e8-bd2e-4587-ac8b-167d34e84a96",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for ff in param_files:\n",
+    "    new_params = xr.open_dataset(\n",
+    "        ff, decode_times=False, decode_timedelta=False\n",
+    "    )\n",
+    "    k_coef = new_params[\"K_coef\"]\n",
+    "    # new_params = pws.parameters.PrmsParameters.from_netcdf(ff)\n",
+    "    # k_coef = new_params.data_vars[\"K_coef\"]\n",
+    "    multipliers = k_coef / params.data_vars[\"K_coef\"]\n",
+    "    assert (abs(multipliers - multipliers[0]) < 1e-15).all()\n",
+    "    print(multipliers[0].values)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "cf869257-d4e5-4b36-93e8-deada428a81e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "del param_files"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c075c737-9157-449c-886b-7d07ebe59486",
+   "metadata": {},
+   "source": [
+    "Now we demonstrate the use of a `pywatershed.DatasetDict`, which you can read about in the [documentation](https://pywatershed.readthedocs.io/en/main/api/generated/pywatershed.base.DatasetDict.html#pywatershed.base.DatasetDict). Note that the edited `DatasetDict` can be made a `Parameters` object again by `Parameters(**param_dict.data)`, but we'll just write directly to file and then load as a `Parameters` object. These are slightly different choices from above, showing additional flexibility. We still choose to write the parameter ensemble to disk, however.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "cd179039-a91d-4057-8e11-4ba49cb387bd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "param_files = []\n",
+    "for ii in range(n_members):\n",
+    "    param_dict = params.to_dd()  # copies by default\n",
+    "    multiplier = ii * 0.05 + 0.75\n",
+    "    print(\"multiplier = \", multiplier)\n",
+    "    param_dict.data_vars[\"K_coef\"] *= multiplier\n",
+    "    param_file_name = (\n",
+    "        nb_output_dir / f\"params/perturbed_params_{str(ii).zfill(3)}.nc\"\n",
+    "    )\n",
+    "    param_files += [param_file_name]\n",
+    "    param_dict.to_netcdf(\n",
+    "        param_file_name, use_xr=True\n",
+    "    )  # using xarray; more work is necessary for netCDF4 export"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ef9a8074-d0ef-4afb-8f15-1b3ec726fbe5",
+   "metadata": {},
+   "source": [
+    "Same check as above, but this time we read the NetCDF file into a `PrmsParameters` object rather than an `xarray.Dataset`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "04cb7d5a-4979-4859-872c-6ccd6f6f1fc3",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for ff in param_files:\n",
+    "    # the decoding problem arises on read with xarray's defaults,\n",
+    "    # but we can just open the NetCDF file as Parameters instead\n",
+    "    # ds = xr.open_dataset(ff, decode_times=False, decode_timedelta=False)\n",
+    "    # k_coef = ds[\"K_coef\"]\n",
+    "    new_params = pws.parameters.PrmsParameters.from_netcdf(ff)\n",
+    "    k_coef = new_params.data_vars[\"K_coef\"]\n",
+    "    multipliers = k_coef / params.data_vars[\"K_coef\"]\n",
+    "    assert (abs(multipliers - multipliers[0]) < 1e-15).all()\n",
+    "    print(multipliers[0])"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "cad8c37c-ddbc-4c0f-868e-c0f9fdce6e78",
+   "metadata": {},
+   "source": [
+    "## Run the parameter ensemble\n",
+    "We'll write a helper function for running the parameters through the model. Note the comments on details related to concurrent.futures."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "97cc4ba6-9f81-4c6a-8a1f-d2e2a169c986",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def run_channel_model(output_dir_parent, param_file):\n",
+    "    # For concurrent.futures we have to write this function to a file/module,\n",
+    "    # so we import things here that won't otherwise be in scope in that case.\n",
+    "    import numpy as np\n",
+    "    import pywatershed as pws\n",
+    "\n",
+    "    domain_dir = pws.constants.__pywatershed_root__ / \"data/drb_2yr\"\n",
+    "\n",
+    "    params = pws.parameters.PrmsParameters.from_netcdf(param_file)\n",
+    "\n",
+    "    param_id = param_file.with_suffix(\"\").name.split(\"_\")[-1]\n",
+    "    nc_output_dir = output_dir_parent / f\"run_params_{param_id}\"\n",
+    "    nc_output_dir.mkdir(parents=True, exist_ok=True)\n",
+    "\n",
+    "    control = pws.Control.load_prms(\n",
+    "        domain_dir / \"control.test\", warn_unused_options=False\n",
+    "    )\n",
+    "    control.edit_end_time(np.datetime64(\"1979-07-01T00:00:00\"))\n",
+    "    control.options = control.options | {\n",
+    "        \"input_dir\": \"01_multi-process_models/nhm_memory\",\n",
+    "        \"budget_type\": \"warn\",\n",
+    "        \"calc_method\": \"numba\",\n",
+    "        \"netcdf_output_dir\": nc_output_dir,\n",
+    "    }\n",
+    "\n",
+    "    model = pws.Model(\n",
+    "        [pws.PRMSChannel],\n",
+    "        control=control,\n",
+    "        parameters=params,\n",
+    "    )\n",
+    "\n",
+    "    model.run(finalize=True)\n",
+    "    return nc_output_dir"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "bf2cb631-91da-4488-962c-ed5c80378049",
+   "metadata": {},
+   "source": [
+    "### Serial run\n",
+    "We'll perform serial execution of the model over the parameter files."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8dc58917-2b29-474a-bb56-1d401c68b4ef",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "serial_output_dirs = []\n",
+    "serial_output_parent = nb_output_dir / \"serial\"\n",
+    "if serial_output_parent.exists():\n",
+    "    shutil.rmtree(serial_output_parent)\n",
+    "serial_output_parent.mkdir()\n",
+    "for ff in param_files:\n",
+    "    serial_output_dirs += [run_channel_model(serial_output_parent, ff)]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "81841620-fbc1-49cf-a699-ccad3b2db051",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "serial_output_dirs"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "e3ccefe5-2b6e-40e7-afee-fd6718a3af38",
+   "metadata": {},
+   "source": [
+    "### concurrent.futures run\n",
+    "For [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) to work in an interactive setting, we have to import the iterated/mapped function from a module; the function cannot be defined in the notebook/interactive setting. We can simply write the function out to a file (ensuring, as noted in the function itself, that everything it needs is in scope when imported)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f2188410-5d3e-40b0-a539-b26292ed0e17",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The nb_output_dir name starts with a digit, so it cannot be used as a\n",
+    "# package to import from; we create another directory to import from\n",
+    "# and delete it later.\n",
+    "import inspect\n",
+    "\n",
+    "import_dir = pl.Path(\"param_ensemble_tmp\")\n",
+    "import_dir.mkdir(exist_ok=True)\n",
+    "with open(import_dir / \"run_channel_model_mod.py\", \"w\") as the_file:\n",
+    "    the_file.write(inspect.getsource(run_channel_model))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7913ba4b-3a46-432d-abb5-a86a11f6e02c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "from concurrent.futures import ProcessPoolExecutor as PoolExecutor\n",
+    "from functools import partial\n",
+    "from param_ensemble_tmp.run_channel_model_mod import run_channel_model\n",
+    "\n",
+    "parallel_output_parent = nb_output_dir / \"parallel\"\n",
+    "if parallel_output_parent.exists():\n",
+    "    shutil.rmtree(parallel_output_parent)\n",
+    "parallel_output_parent.mkdir()\n",
+    "\n",
+    "# you can set your choice of max_workers\n",
+    "with PoolExecutor(max_workers=4) as executor:\n",
+    "    # consume the map generator so any worker exceptions are raised here\n",
+    "    parallel_output_dirs = list(\n",
+    "        executor.map(\n",
+    "            partial(run_channel_model, parallel_output_parent), param_files\n",
+    "        )\n",
+    "    )"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7cea730c-4f0a-4720-8dee-90c463a7a702",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "shutil.rmtree(import_dir)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "34d68668-dac4-4f77-9917-ea72074f3dad",
+   "metadata": {},
+   "source": [
+    "### Check serial and parallel\n",
+    "See that these runs gave the same results."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "dc4d7600-8488-44f4-81fb-893cb0876c29",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# check serial == parallel\n",
+    "serial_runs = sorted(serial_output_parent.glob(\"*\"))\n",
+    "parallel_runs = sorted(parallel_output_parent.glob(\"*\"))\n",
+    "\n",
+    "for ss, pp in zip(serial_runs, parallel_runs):\n",
+    "    serial_files = sorted(ss.glob(\"*.nc\"))\n",
+    "    parallel_files = sorted(pp.glob(\"*.nc\"))\n",
+    "    for sf, pf in zip(serial_files, parallel_files):\n",
+    "        s_ds = xr.open_dataset(sf)\n",
+    "        p_ds = xr.open_dataset(pf)\n",
+    "        xr.testing.assert_allclose(s_ds, p_ds)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c30e9294-0682-46f7-bc63-5bb7c407dce8",
+   "metadata": {},
+   "source": [
+    "We can also check that the original parameters give the same results as in notebook `01_multi-process_models.ipynb`."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cca4dc2f-c91b-475d-9021-c52261c0c31e", + "metadata": {}, + "outputs": [], + "source": [ + "run_005 = serial_output_parent / \"run_params_005\"\n", + "files_005 = sorted(run_005.glob(\"*.nc\"))\n", + "for ff in files_005:\n", + " if ff.name == \"PRMSChannel_budget.nc\":\n", + " continue\n", + " ds_005 = xr.open_dataset(ff)\n", + " ds_01 = xr.open_dataset(\n", + " pl.Path(\"01_multi-process_models/nhm_memory\") / ff.name\n", + " )\n", + " xr.testing.assert_allclose(ds_005, ds_01)" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython" + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/param_edits.ipynb b/examples/param_edits.ipynb deleted file mode 100644 index 0c5bcc7e..00000000 --- a/examples/param_edits.ipynb +++ /dev/null @@ -1,320 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "f932e1c0-24ac-4be8-a886-8554cd1e4412", - "metadata": {}, - "source": [ - "# Parameter Editing\n", - "This notebook gives a quick recipe for how to do calibration or sensitivity analysis with pywatershed. \n", - "It is a design feature that parameters, more specifically the Parameter class, are read-only because \n", - "it should be the case that parameters supplied are used and the code is not opaquely modifying these.\n", - "\n", - "As a consequence, one has to make the Parameter class editable. Below this is accomplished by doing\n", - "`the_parameters.to_dd()` which returns a DatasetDict which is editable. One has to know something about\n", - "how DatasetDicts are constructed to edit effectively, information can be found in the [documentation](https://pywatershed.readthedocs.io/en/main/api/generated/pywatershed.base.DatasetDict.html#pywatershed.base.DatasetDict). \n", - "The edited DatasetDict can be made a Parameters object again by `Parameters(**param_dict.data)`, as shown below. \n", - "\n", - "Note this notebook needs notebooks 0-1 to have been run in advance." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "73380b42-3336-4c6c-99cb-f797841a132a", - "metadata": { - "editable": true, - "slideshow": { - "slide_type": "" - } - }, - "outputs": [], - "source": [ - "# auto-format the code in this notebook\n", - "%load_ext jupyter_black" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "769aa543-dfa5-43e9-84f6-0830ad0cb1d0", - "metadata": { - "editable": true, - "slideshow": { - "slide_type": "" - } - }, - "outputs": [], - "source": [ - "import pathlib as pl\n", - "from pprint import pprint\n", - "import shutil\n", - "\n", - "import numpy as np\n", - "import pywatershed as pws\n", - "import xarray as xr" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "01da8dca-622f-44c2-9041-8a43826d5d6f", - "metadata": {}, - "outputs": [], - "source": [ - "domain_dir = pws.constants.__pywatershed_root__ / \"data/drb_2yr\"\n", - "nb_output_dir = pl.Path(\"./param_edits\")\n", - "nb_output_dir.mkdir(exist_ok=True)\n", - "(nb_output_dir / \"params\").mkdir(exist_ok=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8b08b96b-9bcc-4b1b-a2e9-231949b2dd0c", - "metadata": {}, - "outputs": [], - "source": [ - "# A legacy PRMS parameter file\n", - "params = pws.parameters.PrmsParameters.load(domain_dir / \"myparam.param\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "cd179039-a91d-4057-8e11-4ba49cb387bd", - "metadata": {}, - "outputs": [], - "source": [ - "param_list = []\n", - "param_files = []\n", - "for ii in range(11):\n", - " param_dict = params.to_dd() # copies by default\n", - " multiplier = ii * 0.05 + 0.75\n", - " print(\"multiplier = \", multiplier)\n", - " param_dict.data_vars[\"K_coef\"] *= multiplier\n", - " param_file_name = (\n", - " nb_output_dir / f\"params/perturbed_params_{str(ii).zfill(3)}.nc\"\n", - " )\n", - " param_files += [param_file_name]\n", - " # These could avoid export to netcdf4 if just using in memory\n", - " # could store in a list like: param_list.append(pws.Parameters(**param_dict.data))\n", - " pws.Parameters(**param_dict.data).to_netcdf(\n", - " param_file_name, use_xr=True\n", - " ) # using xarray, more work necessary for nc4 export" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "04cb7d5a-4979-4859-872c-6ccd6f6f1fc3", - "metadata": {}, - "outputs": [], - "source": [ - "# this provides a check that the values from file are what we expect\n", - "for ff in param_files:\n", - " # the problem arises on the read with xarray default decoding\n", - " # but we can just open the netcdf file as Parameters\n", - " # ds = xr.open_dataset(ff, decode_times=False, decode_timedelta=False)\n", - " # k_coef = ds[\"K_coef\"]\n", - " new_params = pws.parameters.PrmsParameters.from_netcdf(ff)\n", - " k_coef = new_params.data_vars[\"K_coef\"]\n", - " multipliers = k_coef / params.data_vars[\"K_coef\"]\n", - " assert (multipliers - multipliers[0] < 1e-15).all()\n", - " print(multipliers[0])" - ] - }, - { - "cell_type": "markdown", - "id": "cad8c37c-ddbc-4c0f-868e-c0f9fdce6e78", - "metadata": {}, - "source": [ - "## A helper function for running the parameters through the model" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "97cc4ba6-9f81-4c6a-8a1f-d2e2a169c986", - "metadata": {}, - "outputs": [], - "source": [ - "def run_channel_model(output_dir_parent, param_file):\n", - " # for concurrent.futures we have to write this function to file/module\n", - " # so we have to import things that wont be 
in scope in that case.\n", - " import numpy as np\n", - " import pywatershed as pws\n", - "\n", - " domain_dir = pws.constants.__pywatershed_root__ / \"data/drb_2yr\"\n", - "\n", - " params = pws.parameters.PrmsParameters.from_netcdf(param_file)\n", - "\n", - " param_id = param_file.with_suffix(\"\").name.split(\"_\")[-1]\n", - " nc_output_dir = output_dir_parent / f\"run_params_{param_id}\"\n", - " nc_output_dir.mkdir(parents=True, exist_ok=True)\n", - "\n", - " control = pws.Control.load(domain_dir / \"control.test\")\n", - " control.edit_end_time(np.datetime64(\"1979-07-01T00:00:00\"))\n", - " control.options = control.options | {\n", - " \"input_dir\": \"01_multi-process_models/nhm_memory\",\n", - " \"budget_type\": \"warn\",\n", - " \"calc_method\": \"numba\",\n", - " \"netcdf_output_dir\": nc_output_dir,\n", - " }\n", - "\n", - " model = pws.Model(\n", - " [pws.PRMSChannel],\n", - " control=control,\n", - " parameters=params,\n", - " )\n", - "\n", - " model.run(finalize=True)\n", - " return nc_output_dir" - ] - }, - { - "cell_type": "markdown", - "id": "bf2cb631-91da-4488-962c-ed5c80378049", - "metadata": {}, - "source": [ - "## Serial execution of the model over the parameter files" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8dc58917-2b29-474a-bb56-1d401c68b4ef", - "metadata": {}, - "outputs": [], - "source": [ - "%%time\n", - "serial_output_dirs = []\n", - "serial_output_parent = nb_output_dir / \"serial\"\n", - "if serial_output_parent.exists():\n", - " shutil.rmtree(serial_output_parent)\n", - "serial_output_parent.mkdir()\n", - "for ff in param_files:\n", - " serial_output_dirs += [run_channel_model(serial_output_parent, ff)]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "81841620-fbc1-49cf-a699-ccad3b2db051", - "metadata": {}, - "outputs": [], - "source": [ - "serial_output_dirs" - ] - }, - { - "cell_type": "markdown", - "id": "e3ccefe5-2b6e-40e7-afee-fd6718a3af38", - "metadata": {}, - "source": [ - "## concurrent.futures approach\n", - "For concurrent futures to work in an interactive setting, we have to import the iterated/mapped function from a module, the function can not be defined in the notebook/interactive setting. We can easily just write the function out to file (ensure above that everything is in scope when imported, as noted in the function)." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f2188410-5d3e-40b0-a539-b26292ed0e17", - "metadata": {}, - "outputs": [], - "source": [ - "import inspect\n", - "\n", - "with open(\"param_edits/run_channel_model.py\", \"w\") as the_file:\n", - " the_file.write(inspect.getsource(run_channel_model))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7913ba4b-3a46-432d-abb5-a86a11f6e02c", - "metadata": {}, - "outputs": [], - "source": [ - "%%time\n", - "import time\n", - "from concurrent.futures import ProcessPoolExecutor as PoolExecutor\n", - "from concurrent.futures import as_completed\n", - "from functools import partial\n", - "from param_edits.run_channel_model import run_channel_model\n", - "\n", - "parallel_output_parent = nb_output_dir / \"parallel\"\n", - "if parallel_output_parent.exists():\n", - " shutil.rmtree(parallel_output_parent)\n", - "parallel_output_parent.mkdir()\n", - "\n", - "# you can set your choice of max_workers\n", - "with PoolExecutor(max_workers=11) as executor:\n", - " parallel_output_dirs = executor.map(\n", - " partial(run_channel_model, parallel_output_parent), param_files\n", - " )" - ] - }, - { - "cell_type": "markdown", - "id": "34d68668-dac4-4f77-9917-ea72074f3dad", - "metadata": {}, - "source": [ - "# Checks" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dc4d7600-8488-44f4-81fb-893cb0876c29", - "metadata": {}, - "outputs": [], - "source": [ - "# check serial == parallel\n", - "serial_runs = sorted(serial_output_parent.glob(\"*\"))\n", - "parallel_runs = sorted(parallel_output_parent.glob(\"*\"))\n", - "\n", - "for ss, pp in zip(serial_runs, parallel_runs):\n", - " serial_files = sorted(ss.glob(\"*.nc\"))\n", - " parallel_files = sorted(pp.glob(\"*.nc\"))\n", - " for sf, pf in zip(serial_files, parallel_files):\n", - " s_ds = xr.open_dataset(sf)\n", - " p_ds = xr.open_dataset(pf)\n", - " xr.testing.assert_allclose(s_ds, p_ds)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "cca4dc2f-c91b-475d-9021-c52261c0c31e", - "metadata": {}, - "outputs": [], - "source": [ - "# check serial 5 is the same as in notebook 02\n", - "run_005 = serial_output_parent / \"run_params_005\"\n", - "files_005 = sorted(run_005.glob(\"*.nc\"))\n", - "for ff in files_005:\n", - " if ff.name == \"PRMSChannel_budget.nc\":\n", - " continue\n", - " ds_005 = xr.open_dataset(ff)\n", - " ds_02 = xr.open_dataset(\n", - " pl.Path(\"01_multi-process_models/nhm_memory\") / ff.name\n", - " )\n", - " xr.testing.assert_allclose(ds_005, ds_02)" - ] - } - ], - "metadata": { - "language_info": { - "codemirror_mode": { - "name": "ipython" - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -}