From 46b672b284ec04183b2a27fa6f08e7765a739298 Mon Sep 17 00:00:00 2001
From: Sean Horvath
Date: Thu, 29 Feb 2024 20:38:04 +0000
Subject: [PATCH 1/4] allow reading nex-* files without converting to parquet

---
 src/troute-network/troute/AbstractNetwork.py | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/src/troute-network/troute/AbstractNetwork.py b/src/troute-network/troute/AbstractNetwork.py
index ad33b964e..68d00a5c2 100644
--- a/src/troute-network/troute/AbstractNetwork.py
+++ b/src/troute-network/troute/AbstractNetwork.py
@@ -728,10 +728,10 @@ def build_forcing_sets(self,):
             raise AssertionError("Aborting simulation because the qlat_input_folder:", qlat_input_folder, "does not exist. Please check that the nexus_input_folder variable is correctly entered in the .yaml control file") from None

         forcing_glob_filter = forcing_parameters["qlat_file_pattern_filter"]
+        binary_folder = forcing_parameters.get('binary_nexus_file_folder', None)

-        if forcing_glob_filter=="nex-*":
+        if forcing_glob_filter=="nex-*" and binary_folder:
             print("Reformatting qlat nexus files as hourly binary files...")
-            binary_folder = forcing_parameters.get('binary_nexus_file_folder', None)
             qlat_files = qlat_input_folder.glob(forcing_glob_filter)

             #Check that directory/files specified will work
@@ -748,9 +748,24 @@ def build_forcing_sets(self,):
             qlat_input_folder, forcing_glob_filter = nex_files_to_binary(qlat_files_list, binary_folder)
             forcing_parameters["qlat_input_folder"] = qlat_input_folder
             forcing_parameters["qlat_file_pattern_filter"] = forcing_glob_filter
+
+        if forcing_glob_filter=="nex-*":
+            # No binary folder given: build a single run set directly from the nex-* csv files
+            all_files = sorted(qlat_input_folder.glob(forcing_glob_filter))
+            # The last timestamp in the first nexus file marks the end of the simulation period
+            final_timestamp = pd.read_csv(all_files[0], header=None, index_col=[0]).tail(1).iloc[0,0]
+            final_timestamp = datetime.strptime(final_timestamp.strip(), "%Y-%m-%d %H:%M:%S")
+
+            all_files = [os.path.basename(f) for f in all_files]
+
+            run_sets = [
+                {
+                    'qlat_files': all_files,
+                    'nts': nts,
+                    'final_timestamp': final_timestamp
+                }
+            ]

         # TODO: Throw errors if insufficient input data are available
-        if run_sets:
+        elif run_sets:
             #FIXME: Change it for hyfeature
             '''
             # append final_timestamp variable to each set_list
@@ -893,7 +908,7 @@ def get_timesteps_from_nex(nexus_files):
     output_file_timestamps = []
     with open(nexus_files[0]) as f:
         for line in f:
-            output_file_timestamps.append(line.split(', ')[1])
+            # Split on the bare comma and strip whitespace so both ", " and "," delimiters parse
+            output_file_timestamps.append(line.split(',')[1].strip())

     # Convert and reformat dates in the list
     output_file_timestamps = [pd.to_datetime(i).strftime("%Y%m%d%H%M") for i in output_file_timestamps]

From f6256f70b9f256d15d963f875f93c9dc75715b98 Mon Sep 17 00:00:00 2001
From: Sean Horvath
Date: Thu, 29 Feb 2024 20:38:41 +0000
Subject: [PATCH 2/4] enable creating qlat_df from nex-* files

---
 .../troute/HYFeaturesNetwork.py | 36 ++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)

diff --git a/src/troute-network/troute/HYFeaturesNetwork.py b/src/troute-network/troute/HYFeaturesNetwork.py
index 7acc3ccbb..c26a49d66 100644
--- a/src/troute-network/troute/HYFeaturesNetwork.py
+++ b/src/troute-network/troute/HYFeaturesNetwork.py
@@ -10,6 +10,7 @@
 from joblib import delayed, Parallel
 from collections import defaultdict
 import xarray as xr
+from datetime import datetime
 import os

 import troute.nhd_io as nhd_io #FIXME
@@ -618,16 +619,35 @@ def build_qlateral_array(self, run,):
             "qlat_file_pattern_filter", "*CHRT_OUT*"
         )
         qlat_files = sorted(qlat_input_folder.glob(qlat_file_pattern_filter))
-
+
         dfs=[]
-        for f in qlat_files:
-            df = 
read_file(f) - df['feature_id'] = df['feature_id'].map(lambda x: int(str(x).removeprefix('nex-')) if str(x).startswith('nex') else int(x)) - df = df.set_index('feature_id') - dfs.append(df) - # lateral flows [m^3/s] are stored at NEXUS points with NEXUS ids - nexuses_lateralflows_df = pd.concat(dfs, axis=1) + #FIXME Temporary solution to allow t-route to use ngen nex-* output files as forcing files + # This capability should be here, but we need to think through how to handle all of this + # data in memory for large domains and many timesteps... - shorvath, Feb 28, 2024 + qlat_file_pattern_filter = self.forcing_parameters.get("qlat_file_pattern_filter", None) + if qlat_file_pattern_filter=="nex-*": + for f in qlat_files: + df = pd.read_csv(f, names=['timestamp', 'qlat'], index_col=[0]) + df['timestamp'] = pd.to_datetime(df['timestamp']).dt.strftime('%Y%m%d%H%M') + df = df.set_index('timestamp') + df = df.T + df.index = [int(os.path.basename(f).split('-')[1].split('_')[0])] + df = df.rename_axis(None, axis=1) + df.index.name = 'feature_id' + dfs.append(df) + + # lateral flows [m^3/s] are stored at NEXUS points with NEXUS ids + nexuses_lateralflows_df = pd.concat(dfs, axis=0) + else: + for f in qlat_files: + df = read_file(f) + df['feature_id'] = df['feature_id'].map(lambda x: int(str(x).removeprefix('nex-')) if str(x).startswith('nex') else int(x)) + df = df.set_index('feature_id') + dfs.append(df) + + # lateral flows [m^3/s] are stored at NEXUS points with NEXUS ids + nexuses_lateralflows_df = pd.concat(dfs, axis=1) # Take flowpath ids entering NEXUS and replace NEXUS ids by the upstream flowpath ids qlats_df = nexuses_lateralflows_df.rename(index=self.downstream_flowpath_dict) From 1e4af2705a6c132395052949c468c93339495e9f Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Thu, 29 Feb 2024 20:39:05 +0000 Subject: [PATCH 3/4] update functionality to write flowveldepth to netcdf, improve speed --- src/troute-network/troute/nhd_io.py | 547 ++++++++++++++++++++-------- 1 file changed, 387 insertions(+), 160 deletions(-) diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index b1d4db074..94b52242e 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -1954,103 +1954,154 @@ def write_waterbody_netcdf( } ) -def helper_write_flowveldepth(flowveldepth, subset_df, current_time_step, - stream_output_directory, stream_output_type, gage, - nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps, - counter, t0, stream_output_timediff): - ''' - Just a helper function for 'write_flowveldepth_netcdf' function for parallel computing. 
- ''' - - if stream_output_directory: - if stream_output_type =='.nc': - file_name = f"{current_time_step}.flowveldepth.nc" - - elif stream_output_type=='.csv': - file_name = f"{current_time_step}.flowveldepth.csv" - # Save the data to CSV file - subset_df.to_csv(f"{stream_output_directory}/{file_name}", index=True) - LOG.debug(f"Flowveldepth data saved as CSV files in {stream_output_directory}") - - elif stream_output_type=='.pkl': - file_name = f"{current_time_step}.flowveldepth.pkl" - # Save the data to Pickle file - subset_df.to_pickle(f"{stream_output_directory}/{file_name}") - LOG.debug(f"Flowveldepth data saved as PICKLE files in {stream_output_directory}") - - else: - print('WRONG FORMAT') + +def write_flowveldepth_netcdf(stream_output_directory, file_name, + flow, velocity, depth, nudge_df, timestamps, + t0): + # Open netCDF4 Dataset in write mode + with netCDF4.Dataset( + filename=f"{stream_output_directory}/{file_name}", + mode='w', + format='NETCDF4' + ) as ncfile: + + # ============ DIMENSIONS =================== + _ = ncfile.createDimension('feature_id', len(flow)) + _ = ncfile.createDimension('time', len(timestamps)) + #_ = ncfile.createDimension('gage', gage) + #_ = ncfile.createDimension('nudge_timestep', nudge_timesteps) # Add dimension for nudge time steps - if stream_output_directory: - if (stream_output_type =='.nc'): - # Open netCDF4 Dataset in write mode - with netCDF4.Dataset( - filename=f"{stream_output_directory}/{file_name}", - mode='w', - format='NETCDF4' - ) as ncfile: - - # ============ DIMENSIONS =================== - _ = ncfile.createDimension('feature_id', None) - _ = ncfile.createDimension('time_step (sec)', subset_df.iloc[:, 0::4].shape[1]) - _ = ncfile.createDimension('gage', gage) - _ = ncfile.createDimension('nudge_timestep', nudge_timesteps) # Add dimension for nudge time steps - - # =========== q,v,d,ndg VARIABLES =============== - for counters, var in enumerate(['flowrate', 'velocity', 'depth', 'nudge']): - QVD = ncfile.createVariable( - varname=var, - datatype=np.float32, - dimensions=('feature_id', 'time_step (sec)',), - ) - - QVD.units = 'm3/s m/s m m3/s' - QVD.description = f'Data for {var}' - - # Prepare data for writing - data_array = subset_df.iloc[:, counters::4].to_numpy(dtype=np.float32) - - # Set data for each feature_id and time_step - ncfile.variables[var][:] = data_array - feature_id = ncfile.createVariable( - varname='feature_id', - datatype=np.int32, - dimensions=('feature_id',), - ) - feature_id[:] = flowveldepth.index.to_numpy(dtype=np.int32) - feature_id.units = 'None' - feature_id.description = 'Feature IDs' - ### - time_step = ncfile.createVariable( - varname='time_step (sec)', - datatype=np.int32, - dimensions=('time_step (sec)',), - ) - time_step[:] = np.array(time_dim[:subset_df.iloc[:, 0::4].shape[1]], dtype=np.int32) - time_step.units = 'sec' - time_step.description = 'time stamp' - # =========== GLOBAL ATTRIBUTES =============== - ncfile.setncatts( - { - 'TITLE': 'OUTPUT FROM T-ROUTE', - 'Time step (sec)': f'{stream_output_internal_frequency}', - 'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'), - 'model_reference_time': time_steps[counter].strftime('%Y-%m-%d_%H:%M:%S'), - 'comment': f'The file includes {stream_output_timediff} hour data which includes {len(time_dim)} timesteps', - 'code_version': '', - } - ) - LOG.debug(f"Flowveldepth data saved as NetCDF files in {stream_output_directory}") - -def write_flowveldepth_netcdf(stream_output_directory, - flowveldepth, - nudge, - usgs_positions_id, - t0, - 
stream_output_timediff,
+    stream_output_type,
+    stream_output_internal_frequency = 5,
+    cpu_pool = 1,
+    ):
     '''
     Write the results of flowveldepth and nudge to netcdf, broken into one file per stream_output_timediff hours.
     Arguments
@@ -2060,83 +2111,259 @@ def write_flowveldepth_netcdf(stream_output_directory,
     -------------
     stream_output_directory (Path or string) - directory where file will be created
     flowveldepth (DataFrame) - including flowrate, velocity, and depth for each time step
     nudge (numpy.ndarray) - nudge data with shape (number of usgs gages, number of timesteps)
     usgs_positions_id (array) - Position ids of usgs gages
     '''
-    # Number of timesteps and features
-    nsteps = len(flowveldepth.columns) // 3
-    num_features = len(flowveldepth)
-    nstep_nc = 12 * stream_output_timediff
+    # timesteps, variable = zip(*flowveldepth.columns.tolist())
+    # timesteps = list(timesteps)
+    n_timesteps = flowveldepth.shape[1]//3
+    ts = stream_output_internal_frequency//5
+    # Column indices of the timesteps to keep: every ts-th 5-minute step
+    ind = [i for i in range(ts-1, n_timesteps, ts)]
+    # Output times in seconds since t0, assuming a 300 s (5 min) routing timestep
+    timestamps_sec = [(i+1)*300 for i in ind]
+
+    flow = flowveldepth.iloc[:,0::3].iloc[:,ind]
+    velocity = flowveldepth.iloc[:,1::3].iloc[:,ind]
+    depth = flowveldepth.iloc[:,2::3].iloc[:,ind]
+
     # Check if the first column of nudge is all zeros
     if np.all(nudge[:, 0] == 0):
         # Drop the first column
         nudge = nudge[:, 1:]
-
-    gage, nudge_timesteps = nudge.shape
+    # Align nudge values with the full feature index, filling non-gage rows with -9999.0
+    nudge_df = pd.DataFrame(data=nudge, index=usgs_positions_id).iloc[:,ind]
+    empty_ids = list(set(flowveldepth.index).difference(set(nudge_df.index)))
+    empty_df = pd.DataFrame(index=empty_ids, columns=nudge_df.columns).fillna(-9999.0)
+    nudge_df = pd.concat([nudge_df, empty_df]).loc[flowveldepth.index]
+
+    ts_per_file = stream_output_timediff*60//stream_output_internal_frequency

-    #--------- Add 'nudge' column based on usgs_positions_id----------
+    # Number of output files: total simulated seconds // seconds covered by one file
+    num_files = flowveldepth.shape[1]//3*dt//(stream_output_timediff*60*60)
+    if num_files==0:
+        num_files = 1
+    file_name_time = t0
+    jobs = []
+    for _ in range(num_files):
+        filename = 'troute_output_' + file_name_time.strftime('%Y%m%d%H%M') + '.nc'
+        args = (stream_output_directory, filename,
+                flow.iloc[:,0:ts_per_file],
+                velocity.iloc[:,0:ts_per_file],
+                depth.iloc[:,0:ts_per_file],
+                nudge_df.iloc[:,0:ts_per_file],
+                timestamps_sec[0:ts_per_file], t0)
+        if cpu_pool > 1 and num_files > 1:
+            jobs.append(delayed(write_flowveldepth_netcdf)(*args))
+        else:
+            write_flowveldepth_netcdf(*args)
+
+        flow = flow.iloc[:,ts_per_file:]
+        velocity = velocity.iloc[:,ts_per_file:]
+        depth = depth.iloc[:,ts_per_file:]
+        nudge_df = nudge_df.iloc[:,ts_per_file:]
+        timestamps_sec = timestamps_sec[ts_per_file:]
+        file_name_time = file_name_time + timedelta(hours=stream_output_timediff)
+
+    if cpu_pool > 1 and num_files > 1:
+        try:
+            # Execute all jobs in parallel
+            with Parallel(n_jobs=cpu_pool) as parallel:
+                parallel(jobs)
+        except Exception as e:
+            LOG.error("Error during parallel writing output: %s", e)
+
+    LOG.debug("Completed the write_flowveldepth function")


-    # Create a copy of the flowveldepth DataFrame to add 'ndg' columns
-    qvd_ndg = flowveldepth.copy()
-    # Create a list for names of the columns for nudge values
-    ndg_columns = [(j,'ndg') for j in range(nsteps)]

-    if len(usgs_positions_id)>0:
-        # Add 'ndg' columns based on usgs_positions_id
-        for i, usgs_id in enumerate(usgs_positions_id):
-            # Extract the corresponding nudge values for the usgs_id
-            nudge_values = nudge[i]

-            # Assign nudge values to 'ndg' columns for the corresponding row
-            qvd_ndg.loc[usgs_id, ndg_columns] = nudge_values
-    else:
-        qvd_ndg.loc[:, ndg_columns] = -9999.0

+#TODO Delete these functions. They've been replaced by the above two functions. Keep here for
+# now until the functions above are properly vetted.
- new_order = [(i, attr) for i in range(0, nsteps) for attr in ['q', 'v', 'd', 'ndg']] - # Reorder the columns - qvd_ndg = qvd_ndg[new_order] - - # renaming the columns based on timestamp - column_name_timeStamp = [t0 + timedelta(minutes=(i * 5)) for i in range(nsteps)] - new_column_name_timeStamp = [(attr + '_' +str(cnt) + '_' + times.strftime('%Y%m%d%H%M')) for cnt, times in enumerate(column_name_timeStamp) for attr in ['q', 'v', 'd', 'ndg']] - qvd_ndg.columns = new_column_name_timeStamp - # Create time step values based on t0 - time_steps = [t0 + timedelta(hours= (i * stream_output_timediff)) for i in range(nsteps//nstep_nc)] - time_dim = [t * stream_output_internal_frequency*60 for t in range(1, int(stream_output_timediff * 60 / stream_output_internal_frequency) + 1)] - jobs = [] +# def helper_write_flowveldepth(flowveldepth, subset_df, current_time_step, +# stream_output_directory, stream_output_type, gage, +# nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps, +# counter, t0, stream_output_timediff): +# ''' +# Just a helper function for 'write_flowveldepth_netcdf' function for parallel computing. +# ''' - for counter, i in enumerate(range(0, nsteps, nstep_nc)): +# if stream_output_directory: +# if stream_output_type =='.nc': +# file_name = f"{current_time_step}.flowveldepth.nc" + +# elif stream_output_type=='.csv': +# file_name = f"{current_time_step}.flowveldepth.csv" +# # Save the data to CSV file +# subset_df.to_csv(f"{stream_output_directory}/{file_name}", index=True) +# LOG.debug(f"Flowveldepth data saved as CSV files in {stream_output_directory}") + +# elif stream_output_type=='.pkl': +# file_name = f"{current_time_step}.flowveldepth.pkl" +# # Save the data to Pickle file +# subset_df.to_pickle(f"{stream_output_directory}/{file_name}") +# LOG.debug(f"Flowveldepth data saved as PICKLE files in {stream_output_directory}") + +# else: +# print('WRONG FORMAT') + +# if stream_output_directory: +# if (stream_output_type =='.nc'): +# # Open netCDF4 Dataset in write mode +# with netCDF4.Dataset( +# filename=f"{stream_output_directory}/{file_name}", +# mode='w', +# format='NETCDF4' +# ) as ncfile: + +# # ============ DIMENSIONS =================== +# _ = ncfile.createDimension('feature_id', None) +# _ = ncfile.createDimension('time_step (sec)', subset_df.iloc[:, 0::4].shape[1]) +# _ = ncfile.createDimension('gage', gage) +# _ = ncfile.createDimension('nudge_timestep', nudge_timesteps) # Add dimension for nudge time steps + +# # =========== q,v,d,ndg VARIABLES =============== +# for counters, var in enumerate(['flowrate', 'velocity', 'depth', 'nudge']): +# QVD = ncfile.createVariable( +# varname=var, +# datatype=np.float32, +# dimensions=('feature_id', 'time_step (sec)',), +# ) + +# QVD.units = 'm3/s m/s m m3/s' +# QVD.description = f'Data for {var}' + +# # Prepare data for writing +# data_array = subset_df.iloc[:, counters::4].to_numpy(dtype=np.float32) + +# # Set data for each feature_id and time_step +# ncfile.variables[var][:] = data_array +# feature_id = ncfile.createVariable( +# varname='feature_id', +# datatype=np.int32, +# dimensions=('feature_id',), +# ) +# feature_id[:] = flowveldepth.index.to_numpy(dtype=np.int32) +# feature_id.units = 'None' +# feature_id.description = 'Feature IDs' +# ### +# time_step = ncfile.createVariable( +# varname='time_step (sec)', +# datatype=np.int32, +# dimensions=('time_step (sec)',), +# ) +# time_step[:] = np.array(time_dim[:subset_df.iloc[:, 0::4].shape[1]], dtype=np.int32) +# time_step.units = 'sec' +# time_step.description 
= 'time stamp' +# # =========== GLOBAL ATTRIBUTES =============== +# ncfile.setncatts( +# { +# 'TITLE': 'OUTPUT FROM T-ROUTE', +# 'Time step (sec)': f'{stream_output_internal_frequency}', +# 'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'), +# 'model_reference_time': time_steps[counter].strftime('%Y-%m-%d_%H:%M:%S'), +# 'comment': f'The file includes {stream_output_timediff} hour data which includes {len(time_dim)} timesteps', +# 'code_version': '', +# } +# ) +# LOG.debug(f"Flowveldepth data saved as NetCDF files in {stream_output_directory}") + +# def write_flowveldepth_netcdf(stream_output_directory, +# flowveldepth, +# nudge, +# usgs_positions_id, +# t0, +# stream_output_timediff, +# stream_output_type, +# stream_output_internal_frequency = 5, +# cpu_pool = 6): +# ''' +# Write the results of flowveldepth and nudge to netcdf- break. +# Arguments +# ------------- +# stream_output_directory (Path or string) - directory where file will be created +# flowveldepth (DataFrame) - including flowrate, velocity, and depth for each time step +# nudge (numpy.ndarray) - nudge data with shape (76, 289) +# usgs_positions_id (array) - Position ids of usgs gages +# ''' +# # Number of timesteps and features +# print('step 1...') +# nsteps = len(flowveldepth.columns) // 3 +# num_features = len(flowveldepth) +# nstep_nc = 12 * stream_output_timediff +# # Check if the first column of nudge is all zeros +# if np.all(nudge[:, 0] == 0): +# # Drop the first column +# nudge = nudge[:, 1:] + +# gage, nudge_timesteps = nudge.shape +# print('step 2...') +# #--------- Add 'nudge' column based on usgs_positions_id---------- + +# # Create a copy of the flowveldepth DataFrame to add 'ndg' columns +# qvd_ndg = flowveldepth.copy() +# # Create a list for names of the columns for nudge values +# ndg_columns = [(j,'ndg') for j in range(nsteps)] + +# if len(usgs_positions_id)>0: +# # Add 'ndg' columns based on usgs_positions_id +# for i, usgs_id in enumerate(usgs_positions_id): +# # Extract the corresponding nudge values for the usgs_id +# nudge_values = nudge[i] + +# # Assign nudge values to 'ndg' columns for the corresponding row +# qvd_ndg.loc[usgs_id, ndg_columns] = nudge_values +# else: +# print('step 3...') +# qvd_ndg.loc[:, ndg_columns] = -9999.0 + +# print('step 4...') +# new_order = [(i, attr) for i in range(0, nsteps) for attr in ['q', 'v', 'd', 'ndg']] +# # Reorder the columns +# qvd_ndg = qvd_ndg[new_order] + +# # renaming the columns based on timestamp +# print('step 5...') +# column_name_timeStamp = [t0 + timedelta(minutes=(i * 5)) for i in range(nsteps)] +# new_column_name_timeStamp = [(attr + '_' +str(cnt) + '_' + times.strftime('%Y%m%d%H%M')) for cnt, times in enumerate(column_name_timeStamp) for attr in ['q', 'v', 'd', 'ndg']] +# qvd_ndg.columns = new_column_name_timeStamp + +# # Create time step values based on t0 +# print('step 6...') +# time_steps = [t0 + timedelta(hours= (i * stream_output_timediff)) for i in range(nsteps//nstep_nc)] +# time_dim = [t * stream_output_internal_frequency*60 for t in range(1, int(stream_output_timediff * 60 / stream_output_internal_frequency) + 1)] +# jobs = [] + +# for counter, i in enumerate(range(0, nsteps, nstep_nc)): - # Define the range of columns for this file - start_col = i * 4 - end_col = min((i + nstep_nc) * 4 , nsteps * 4) - selected_col = stream_output_internal_frequency // 5 - # Create a subset DataFrame for the current range of columns - # subset_df = qvd_ndg.iloc[:, start_col:end_col] - # Create a list of column names to keep - columns_to_keep = 
[col for col in qvd_ndg.columns[start_col:end_col] if int(col.split('_')[1]) % selected_col == 0] - subset_df = qvd_ndg[columns_to_keep] - subset_df.columns = ['_'.join([col.split('_')[0], col.split('_')[2]]) for col in subset_df.columns] +# # Define the range of columns for this file +# start_col = i * 4 +# end_col = min((i + nstep_nc) * 4 , nsteps * 4) +# selected_col = stream_output_internal_frequency // 5 +# # Create a subset DataFrame for the current range of columns +# # subset_df = qvd_ndg.iloc[:, start_col:end_col] +# # Create a list of column names to keep +# columns_to_keep = [col for col in qvd_ndg.columns[start_col:end_col] if int(col.split('_')[1]) % selected_col == 0] +# subset_df = qvd_ndg[columns_to_keep] +# subset_df.columns = ['_'.join([col.split('_')[0], col.split('_')[2]]) for col in subset_df.columns] - # Create the file name based on the current time step - current_time_step = time_steps[counter].strftime('%Y%m%d%H%M') +# # Create the file name based on the current time step +# import pdb; pdb.set_trace() +# current_time_step = time_steps[counter].strftime('%Y%m%d%H%M') - args = (flowveldepth, subset_df, current_time_step, - stream_output_directory, stream_output_type, gage, - nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps, - counter, t0, stream_output_timediff) +# args = (flowveldepth, subset_df, current_time_step, +# stream_output_directory, stream_output_type, gage, +# nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps, +# counter, t0, stream_output_timediff) - if cpu_pool > 1: - jobs.append(delayed(helper_write_flowveldepth)(*args)) - LOG.debug(f"Job for step {counter} added for parallel processing.") - else: - helper_write_flowveldepth(*args) - - if cpu_pool > 1: - try: - # Execute all jobs in parallel - with Parallel(n_jobs=cpu_pool) as parallel: - parallel(jobs) - except Exception as e: - LOG.error("Error during parallel processing: %s", e) - - LOG.debug("Completed the write_flowveldepth_netcdf function") +# if cpu_pool > 1: +# jobs.append(delayed(helper_write_flowveldepth)(*args)) +# LOG.debug(f"Job for step {counter} added for parallel processing.") +# else: +# helper_write_flowveldepth(*args) + +# if cpu_pool > 1: +# try: +# # Execute all jobs in parallel +# with Parallel(n_jobs=cpu_pool) as parallel: +# parallel(jobs) +# except Exception as e: +# LOG.error("Error during parallel processing: %s", e) + +# LOG.debug("Completed the write_flowveldepth_netcdf function") From a810fc9e2adf115e2458a95f2fad474d327d15bc Mon Sep 17 00:00:00 2001 From: Sean Horvath Date: Thu, 29 Feb 2024 20:39:26 +0000 Subject: [PATCH 4/4] update calling write_flowveldepth functions --- src/troute-nwm/src/nwm_routing/output.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index 6a4de28f3..e4adc5bf1 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -202,15 +202,18 @@ def nwm_output_generator( nudge = np.concatenate([r[8] for r in results]) usgs_positions_id = np.concatenate([r[3][0] for r in results]) - nhd_io.write_flowveldepth_netcdf(Path(stream_output_directory), - flowveldepth, - nudge, - usgs_positions_id, - t0, - int(stream_output_timediff), - stream_output_type, - stream_output_internal_frequency, - cpu_pool = cpu_pool) + nhd_io.write_flowveldepth( + Path(stream_output_directory), + flowveldepth, + nudge, + usgs_positions_id, + t0, + dt, + 
int(stream_output_timediff), + stream_output_type, + stream_output_internal_frequency, + cpu_pool = cpu_pool + ) if test: flowveldepth.to_pickle(Path(test))
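

--
Reviewer note (illustrative sketch, not part of the patches above): the new code
paths are selected by the forcing_parameters block of the .yaml control file,
which build_forcing_sets() and build_qlateral_array() read. Expressed as the
equivalent Python dict, the two supported nex-* modes look roughly like this;
the folder paths are hypothetical placeholders.

    # Mode 1: reformat nex-* csv files into hourly binary (parquet) files
    # before routing (taken when binary_nexus_file_folder is set):
    forcing_parameters = {
        'qlat_input_folder': '/path/to/ngen/nexus_output',    # hypothetical
        'qlat_file_pattern_filter': 'nex-*',
        'binary_nexus_file_folder': '/path/to/parquet_out',   # hypothetical
    }

    # Mode 2 (new in PATCH 1/4): read the nex-* csv files directly;
    # leaving binary_nexus_file_folder unset skips the parquet conversion:
    forcing_parameters = {
        'qlat_input_folder': '/path/to/ngen/nexus_output',    # hypothetical
        'qlat_file_pattern_filter': 'nex-*',
    }

    # Each troute_output_YYYYMMDDHHMM.nc file written by PATCH 3/4 then carries
    # (per the createVariable calls in write_flowveldepth_netcdf):
    #   dimensions: feature_id, time
    #   variables:  time(time), feature_id(feature_id), flow(feature_id, time),
    #               velocity(feature_id, time), depth(feature_id, time),
    #               nudge(feature_id, time)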