Added functionality to output to parquet #761

Merged (12 commits) on Jul 26, 2024
12 changes: 12 additions & 0 deletions src/troute-config/troute/config/output_parameters.py
@@ -7,11 +7,14 @@

streamOutput_allowedTypes = Literal['.csv', '.nc', '.pkl']


class OutputParameters(BaseModel):
chanobs_output: Optional["ChanobsOutput"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:477
csv_output: Optional["CsvOutput"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:496
parquet_output: Optional["ParquetOutput"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:563
chrtout_output: Optional["ChrtoutOutput"] = None
lite_restart: Optional["LiteRestart"] = None
# NOTE: this appears to be optional. See nwm_routing/input.py ~:520
@@ -46,6 +49,14 @@ class CsvOutput(BaseModel):
csv_output_segments: Optional[List[str]] = None


class ParquetOutput(BaseModel):
# NOTE: required if writing results to parquet
parquet_output_folder: Optional[DirectoryPath] = None
parquet_output_segments: Optional[List[str]] = None
configuration: Optional[str] = None
prefix_ids: Optional[str] = None
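
For orientation, a minimal, hypothetical sketch (not part of this PR) of populating the new model directly; in practice the values come from YAML, as in the test config at the bottom of this diff. The import path is inferred from this file's location, and pydantic's DirectoryPath requires the folder to exist.

```python
# Hypothetical sketch: constructing the new ParquetOutput model in code.
# Assumes the module path troute.config.output_parameters (inferred from
# this file's location) and that "output/" already exists on disk, since
# pydantic's DirectoryPath validates existence.
from troute.config.output_parameters import ParquetOutput

pq = ParquetOutput(
    parquet_output_folder="output/",   # required if writing parquet results
    parquet_output_segments=None,      # None -> write all domain segments
    configuration="short_range",       # e.g. short_range, medium_range
    prefix_ids="nex",                  # location ids become "nex-<segment>"
)
```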


class ChrtoutOutput(BaseModel):
# NOTE: mandatory if writing results to CHRTOUT.
wrf_hydro_channel_output_source_folder: Optional[DirectoryPath] = None
@@ -79,6 +90,7 @@ class WrfHydroParityCheck(BaseModel):
class ParityCheckCompareFileSet(BaseModel):
validation_files: List[FilePath]


class StreamOutput(BaseModel):
# NOTE: required if writing StreamOutput files
stream_output_directory: Optional[DirectoryPath] = None
19 changes: 19 additions & 0 deletions src/troute-nwm/src/nwm_routing/input.py
@@ -559,6 +559,25 @@ def check_inputs(

else:
LOG.debug('No csv output folder specified. Results will NOT be written to csv')

if output_parameters.get('parquet_output', None):

if output_parameters['parquet_output'].get('parquet_output_folder', None):

_does_path_exist(
'parquet_output_folder',
output_parameters['parquet_output']['parquet_output_folder']
)

output_segs = output_parameters['parquet_output'].get('parquet_output_segments', None)
if not output_segs:
LOG.debug('No parquet output segments specified. Results for all domain segments will be written')

else:
LOG.debug('No parquet output folder specified. Results will NOT be written in parquet format')

else:
LOG.debug('No parquet output folder specified. Results will NOT be written in parquet format')

if output_parameters.get('chrtout_output', None):

168 changes: 130 additions & 38 deletions src/troute-nwm/src/nwm_routing/output.py
@@ -43,30 +43,72 @@ def _reindex_lake_to_link_id(target_df, crosswalk):

# (re) set the target_df index
target_df.set_index(idxs, inplace = True)

return target_df


def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids):
'''
Utility function to convert a flowveldepth dataframe
into a timeseries matching the TEEHR parquet input format

Arguments:
----------
- df (DataFrame): Data frame to be converted
- start_datetime: Date time from restart parameters
- dt: Time step (seconds)
- configuration: configuration (for instance, short_range, medium_range)
- prefix_ids: prefix prepended to each location id (for instance, 'nex')

Returns:
--------
- timeseries_df (DataFrame): Converted timeseries data frame
'''

df.index.name = 'location_id'
df.reset_index(inplace=True)
timeseries_df = df.melt(id_vars=['location_id'], var_name='var')
Contributor:

I think if you use value_vars=['q', 'v', 'd'] as a kwarg to melt, you might have an easier time extracting from the un-pivoted table?

The method below seems like there should be a better way than casting to string, manipulating the string, and recasting to numeric/datetime types.

I'm not sure exactly what the df looks like at this point, but I would consider other methods for extracting the needed data from it.

If for some reason this is the only way, then please comment the implementation to describe the state of the df and why it needs to be manipulated this way.

@karnesh (Contributor Author), May 9, 2024:

I looked into the options and this seems to be the only way. I cannot use value_vars=['q', 'v', 'd'] as a kwarg to melt because of the format of the df column names. Also, the column names need to be manipulated as strings. Please see the screenshot below.

[screenshot: flowveldepth df column names]

Each column name consists of a time step and a variable name (q, v, or d) in string format. The time step values are used to derive the value_time.
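
To make the exchange concrete, a minimal, hypothetical reproduction (not from the PR) of the column format being discussed:

```python
# Column names are strings like "(0, 'q')" that pack a timestep and a
# variable code together, so melt(value_vars=['q', 'v', 'd']) has nothing
# to target; the names must be parsed after un-pivoting.
import pandas as pd

df = pd.DataFrame(
    [[1.0, 0.5, 0.1]],
    index=pd.Index([101], name="location_id"),
    columns=["(0, 'q')", "(0, 'v')", "(0, 'd')"],
)

long = df.reset_index().melt(id_vars=["location_id"], var_name="var")
parts = long["var"].astype("string").str.strip("()").str.split(",", n=1, expand=True)
print(parts[0].astype("int").tolist())                     # timesteps: [0, 0, 0]
print(parts[1].str.strip().str.replace("'", "").tolist())  # ['q', 'v', 'd']
```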

timeseries_df['var'] = timeseries_df['var'].astype('string')
timeseries_df[['timestep', 'variable']] = timeseries_df['var'].str.strip("()").str.split(",", n=1, expand=True)
timeseries_df['variable'] = timeseries_df['variable'].str.strip().str.replace("'", "")
timeseries_df['timestep'] = timeseries_df['timestep'].astype('int')
timeseries_df['value_time'] = (start_datetime + pd.to_timedelta(timeseries_df['timestep'] * dt, unit='s'))
variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"}
timeseries_df["variable_name"] = timeseries_df["variable"].map(variable_to_name_map)
timeseries_df.drop(['var', 'timestep', 'variable'], axis=1, inplace=True)
timeseries_df['configuration'] = configuration
variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"}
timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map)
timeseries_df['reference_time'] = start_datetime.date()
timeseries_df['location_id'] = timeseries_df['location_id'].astype('string')
timeseries_df['location_id'] = prefix_ids + '-' + timeseries_df['location_id']
timeseries_df['value'] = timeseries_df['value'].astype('double')
timeseries_df['reference_time'] = timeseries_df['reference_time'].astype('datetime64[us]')
timeseries_df['value_time'] = timeseries_df['value_time'].astype('datetime64[us]')

return timeseries_df
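
A hedged usage sketch of the converter above (illustrative values only; the real dt and start_datetime come from run["dt"] and restart_parameters):

```python
# Hypothetical usage, not part of the PR. The input mirrors the
# string-formatted "(timestep, 'variable')" column names described
# in the review thread above.
from datetime import datetime
import pandas as pd

fvd = pd.DataFrame(
    [[1.0, 0.5, 0.1]],
    index=pd.Index([101]),
    columns=["(0, 'q')", "(0, 'v')", "(0, 'd')"],
)
ts = _parquet_output_format_converter(
    fvd,
    start_datetime=datetime(2024, 7, 26),  # illustrative
    dt=300,                                # 5-minute timestep, in seconds
    configuration="short_range",
    prefix_ids="nex",
)
# Resulting columns: location_id ("nex-101"), value, value_time,
# variable_name (streamflow/velocity/depth), configuration, units,
# reference_time
```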


def nwm_output_generator(
run,
results,
supernetwork_parameters,
output_parameters,
parity_parameters,
restart_parameters,
parity_set,
qts_subdivisions,
return_courant,
cpu_pool,
waterbodies_df,
waterbody_types_df,
duplicate_ids_df,
data_assimilation_parameters=False,
lastobs_df=None,
link_gage_df=None,
link_lake_crosswalk=None,
logFileName='NONE'
):

dt = run.get("dt")
nts = run.get("nts")
t0 = run.get("t0")
@@ -111,6 +153,8 @@ def nwm_output_generator(

csv_output = output_parameters.get("csv_output", None)
csv_output_folder = None
parquet_output = output_parameters.get("parquet_output", None)
parquet_output_folder = None
rsrto = output_parameters.get("hydro_rst_output", None)
chrto = output_parameters.get("chrtout_output", None)
test = output_parameters.get("test_output", None)
@@ -125,8 +169,14 @@
)
csv_output_segments = csv_output.get("csv_output_segments", None)

if parquet_output:
parquet_output_folder = output_parameters["parquet_output"].get(
"parquet_output_folder", None
)
parquet_output_segments = parquet_output.get("parquet_output_segments", None)

if csv_output_folder or parquet_output_folder or rsrto or chrto or chano or test or wbdyo or stream_output:

start = time.time()
qvd_columns = pd.MultiIndex.from_product(
[range(nts), ["q", "v", "d"]]
@@ -195,7 +245,6 @@

# replace waterbody lake_ids with outlet link ids
if link_lake_crosswalk:

# (re) set the flowveldepth index
courant.set_index(fvdidxs, inplace = True)

@@ -249,7 +298,7 @@
output_waterbody_types_df = waterbody_types_df
LOG.info("- writing t-route flow results to LAKEOUT files")
start = time.time()
for i in range(i_df.shape[1]):
nhd_io.write_waterbody_netcdf(
wbdyo,
i_df.iloc[:,[i]],
@@ -270,7 +319,7 @@
preRunLog.write("Output of waterbody files into folder: "+str(wbdyo)+"\n")
preRunLog.write("-----\n")
preRunLog.close()

LOG.debug("writing LAKEOUT files took a total time of %s seconds." % (time.time() - start))

if rsrto:
@@ -316,23 +365,23 @@

else:
LOG.critical('Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.')

else:
LOG.critical('wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.')

LOG.debug("writing restart files took %s seconds." % (time.time() - start))

if chrto:

LOG.info("- writing t-route flow results to CHRTOUT files")
start = time.time()

chrtout_read_folder = chrto.get(
"wrf_hydro_channel_output_source_folder", None
)

if chrtout_read_folder:

chrtout_files = sorted(
Path(chrtout_read_folder) / f for f in run["qlat_files"]
)
@@ -389,7 +438,51 @@
LOG.debug("writing CSV file took %s seconds." % (time.time() - start))
# usgs_df_filtered = usgs_df[usgs_df.index.isin(csv_output_segments)]
# usgs_df_filtered.to_csv(output_path.joinpath("usgs_df.csv"))


if parquet_output_folder:

LOG.info("- writing flow, velocity, and depth results to .parquet")
start = time.time()

# create filenames
# TO DO: create more descriptive filenames
if supernetwork_parameters.get("title_string", None):
filename_fvd = (
"flowveldepth_" + supernetwork_parameters["title_string"] + ".parquet"
)
filename_courant = (
"courant_" + supernetwork_parameters["title_string"] + ".parquet"
)
else:
run_time_stamp = datetime.now().isoformat()
filename_fvd = "flowveldepth_" + run_time_stamp + ".parquet"
filename_courant = "courant_" + run_time_stamp + ".parquet"

output_path = Path(parquet_output_folder).resolve()

# if no parquet_output_segments are specified, write results for all segments
if not parquet_output_segments:
parquet_output_segments = flowveldepth.index

flowveldepth = flowveldepth.sort_index()
configuration = output_parameters["parquet_output"].get("configuration")
prefix_ids = output_parameters["parquet_output"].get("prefix_ids")
timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt,
configuration, prefix_ids)

parquet_output_segments_str = [prefix_ids + '-' + str(segment) for segment in parquet_output_segments]
timeseries_df.loc[timeseries_df['location_id'].isin(parquet_output_segments_str)].to_parquet(
output_path.joinpath(filename_fvd), allow_truncated_timestamps=True)

if return_courant:
courant = courant.sort_index()
timeseries_courant = _parquet_output_format_converter(courant, restart_parameters.get("start_datetime"), dt,
configuration, prefix_ids)
timeseries_courant.loc[timeseries_courant['location_id'].isin(parquet_output_segments_str)].to_parquet(
output_path.joinpath(filename_courant), allow_truncated_timestamps=True)

LOG.debug("writing parquet file took %s seconds." % (time.time() - start))

if chano:

LOG.info("- writing t-route flow results at gage locations to CHANOBS file")
Expand All @@ -404,7 +497,7 @@ def nwm_output_generator(
chano['chanobs_filepath'] = str(chano['chanobs_filepath'])

nhd_io.write_chanobs(
Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']),
flowveldepth,
link_gage_df,
t0,
@@ -424,7 +517,7 @@

LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start))

if lastobso:
# Write out LastObs as netcdf when using main_v04 or troute_model with HYfeature.
# This is only needed if 1) streamflow nudging is ON and 2) a lastobs output
# folder is provided by the user.
@@ -461,24 +554,23 @@

LOG.debug("writing lastobs files took %s seconds." % (time.time() - start))


# if 'flowveldepth' in locals():
#     LOG.debug(flowveldepth)

LOG.debug("output complete in %s seconds." % (time.time() - start_time))

################### Parity Check

if parity_set:

LOG.info(
"conducting parity check, comparing WRF Hydro results against t-route results"
)

start_time = time.time()

parity_check(
parity_set, results,
)

LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
7 changes: 6 additions & 1 deletion test/LowerColorado_TX/test_AnA_V4_NHD.yaml
@@ -105,7 +105,7 @@ compute_parameters:
reservoir_rfc_forecasts_lookback_hours : 48

#--------------------------------------------------------------------------------
output_parameters:
# #----------
# test_output: output/lcr_flowveldepth.pkl
# lite_restart:
@@ -124,5 +124,10 @@
# stream_output_time: 1 #[hr]
# stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
# stream_output_internal_frequency: 30 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60
parquet_output:
#---------
parquet_output_folder: output/
configuration: short_range
prefix_ids: nex

