Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

canadian_timeslices added #771

Merged
merged 6 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/troute-config/troute/config/compute_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ class StreamflowDA(BaseModel, extra='forbid'):
diffusive_streamflow_nudging: bool = False


class ReservoirPersistenceDA(BaseModel, extra='forbid'):
class ReservoirPersistenceDA(BaseModel, extra='ignore'):
# NOTE: mandatory for USGS reservoir DA, defaults to False
reservoir_persistence_usgs: bool = False
# NOTE: mandatory for USACE reservoir DA, defaults to False
reservoir_persistence_usace: bool = False
# NOTE: mandatory for USACE reservoir DA, defaults to False
reservoir_persistence_canada: bool = False

crosswalk_usgs_gage_field: str = "usgs_gage_id"
crosswalk_usace_gage_field: str = "usace_gage_id"
Expand All @@ -152,11 +154,13 @@ class ReservoirDA(BaseModel, extra='forbid'):
reservoir_parameter_file: Optional[FilePath] = None


class DataAssimilationParameters(BaseModel, extra='forbid'):
class DataAssimilationParameters(BaseModel, extra='ignore'):
# NOTE: required for streamflow nudging and/or USGS reservoir DA
usgs_timeslices_folder: Optional[DirectoryPath] = None
# NOTE: required for USACE reservoir DA
usace_timeslices_folder: Optional[DirectoryPath] = None
# NOTE: required for canada reservoir DA
canada_timeslices_folder: Optional[DirectoryPath] = None
# NOTE: required for reservoir DA - suggested value 24 (1 days)
timeslice_lookback_hours: int = 24

Expand Down
61 changes: 59 additions & 2 deletions src/troute-network/troute/DataAssimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ def __init__(self, network, from_files, value_dict, da_run=[]):
self._last_obs_df = _reindex_link_to_lake_id(self._last_obs_df, network.link_lake_crosswalk)

self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)

if 'canada_timeslice_files' in da_run:
self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)
def update_after_compute(self, run_results, time_increment):
'''
Function to update data assimilation object after running routing module.
Expand Down Expand Up @@ -262,7 +263,8 @@ def update_for_next_loop(self, network, da_run,):

if streamflow_da_parameters.get('streamflow_nudging', False):
self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)

if 'canada_timeslice_files' in da_run:
self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run)

class PersistenceDA(AbstractDA):
"""
Expand Down Expand Up @@ -991,6 +993,61 @@ def _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_

return usgs_df

def _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run):
'''
Function for reading USGS timeslice files and creating a dataframe
of USGS gage observations. This dataframe is used for streamflow
nudging and can be used for constructing USGS reservoir dataframes.

Arguments:
----------
- data_assimilation_parameters (dict): user input data re data assimilation
- streamflow_da_parameters (dict): user input data re streamflow nudging
- run_parameters (dict): user input data re subset of compute configuration
- network (Object): network object created from abstract class
- da_run (list): list of data assimilation files separated by for loop chunks

Returns:
--------
- usgs_df (DataFrame): dataframe of USGS gage observations
'''
canada_timeslices_folder = data_assimilation_parameters.get("canada_timeslices_folder", None)
#lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None)
lastobs_start = data_assimilation_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time",0)
lastobs_type = data_assimilation_parameters.get("wrf_lastobs_type", "error-based")
crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None)
crosswalk_gage_field = streamflow_da_parameters.get('crosswalk_gage_field','gages')
crosswalk_segID_field = streamflow_da_parameters.get('crosswalk_segID_field','link')
da_decay_coefficient = data_assimilation_parameters.get("da_decay_coefficient",120)
qc_threshold = data_assimilation_parameters.get("qc_threshold",1)
interpolation_limit = data_assimilation_parameters.get("interpolation_limit_min",59)

# TODO: join timeslice folder and files into complete path upstream

canada_files = [canada_timeslices_folder.joinpath(f) for f in da_run['canada_timeslice_files']]


if canada_files:
canada_df = (
nhd_io.get_obs_from_timeslices(
network.link_gage_df,
crosswalk_gage_field,
crosswalk_segID_field,
canada_files,
qc_threshold,
interpolation_limit,
run_parameters.get("dt"),
network.t0,
run_parameters.get("cpu_pool", None)
).
loc[network.link_gage_df.index]
)

else:
canada_df = pd.DataFrame()

return canada_df

def _create_reservoir_df(data_assimilation_parameters, reservoir_da_parameters, streamflow_da_parameters, run_parameters, network, da_run, lake_gage_crosswalk, res_source):
'''
Function for reading USGS/USACE timeslice files and creating a dataframe
Expand Down
29 changes: 24 additions & 5 deletions src/troute-network/troute/hyfeature_network_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,31 @@ def build_da_sets(da_params, run_sets, t0):
"usace_timeslices_folder",
None
)

