diff --git a/src/troute-config/troute/config/output_parameters.py b/src/troute-config/troute/config/output_parameters.py
index 61119c109..b367582d2 100644
--- a/src/troute-config/troute/config/output_parameters.py
+++ b/src/troute-config/troute/config/output_parameters.py
@@ -7,11 +7,14 @@
 streamOutput_allowedTypes = Literal['.csv', '.nc', '.pkl']
 
+
 class OutputParameters(BaseModel):
     chanobs_output: Optional["ChanobsOutput"] = None
     # NOTE: this appears to be optional. See nwm_routing/input.py ~:477
     csv_output: Optional["CsvOutput"] = None
     # NOTE: this appears to be optional. See nwm_routing/input.py ~:496
+    parquet_output: Optional["ParquetOutput"] = None
+    # NOTE: this appears to be optional. See nwm_routing/input.py ~:563
     chrtout_output: Optional["ChrtoutOutput"] = None
     lite_restart: Optional["LiteRestart"] = None
     # NOTE: this appears to be optional. See nwm_routing/input.py ~:520
@@ -46,6 +49,14 @@ class CsvOutput(BaseModel):
     csv_output_segments: Optional[List[str]] = None
 
 
+class ParquetOutput(BaseModel):
+    # NOTE: required if writing results to parquet
+    parquet_output_folder: Optional[DirectoryPath] = None
+    parquet_output_segments: Optional[List[str]] = None
+    configuration: Optional[str] = None
+    prefix_ids: Optional[str] = None
+
+
 class ChrtoutOutput(BaseModel):
     # NOTE: mandatory if writing results to CHRTOUT.
     wrf_hydro_channel_output_source_folder: Optional[DirectoryPath] = None
@@ -79,6 +90,7 @@ class WrfHydroParityCheck(BaseModel):
 class ParityCheckCompareFileSet(BaseModel):
     validation_files: List[FilePath]
 
+
 class StreamOutput(BaseModel):
     # NOTE: required if writing StreamOutput files
     stream_output_directory: Optional[DirectoryPath] = None
diff --git a/src/troute-nwm/src/nwm_routing/input.py b/src/troute-nwm/src/nwm_routing/input.py
index 8a5f55f24..3e1ba4170 100644
--- a/src/troute-nwm/src/nwm_routing/input.py
+++ b/src/troute-nwm/src/nwm_routing/input.py
@@ -559,6 +559,25 @@ def check_inputs(
 
         else:
             LOG.debug('No csv output folder specified. Results will NOT be written to csv')
 
+    if output_parameters.get('parquet_output', None):
+
+        if output_parameters['parquet_output'].get('parquet_output_folder', None):
+
+            _does_path_exist(
+                'parquet_output_folder',
+                output_parameters['parquet_output']['parquet_output_folder']
+            )
+
+            output_segs = output_parameters['parquet_output'].get('parquet_output_segments', None)
+            if not output_segs:
+                LOG.debug('No parquet output segments specified. Results for all domain segments will be written')
+
+        else:
+            LOG.debug('No parquet output folder specified. Results will NOT be written in parquet format')
+
+    else:
+        LOG.debug('No parquet output folder specified. Results will NOT be written in parquet format')
+
     if output_parameters.get('chrtout_output', None):
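For reference, the parquet_output block that check_inputs validates arrives as a plain dict once the YAML configuration is parsed. A minimal sketch of that shape follows; the folder path and segment ids are illustrative only, not part of this change:

# Illustrative only: the dict shape check_inputs sees when a parquet_output
# block is present in the parsed configuration.
output_parameters = {
    "parquet_output": {
        "parquet_output_folder": "output/",        # must already exist; verified by _does_path_exist
        "parquet_output_segments": [1234, 5678],   # optional; omit to write every domain segment
        "configuration": "short_range",
        "prefix_ids": "nex",
    }
}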
diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py
index 2404e7683..08bef8c78 100644
--- a/src/troute-nwm/src/nwm_routing/output.py
+++ b/src/troute-nwm/src/nwm_routing/output.py
@@ -43,30 +43,72 @@ def _reindex_lake_to_link_id(target_df, crosswalk):
 
     # (re) set the target_df index
     target_df.set_index(idxs, inplace = True)
-    
+
     return target_df
 
+
+def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids):
+    '''
+    Utility function to convert a flowveldepth dataframe
+    into a long-format timeseries matching the parquet
+    input format of TEEHR
+
+    Arguments:
+    ----------
+    - df (DataFrame): Data frame to be converted
+    - start_datetime: Start date time from restart parameters
+    - dt: Time step (seconds)
+    - configuration: configuration (e.g. short_range, medium_range)
+    - prefix_ids: prefix prepended to each location id (e.g. 'nex')
+
+    Returns:
+    --------
+    - timeseries_df (DataFrame): Converted timeseries data frame
+    '''
+
+    df.index.name = 'location_id'
+    df.reset_index(inplace=True)
+    timeseries_df = df.melt(id_vars=['location_id'], var_name='var')
+    timeseries_df['var'] = timeseries_df['var'].astype('string')
+    timeseries_df[['timestep', 'variable']] = timeseries_df['var'].str.strip("()").str.split(",", n=1, expand=True)
+    timeseries_df['variable'] = timeseries_df['variable'].str.strip().str.replace("'", "")
+    timeseries_df['timestep'] = timeseries_df['timestep'].astype('int')
+    timeseries_df['value_time'] = (start_datetime + pd.to_timedelta(timeseries_df['timestep'] * dt, unit='s'))
+    variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"}
+    timeseries_df["variable_name"] = timeseries_df["variable"].map(variable_to_name_map)
+    timeseries_df.drop(['var', 'timestep', 'variable'], axis=1, inplace=True)
+    timeseries_df['configuration'] = configuration
+    variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"}
+    timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map)
+    timeseries_df['reference_time'] = start_datetime.date()
+    timeseries_df['location_id'] = timeseries_df['location_id'].astype('string')
+    timeseries_df['location_id'] = prefix_ids + '-' + timeseries_df['location_id']
+    timeseries_df['value'] = timeseries_df['value'].astype('double')
+    timeseries_df['reference_time'] = timeseries_df['reference_time'].astype('datetime64[us]')
+    timeseries_df['value_time'] = timeseries_df['value_time'].astype('datetime64[us]')
+
+    return timeseries_df
+
+
 def nwm_output_generator(
-    run,
-    results,
-    supernetwork_parameters,
-    output_parameters,
-    parity_parameters,
-    restart_parameters,
-    parity_set,
-    qts_subdivisions,
-    return_courant,
-    cpu_pool,
-    waterbodies_df,
-    waterbody_types_df,
-    duplicate_ids_df,
-    data_assimilation_parameters=False,
-    lastobs_df = None,
-    link_gage_df = None,
-    link_lake_crosswalk = None,
-    logFileName='NONE'
+        run,
+        results,
+        supernetwork_parameters,
+        output_parameters,
+        parity_parameters,
+        restart_parameters,
+        parity_set,
+        qts_subdivisions,
+        return_courant,
+        cpu_pool,
+        waterbodies_df,
+        waterbody_types_df,
+        duplicate_ids_df,
+        data_assimilation_parameters=False,
+        lastobs_df=None,
+        link_gage_df=None,
+        link_lake_crosswalk=None,
+        logFileName='NONE'
 ):
-
     dt = run.get("dt")
     nts = run.get("nts")
     t0 = run.get("t0")
@@ -111,6 +153,8 @@ def nwm_output_generator(
 
     csv_output = output_parameters.get("csv_output", None)
     csv_output_folder = None
+    parquet_output = output_parameters.get("parquet_output", None)
+    parquet_output_folder = None
     rsrto = output_parameters.get("hydro_rst_output", None)
     chrto = output_parameters.get("chrtout_output", None)
     test = output_parameters.get("test_output", None)
@@ -125,8 +169,14 @@
         )
         csv_output_segments = csv_output.get("csv_output_segments", None)
 
-    if csv_output_folder or rsrto or chrto or chano or test or wbdyo or stream_output:
-
+    if parquet_output:
+        parquet_output_folder = output_parameters["parquet_output"].get(
+            "parquet_output_folder", None
+        )
+        parquet_output_segments = parquet_output.get("parquet_output_segments", None)
+
+    if csv_output_folder or parquet_output_folder or rsrto or chrto or chano or test or wbdyo or stream_output:
+
         start = time.time()
         qvd_columns = pd.MultiIndex.from_product(
             [range(nts), ["q", "v", "d"]]
         )
@@ -195,7 +245,6 @@ def nwm_output_generator(
 
         # replace waterbody lake_ids with outlet link ids
         if link_lake_crosswalk:
-
             # (re) set the flowveldepth index
             courant.set_index(fvdidxs, inplace = True)
@@ -249,7 +298,7 @@
             output_waterbody_types_df = waterbody_types_df
         LOG.info("- writing t-route flow results to LAKEOUT files")
         start = time.time()
-        for i in range(i_df.shape[1]): 
+        for i in range(i_df.shape[1]):
             nhd_io.write_waterbody_netcdf(
                 wbdyo,
                 i_df.iloc[:,[i]],
@@ -270,7 +319,7 @@
                 preRunLog.write("Output of waterbody files into folder: "+str(wbdyo)+"\n")
                 preRunLog.write("-----\n")
                 preRunLog.close()
-        
+
         LOG.debug("writing LAKEOUT files took a total time of %s seconds." % (time.time() - start))
 
     if rsrto:
@@ -316,23 +365,23 @@
 
             else:
                 LOG.critical('Did not find any restart files in wrf_hydro_channel_restart_source_directory. Aborting restart write sequence.')
-        
+
         else:
             LOG.critical('wrf_hydro_channel_restart_source_directory not specified in configuration file. Aborting restart write sequence.')
-    
+
         LOG.debug("writing restart files took %s seconds." % (time.time() - start))
 
     if chrto:
-        
+
         LOG.info("- writing t-route flow results to CHRTOUT files")
         start = time.time()
         chrtout_read_folder = chrto.get(
             "wrf_hydro_channel_output_source_folder", None
         )
-        
-        if chrtout_read_folder:
 
+        if chrtout_read_folder:
+
             chrtout_files = sorted(
                 Path(chrtout_read_folder) / f for f in run["qlat_files"]
             )
@@ -389,7 +438,51 @@
 
         LOG.debug("writing CSV file took %s seconds." % (time.time() - start))
         # usgs_df_filtered = usgs_df[usgs_df.index.isin(csv_output_segments)]
         # usgs_df_filtered.to_csv(output_path.joinpath("usgs_df.csv"))
-        
+
+    if parquet_output_folder:
+
+        LOG.info("- writing flow, velocity, and depth results to .parquet")
+        start = time.time()
+
+        # create filenames
+        # TO DO: create more descriptive filenames
+        if supernetwork_parameters.get("title_string", None):
+            filename_fvd = (
+                "flowveldepth_" + supernetwork_parameters["title_string"] + ".parquet"
+            )
+            filename_courant = (
+                "courant_" + supernetwork_parameters["title_string"] + ".parquet"
+            )
+        else:
+            run_time_stamp = datetime.now().isoformat()
+            filename_fvd = "flowveldepth_" + run_time_stamp + ".parquet"
+            filename_courant = "courant_" + run_time_stamp + ".parquet"
+
+        output_path = Path(parquet_output_folder).resolve()
+
+        # if no parquet_output_segments are specified, write results for all segments
+        if not parquet_output_segments:
+            parquet_output_segments = flowveldepth.index
+
+        flowveldepth = flowveldepth.sort_index()
+        configuration = output_parameters["parquet_output"].get("configuration")
+        prefix_ids = output_parameters["parquet_output"].get("prefix_ids")
+        timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt,
+                                                         configuration, prefix_ids)
+
+        parquet_output_segments_str = [prefix_ids + '-' + str(segment) for segment in parquet_output_segments]
+        timeseries_df.loc[timeseries_df['location_id'].isin(parquet_output_segments_str)].to_parquet(
+            output_path.joinpath(filename_fvd), allow_truncated_timestamps=True)
+
+        if return_courant:
+            courant = courant.sort_index()
+            timeseries_courant = _parquet_output_format_converter(courant, restart_parameters.get("start_datetime"), dt,
+                                                                  configuration, prefix_ids)
+            timeseries_courant.loc[timeseries_courant['location_id'].isin(parquet_output_segments_str)].to_parquet(
+                output_path.joinpath(filename_courant), allow_truncated_timestamps=True)
+
+        LOG.debug("writing parquet file took %s seconds." % (time.time() - start))
+
     if chano:
 
         LOG.info("- writing t-route flow results at gage locations to CHANOBS file")
@@ -404,7 +497,7 @@
         chano['chanobs_filepath'] = str(chano['chanobs_filepath'])
 
         nhd_io.write_chanobs(
-            Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']), 
+            Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']),
             flowveldepth,
             link_gage_df,
             t0,
@@ -424,7 +517,7 @@
 
         LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start))
 
-    if lastobso: 
+    if lastobso:
         # Write out LastObs as netcdf when using main_v04 or troute_model with HYfeature.
         # This is only needed if 1) streamflow nudging is ON and 2) a lastobs output
         # folder is provided by the user.
@@ -461,24 +554,23 @@
 
         LOG.debug("writing lastobs files took %s seconds." % (time.time() - start))
 
-    # if 'flowveldepth' in locals():
-    #     LOG.debug(flowveldepth)
+    # LOG.debug(flowveldepth)
 
     LOG.debug("output complete in %s seconds." % (time.time() - start_time))
 
     ################### Parity Check
 
     if parity_set:
-        
+
         LOG.info(
             "conducting parity check, comparing WRF Hydro results against t-route results"
         )
 
        start_time = time.time()
-        
+
         parity_check(
             parity_set,
             results,
         )
 
-    LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
\ No newline at end of file
+    LOG.debug("parity check complete in %s seconds." % (time.time() - start_time))
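As a rough illustration of the long format produced by _parquet_output_format_converter, here is a toy sketch. It assumes the nwm_routing package is importable and that flowveldepth carries flat (timestep, variable) tuple column labels, as the converter's string parsing expects; the segment id, start time, and values are made up:

import pandas as pd
from datetime import datetime

from nwm_routing.output import _parquet_output_format_converter

# Toy stand-in for flowveldepth: one segment, two timesteps, with flat
# (timestep, variable) column labels.
wide = pd.DataFrame(
    [[1.0, 0.5, 0.1, 1.2, 0.6, 0.11]],
    index=[12345],
    columns=pd.MultiIndex.from_product([range(2), ["q", "v", "d"]]).to_flat_index(),
)

tall = _parquet_output_format_converter(
    wide,
    start_datetime=datetime(2023, 4, 1),  # restart_parameters["start_datetime"]
    dt=300,                               # routing time step in seconds
    configuration="short_range",
    prefix_ids="nex",
)

# One row per (segment, timestep, variable): location_id "nex-12345",
# value_time, value, variable_name ("streamflow"/"velocity"/"depth"),
# units, configuration, reference_time.
print(tall[["location_id", "value_time", "variable_name", "value", "units"]])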
diff --git a/test/LowerColorado_TX/test_AnA_V4_NHD.yaml b/test/LowerColorado_TX/test_AnA_V4_NHD.yaml
index 0d09cbc82..b8a906517 100644
--- a/test/LowerColorado_TX/test_AnA_V4_NHD.yaml
+++ b/test/LowerColorado_TX/test_AnA_V4_NHD.yaml
@@ -105,7 +105,7 @@ compute_parameters:
             reservoir_rfc_forecasts_lookback_hours : 48
 
 #--------------------------------------------------------------------------------
-# output_parameters:
+output_parameters:
 #     #----------
 #     test_output: output/lcr_flowveldepth.pkl
 #     lite_restart:
@@ -124,5 +124,10 @@ compute_parameters:
 #         stream_output_time: 1 #[hr]
 #         stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
 #         stream_output_internal_frequency: 30 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60
+    parquet_output:
+        #---------
+        parquet_output_folder: output/
+        configuration: short_range
+        prefix_ids: nex
\ No newline at end of file
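Once the LowerColorado test configuration above is run, the parquet written to output/ can be sanity-checked directly with pandas. The filename below is a placeholder: the code names the file flowveldepth_<title_string>.parquet when supernetwork_parameters has a title_string, otherwise flowveldepth_<run timestamp>.parquet:

import pandas as pd

# Placeholder filename; substitute the file actually produced in output/.
df = pd.read_parquet("output/flowveldepth_<title_string_or_timestamp>.parquet")

# Expected TEEHR-style columns: location_id (e.g. "nex-12345"), value,
# value_time, variable_name, configuration, units, reference_time.
print(df["variable_name"].unique())   # e.g. ['streamflow' 'velocity' 'depth']
print(df.head())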