Skip to content

Commit

Permalink
canadian_timeslices added (#771)
Browse files Browse the repository at this point in the history
* canadian_timeslices added

* update canadian_timeslices

* update on da_run

* adding the function for NHD

* dropping pdb

* deleting timeslice
  • Loading branch information
AminTorabi-NOAA authored May 29, 2024
1 parent d783da6 commit 6aa48a5
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/troute-config/troute/config/compute_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ class StreamflowDA(BaseModel, extra='forbid'):
diffusive_streamflow_nudging: bool = False


class ReservoirPersistenceDA(BaseModel, extra='forbid'):
class ReservoirPersistenceDA(BaseModel, extra='ignore'):
# NOTE: mandatory for USGS reservoir DA, defaults to False
reservoir_persistence_usgs: bool = False
# NOTE: mandatory for USACE reservoir DA, defaults to False
reservoir_persistence_usace: bool = False
# NOTE: mandatory for USACE reservoir DA, defaults to False
reservoir_persistence_canada: bool = False

crosswalk_usgs_gage_field: str = "usgs_gage_id"
crosswalk_usace_gage_field: str = "usace_gage_id"
Expand All @@ -152,11 +154,13 @@ class ReservoirDA(BaseModel, extra='forbid'):
reservoir_parameter_file: Optional[FilePath] = None


class DataAssimilationParameters(BaseModel, extra='forbid'):
class DataAssimilationParameters(BaseModel, extra='ignore'):
# NOTE: required for streamflow nudging and/or USGS reservoir DA
usgs_timeslices_folder: Optional[DirectoryPath] = None
# NOTE: required for USACE reservoir DA
usace_timeslices_folder: Optional[DirectoryPath] = None
# NOTE: required for canada reservoir DA
canada_timeslices_folder: Optional[DirectoryPath] = None
# NOTE: required for reservoir DA - suggested value 24 (1 days)
timeslice_lookback_hours: int = 24

Expand Down
61 changes: 59 additions & 2 deletions src/troute-network/troute/DataAssimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ def __init__(self, network, from_files, value_dict, da_run=[]):
self._last_obs_df = _reindex_link_to_lake_id(self._last_obs_df, network.link_lake_crosswalk)

self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)

if 'canada_timeslice_files' in da_run:
self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)
def update_after_compute(self, run_results, time_increment):
'''
Function to update data assimilation object after running routing module.
Expand Down Expand Up @@ -262,7 +263,8 @@ def update_for_next_loop(self, network, da_run,):

if streamflow_da_parameters.get('streamflow_nudging', False):
self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)

if 'canada_timeslice_files' in da_run:
self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)

class PersistenceDA(AbstractDA):
"""
Expand Down Expand Up @@ -991,6 +993,61 @@ def _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_

return usgs_df

def _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run):
'''
Function for reading USGS timeslice files and creating a dataframe
of USGS gage observations. This dataframe is used for streamflow
nudging and can be used for constructing USGS reservoir dataframes.
Arguments:
----------
- data_assimilation_parameters (dict): user input data re data assimilation
- streamflow_da_parameters (dict): user input data re streamflow nudging
- run_parameters (dict): user input data re subset of compute configuration
- network (Object): network object created from abstract class
- da_run (list): list of data assimilation files separated by for loop chunks
Returns:
--------
- usgs_df (DataFrame): dataframe of USGS gage observations
'''
canada_timeslices_folder = data_assimilation_parameters.get("canada_timeslices_folder", None)
#lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None)
lastobs_start = data_assimilation_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time",0)
lastobs_type = data_assimilation_parameters.get("wrf_lastobs_type", "error-based")
crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None)
crosswalk_gage_field = streamflow_da_parameters.get('crosswalk_gage_field','gages')
crosswalk_segID_field = streamflow_da_parameters.get('crosswalk_segID_field','link')
da_decay_coefficient = data_assimilation_parameters.get("da_decay_coefficient",120)
qc_threshold = data_assimilation_parameters.get("qc_threshold",1)
interpolation_limit = data_assimilation_parameters.get("interpolation_limit_min",59)

# TODO: join timeslice folder and files into complete path upstream

canada_files = [canada_timeslices_folder.joinpath(f) for f in da_run['canada_timeslice_files']]


if canada_files:
canada_df = (
nhd_io.get_obs_from_timeslices(
network.link_gage_df,
crosswalk_gage_field,
crosswalk_segID_field,
canada_files,
qc_threshold,
interpolation_limit,
run_parameters.get("dt"),
network.t0,
run_parameters.get("cpu_pool", None)
).
loc[network.link_gage_df.index]
)

else:
canada_df = pd.DataFrame()

return canada_df

def _create_reservoir_df(data_assimilation_parameters, reservoir_da_parameters, streamflow_da_parameters, run_parameters, network, da_run, lake_gage_crosswalk, res_source):
'''
Function for reading USGS/USACE timeslice files and creating a dataframe
Expand Down
29 changes: 24 additions & 5 deletions src/troute-network/troute/hyfeature_network_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,31 @@ def build_da_sets(da_params, run_sets, t0):
"usace_timeslices_folder",
None
)

