diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index 8b144c5a5..466c3ddf5 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -1948,6 +1948,94 @@ def write_waterbody_netcdf( } ) +def helper_write_flowveldepth(flowveldepth, subset_df, current_time_step, + stream_output_directory, stream_output_type, gage, + nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps, + counter, t0, stream_output_timediff): + ''' + Just a helper function for 'write_flowveldepth_netcdf' function for parallel computing. + ''' + + if stream_output_directory: + if stream_output_type =='.nc': + file_name = f"{current_time_step}.flowveldepth.nc" + + elif stream_output_type=='.csv': + file_name = f"{current_time_step}.flowveldepth.csv" + # Save the data to CSV file + subset_df.to_csv(f"{stream_output_directory}/{file_name}", index=True) + LOG.debug(f"Flowveldepth data saved as CSV files in {stream_output_directory}") + + elif stream_output_type=='.pkl': + file_name = f"{current_time_step}.flowveldepth.pkl" + # Save the data to Pickle file + subset_df.to_pickle(f"{stream_output_directory}/{file_name}") + LOG.debug(f"Flowveldepth data saved as PICKLE files in {stream_output_directory}") + + else: + print('WRONG FORMAT') + + if stream_output_directory: + if (stream_output_type =='.nc'): + # Open netCDF4 Dataset in write mode + with netCDF4.Dataset( + filename=f"{stream_output_directory}/{file_name}", + mode='w', + format='NETCDF4' + ) as ncfile: + + # ============ DIMENSIONS =================== + _ = ncfile.createDimension('feature_id', None) + _ = ncfile.createDimension('time_step (sec)', subset_df.iloc[:, 0::4].shape[1]) + _ = ncfile.createDimension('gage', gage) + _ = ncfile.createDimension('nudge_timestep', nudge_timesteps) # Add dimension for nudge time steps + + # =========== q,v,d,ndg VARIABLES =============== + for counters, var in enumerate(['flowrate', 'velocity', 'depth', 'nudge']): + QVD = ncfile.createVariable( + varname=var, + datatype=np.float32, + dimensions=('feature_id', 'time_step (sec)',), + ) + + QVD.units = 'm3/s m/s m m3/s' + QVD.description = f'Data for {var}' + + # Prepare data for writing + data_array = subset_df.iloc[:, counters::4].to_numpy(dtype=np.float32) + + # Set data for each feature_id and time_step + ncfile.variables[var][:] = data_array + feature_id = ncfile.createVariable( + varname='feature_id', + datatype=np.int32, + dimensions=('feature_id',), + ) + feature_id[:] = flowveldepth.index.to_numpy(dtype=np.int32) + feature_id.units = 'None' + feature_id.description = 'Feature IDs' + ### + time_step = ncfile.createVariable( + varname='time_step (sec)', + datatype=np.int32, + dimensions=('time_step (sec)',), + ) + time_step[:] = np.array(time_dim[:subset_df.iloc[:, 0::4].shape[1]], dtype=np.int32) + time_step.units = 'sec' + time_step.description = 'time stamp' + # =========== GLOBAL ATTRIBUTES =============== + ncfile.setncatts( + { + 'TITLE': 'OUTPUT FROM T-ROUTE', + 'Time step (sec)': f'{stream_output_internal_frequency}', + 'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'), + 'model_reference_time': time_steps[counter].strftime('%Y-%m-%d_%H:%M:%S'), + 'comment': f'The file includes {stream_output_timediff} hour data which includes {len(time_dim)} timesteps', + 'code_version': '', + } + ) + LOG.debug(f"Flowveldepth data saved as NetCDF files in {stream_output_directory}") + def write_flowveldepth_netcdf(stream_output_directory, flowveldepth, nudge, @@ -1955,7 +2043,8 @@ def write_flowveldepth_netcdf(stream_output_directory, t0, stream_output_timediff, stream_output_type, - stream_output_internal_frequency = 5): + stream_output_internal_frequency = 5, + cpu_pool = 6): ''' Write the results of flowveldepth and nudge to netcdf- break. Arguments @@ -2006,8 +2095,11 @@ def write_flowveldepth_netcdf(stream_output_directory, # Create time step values based on t0 time_steps = [t0 + timedelta(hours= (i * stream_output_timediff)) for i in range(nsteps//nstep_nc)] time_dim = [t * stream_output_internal_frequency*60 for t in range(1, int(stream_output_timediff * 60 / stream_output_internal_frequency) + 1)] + jobs = [] for counter, i in enumerate(range(0, nsteps, nstep_nc)): + + # Define the range of columns for this file start_col = i * 4 end_col = min((i + nstep_nc) * 4 , nsteps * 4) @@ -2021,83 +2113,24 @@ def write_flowveldepth_netcdf(stream_output_directory, # Create the file name based on the current time step current_time_step = time_steps[counter].strftime('%Y%m%d%H%M') - if stream_output_directory: - if stream_output_type =='.nc': - file_name = f"{current_time_step}.flowveldepth.nc" - - elif stream_output_type=='.csv': - file_name = f"{current_time_step}.flowveldepth.csv" - # Save the data to CSV file - subset_df.to_csv(f"{stream_output_directory}/{file_name}", index=True) - LOG.debug(f"Flowveldepth data saved as CSV files in {stream_output_directory}") - - elif stream_output_type=='.pkl': - file_name = f"{current_time_step}.flowveldepth.pkl" - # Save the data to Pickle file - subset_df.to_pickle(f"{stream_output_directory}/{file_name}") - LOG.debug(f"Flowveldepth data saved as PICKLE files in {stream_output_directory}") - - else: - print('WRONG FORMAT') - if stream_output_directory: - if (stream_output_type =='.nc'): - # Open netCDF4 Dataset in write mode - with netCDF4.Dataset( - filename=f"{stream_output_directory}/{file_name}", - mode='w', - format='NETCDF4' - ) as ncfile: - - # ============ DIMENSIONS =================== - _ = ncfile.createDimension('feature_id', None) - _ = ncfile.createDimension('time_step (sec)', subset_df.iloc[:, 0::4].shape[1]) - _ = ncfile.createDimension('gage', gage) - _ = ncfile.createDimension('nudge_timestep', nudge_timesteps) # Add dimension for nudge time steps - - # =========== q,v,d,ndg VARIABLES =============== - for counters, var in enumerate(['flowrate', 'velocity', 'depth', 'nudge']): - QVD = ncfile.createVariable( - varname=var, - datatype=np.float32, - dimensions=('feature_id', 'time_step (sec)',), - ) - - QVD.units = 'm3/s m/s m m3/s' - QVD.description = f'Data for {var}' - - # Prepare data for writing - data_array = subset_df.iloc[:, counters::4].to_numpy(dtype=np.float32) - - # Set data for each feature_id and time_step - ncfile.variables[var][:] = data_array - feature_id = ncfile.createVariable( - varname='feature_id', - datatype=np.int32, - dimensions=('feature_id',), - ) - feature_id[:] = flowveldepth.index.to_numpy(dtype=np.int32) - feature_id.units = 'None' - feature_id.description = 'Feature IDs' - ### - time_step = ncfile.createVariable( - varname='time_step (sec)', - datatype=np.int32, - dimensions=('time_step (sec)',), - ) - time_step[:] = np.array(time_dim[:subset_df.iloc[:, 0::4].shape[1]], dtype=np.int32) - time_step.units = 'sec' - time_step.description = 'time stamp' - # =========== GLOBAL ATTRIBUTES =============== - ncfile.setncatts( - { - 'TITLE': 'OUTPUT FROM T-ROUTE', - 'Time step (sec)': f'{stream_output_internal_frequency}', - 'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'), - 'model_reference_time': time_steps[counter].strftime('%Y-%m-%d_%H:%M:%S'), - 'comment': f'The file includes {stream_output_timediff} hour data which includes {len(time_dim)} timesteps', - 'code_version': '', - } - ) - LOG.debug(f"Flowveldepth data saved as NetCDF files in {stream_output_directory}") - \ No newline at end of file + args = (flowveldepth, subset_df, current_time_step, + stream_output_directory, stream_output_type, gage, + nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps, + counter, t0, stream_output_timediff) + + if cpu_pool > 1: + jobs.append(delayed(helper_write_flowveldepth)(*args)) + LOG.debug(f"Job for step {counter} added for parallel processing.") + else: + helper_write_flowveldepth(*args) + + if cpu_pool > 1: + try: + # Execute all jobs in parallel + with Parallel(n_jobs=cpu_pool) as parallel: + parallel(jobs) + except Exception as e: + LOG.error("Error during parallel processing: %s", e) + + LOG.debug("Completed the write_flowveldepth_netcdf function") diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index c5c4a10be..4575d4406 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -209,7 +209,8 @@ def nwm_output_generator( t0, int(stream_output_timediff), stream_output_type, - stream_output_internal_frequency) + stream_output_internal_frequency, + cpu_pool = cpu_pool) if test: flowveldepth.to_pickle(Path(test))