canada_timeslices_folder = da_params.get(
"canada_timeslices_folder",
None
)
# User-specified DA ON/OFF preferences
usace_da = False
usgs_da = False
canada_da = False
reservoir_persistence_da = da_params.get('reservoir_da', {}).get('reservoir_persistence_da', False)
if reservoir_persistence_da:
usgs_da = reservoir_persistence_da.get('reservoir_persistence_usgs', False)
usace_da = reservoir_persistence_da.get('reservoir_persistence_usace', False)

canada_da = reservoir_persistence_da.get('reservoir_persistence_canada', False)

nudging = False
streamflow_da = da_params.get('streamflow_da', False)
if streamflow_da:
nudging = streamflow_da.get('streamflow_nudging', False)

if not usgs_da and not usace_da and not nudging:
if not usgs_da and not usace_da and not canada_da and not nudging:
# if all DA capabilities are OFF, return empty dictionary
da_sets = [{} for _ in run_sets]

# if no user-input timeslice folders, a list of empty dictionaries
elif not usgs_timeslices_folder and not usace_timeslices_folder:
elif not usgs_timeslices_folder and not usace_timeslices_folder and not canada_timeslices_folder:
# if no timeslice folders, return empty dictionary
da_sets = [{} for _ in run_sets]

Expand All @@ -79,7 +84,8 @@ def build_da_sets(da_params, run_sets, t0):
usgs_timeslices_folder = pathlib.Path(usgs_timeslices_folder)
if usace_timeslices_folder:
usace_timeslices_folder = pathlib.Path(usace_timeslices_folder)

if canada_timeslices_folder:
canada_timeslices_folder = pathlib.Path(canada_timeslices_folder)
# the number of timeslice files appended to the front- and back-ends
# of the TimeSlice file interpolation stack
pad_hours = da_params.get("timeslice_lookback_hours",0)
Expand Down Expand Up @@ -132,6 +138,19 @@ def build_da_sets(da_params, run_sets, t0):
# Add available TimeSlices to da_sets list
da_sets[i]['usace_timeslice_files'] = filenames_usace

# identify available USGS TimeSlices in run set i
if canada_timeslices_folder and canada_da:
filenames_canada = (timestamps.strftime('%Y-%m-%d_%H:%M:%S')
+ '.15min.wscTimeSlice.ncdf').to_list()

# identify available USGS TimeSlices
filenames_canada = _check_timeslice_exists(
filenames_canada,
canada_timeslices_folder
)
# Add available TimeSlices to da_sets list
da_sets[i]['canada_timeslice_files'] = filenames_canada

# reset initialization time for loop set i+1
t0 = run_sets[i]['final_timestamp']

Expand Down
2 changes: 2 additions & 0 deletions test/LowerColorado_TX/test_AnA_V4_NHD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ compute_parameters:
#----------
usgs_timeslices_folder : usgs_TimeSlice/
usace_timeslices_folder : usace_TimeSlice/
canada_timeslices_folder : Canadian_timeslices/
timeslice_lookback_hours : 48
qc_threshold : 1
streamflow_da:
Expand All @@ -97,6 +98,7 @@ compute_parameters:
#----------
reservoir_persistence_usgs : False
reservoir_persistence_usace : False
reservoir_persistence_canada : True
reservoir_rfc_da:
#----------
reservoir_rfc_forecasts : False
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ compute_parameters:
#----------
usgs_timeslices_folder : usgs_timeslices/
usace_timeslices_folder : usace_timeslices/
canada_timeslices_folder : Canadian_timeslices/
streamflow_da:
#----------
streamflow_nudging : True
Expand All @@ -110,6 +111,7 @@ compute_parameters:
#----------
reservoir_persistence_usgs : True
reservoir_persistence_usace : True
reservoir_persistence_canada : True
reservoir_rfc_da:
#----------
reservoir_rfc_forecasts : True
Expand Down
Loading