canada_timeslices_folder = da_params.get(
"canada_timeslices_folder",
None
)
# User-specified DA ON/OFF preferences
usace_da = False
usgs_da = False
canada_da = False
reservoir_persistence_da = da_params.get('reservoir_da', {}).get('reservoir_persistence_da', False)
if reservoir_persistence_da:
usgs_da = reservoir_persistence_da.get('reservoir_persistence_usgs', False)
usace_da = reservoir_persistence_da.get('reservoir_persistence_usace', False)

canada_da = reservoir_persistence_da.get('reservoir_persistence_canada', False)

nudging = False
streamflow_da = da_params.get('streamflow_da', False)
if streamflow_da:
nudging = streamflow_da.get('streamflow_nudging', False)

if not usgs_da and not usace_da and not nudging:
if not usgs_da and not usace_da and not canada_da and not nudging:
# if all DA capabilities are OFF, return empty dictionary
da_sets = [{} for _ in run_sets]

# if no user-input timeslice folders, a list of empty dictionaries
elif not usgs_timeslices_folder and not usace_timeslices_folder:
elif not usgs_timeslices_folder and not usace_timeslices_folder and not canada_timeslices_folder:
# if no timeslice folders, return empty dictionary
da_sets = [{} for _ in run_sets]

Expand All @@ -79,7 +84,8 @@ def build_da_sets(da_params, run_sets, t0):
usgs_timeslices_folder = pathlib.Path(usgs_timeslices_folder)
if usace_timeslices_folder:
usace_timeslices_folder = pathlib.Path(usace_timeslices_folder)

if canada_timeslices_folder:
canada_timeslices_folder = pathlib.Path(canada_timeslices_folder)
# the number of timeslice files appended to the front- and back-ends
# of the TimeSlice file interpolation stack
pad_hours = da_params.get("timeslice_lookback_hours",0)
Expand Down Expand Up @@ -132,6 +138,19 @@ def build_da_sets(da_params, run_sets, t0):
# Add available TimeSlices to da_sets list
da_sets[i]['usace_timeslice_files'] = filenames_usace

# identify available USGS TimeSlices in run set i
if canada_timeslices_folder and canada_da:
filenames_canada = (timestamps.strftime('%Y-%m-%d_%H:%M:%S')
+ '.15min.wscTimeSlice.ncdf').to_list()

# identify available USGS TimeSlices
filenames_canada = _check_timeslice_exists(
filenames_canada,
canada_timeslices_folder
)
# Add available TimeSlices to da_sets list
da_sets[i]['canada_timeslice_files'] = filenames_canada

# reset initialization time for loop set i+1
t0 = run_sets[i]['final_timestamp']

Expand Down
2 changes: 2 additions & 0 deletions test/LowerColorado_TX/test_AnA_V4_NHD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ compute_parameters:
#----------
usgs_timeslices_folder : usgs_TimeSlice/
usace_timeslices_folder : usace_TimeSlice/
canada_timeslices_folder : Canadian_timeslices/
timeslice_lookback_hours : 48
qc_threshold : 1
streamflow_da:
Expand All @@ -97,6 +98,7 @@ compute_parameters:
#----------
reservoir_persistence_usgs : False
reservoir_persistence_usace : False
reservoir_persistence_canada : True
reservoir_rfc_da:
#----------
reservoir_rfc_forecasts : False
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ compute_parameters:
#----------
usgs_timeslices_folder : usgs_timeslices/
usace_timeslices_folder : usace_timeslices/
canada_timeslices_folder : Canadian_timeslices/
streamflow_da:
#----------
streamflow_nudging : True
Expand All @@ -110,6 +111,7 @@ compute_parameters:
#----------
reservoir_persistence_usgs : True
reservoir_persistence_usace : True
reservoir_persistence_canada : True
reservoir_rfc_da:
#----------
reservoir_rfc_forecasts : True
Expand Down

0 comments on commit 6aa48a5

Please sign in to comment.