Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding lite-restart and lastObs to BMI conversions #685

Merged
merged 2 commits into from
Nov 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 133 additions & 90 deletions src/model_DAforcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def __init__(self, bmi_cfg_file=None):

# Lastobs
lastobs_file = data_assimilation_parameters.get('streamflow_da', {}).get('lastobs_file', False)

if lastobs_file:
self._lastobs_df = _read_lastobs_file(lastobs_file)

Expand All @@ -131,6 +132,19 @@ def __init__(self, bmi_cfg_file=None):

# read in metadata for BMI compliant arrays:

if not self._lastobs_df.empty:

if lastobs_file:

(_lastObs_gageArray, _lastObs_gageStringLengths, \
_lastObs_timeSince, _lastObs_discharge) = \
_bmi_disassemble_lastObs (self._lastobs_df)

self.lastObs_gageArray = _lastObs_gageArray
self.lastObs_gageStringLengths = _lastObs_gageStringLengths
self.lastObs_timeSince = _lastObs_timeSince
self.lastObs_discharge = _lastObs_discharge

# USGS Observations
if not self._usgs_df.empty:
#
Expand Down Expand Up @@ -159,7 +173,7 @@ def __init__(self, bmi_cfg_file=None):
self.stationStringLengthArray_usgs = _stationStringLengthArray_usgs
self.nStations_usgs = _nStations_usgs
# flatten the actual USGS datafrane into a numpy ndarray
_usgsArray = _flatten_array(self._usgs_df)
_usgsArray = _flatten_array(self._usgs_df, np.float16)
# ... and save it with the class instance
self.usgsArray = _usgsArray

Expand Down Expand Up @@ -191,7 +205,7 @@ def __init__(self, bmi_cfg_file=None):
self.stationStringLengthArray_reservoir_usgs = _stationStringLengthArray_reservoir_usgs
self.nStations_reservoir_usgs = _nStations_reservoir_usgs
# flatten the actual USGS datafrane into a numpy ndarray
_reservoirUsgsArray = _flatten_array(self._reservoir_usgs_df)
_reservoirUsgsArray = _flatten_array(self._reservoir_usgs_df, np.float16)
# ... and save it with the class instance
self.reservoirUsgsArray = _reservoirUsgsArray

Expand All @@ -210,7 +224,7 @@ def __init__(self, bmi_cfg_file=None):
self.stationStringLengthArray_reservoir_usace = _stationStringLengthArray_reservoir_usace
self.nStations_reservoir_usace = _nStations_reservoir_usace
# flatten the actual USACE datafrane into a numpy ndarray
_reservoirUsaceArray = _flatten_array(self._reservoir_usace_df)
_reservoirUsaceArray = _flatten_array(self._reservoir_usace_df, np.float16)
# ... and save it with the class instance
self.reservoirUsaceArray = _reservoirUsaceArray

Expand All @@ -236,89 +250,6 @@ def __init__(self, bmi_cfg_file=None):
self.rfc_List_array = _rfc_List_array
self.rfc_List_stringLengths = _rfc_List_stringLengths

testRevert = False
# The following code segments revert the array troute -> bmi-flattened
# array conversions, and are placed here only temporarily for
# verification purposes, to be moved to DataAssimilation eventually.
# If "reverse" code is not removed, it can be temporarily disabled
# by setting the testRevert flag to False
if (testRevert):

# USGS Observations
if nudging or usgs_persistence:

# USGS dataframe

# Unflatten the arrays
df_raw_usgs = _unflatten_array(self.usgsArray,self.nDates_usgs,\
self.nStations_usgs)

# Decode time/date axis
timeAxisName = 'time'
freqString = '5T'
df_withDates_usgs = _time_retrieve_from_arrays(df_raw_usgs, self.dateNull, \
self.datesSecondsArray_usgs, timeAxisName, freqString)

# Decode station ID axis
stationAxisName = 'stationId'
df_withStationsAndDates_usgs = _stations_retrieve_from_arrays(\
df_withDates_usgs, self.stationArray_usgs, \
self.stationStringLengthArray_usgs, stationAxisName)

# Reservoir USGS dataframe

# Unflatten the arrays
df_raw_reservoirUsgs = _unflatten_array(self.reservoirUsgsArray,\
self.nDates_reservoir_usgs,\
self.nStations_reservoir_usgs)

# Decode time/date axis
timeAxisName = 'time'
freqString = '15T'
df_withDates_reservoirUsgs = _time_retrieve_from_arrays(\
df_raw_reservoirUsgs, self.dateNull, \
self.datesSecondsArray_reservoir_usgs, timeAxisName, freqString)

# Decode station ID axis
stationAxisName = 'stationId'
df_withStationsAndDates_reservoirUsgs = _stations_retrieve_from_arrays\
(df_withDates_reservoirUsgs, self.stationArray_reservoir_usgs, \
self.stationStringLengthArray_reservoir_usgs, stationAxisName)

# USACE Observations
if usace_persistence:

# Reservoir USACE dataframe

# Unflatten the arrays
df_raw_reservoirUsace = _unflatten_array(self.reservoirUsaceArray,\
self.nDates_reservoir_usace,\
self.nStations_reservoir_usace)

# Decode time/date axis
timeAxisName = 'time'
freqString = '15T'
df_withDates_reservoirUsace = _time_retrieve_from_arrays\
(df_raw_reservoirUsace, self.dateNull, \
self.datesSecondsArray_reservoir_usace, timeAxisName, freqString)

# Decode station ID axis
stationAxisName = 'stationId'
df_withStationsAndDates_reservoirUsace = _stations_retrieve_from_arrays\
(df_withDates_reservoirUsace, self.stationArray_reservoir_usace, \
self.stationStringLengthArray_reservoir_usace, stationAxisName)

# RFC Timeseries
if rfc:

# Decode rfc timeseries
df_rfc_timeseries = _bmi_reassemble_rfc_timeseries (self.rfc_da_timestep, \
self.rfc_totalCounts, self.rfc_synthetic_values, \
self.rfc_discharges, self.rfc_timeseries_idx, \
self.rfc_use_rfc, self.rfc_Datetime, self.rfc_timeSteps, \
self.rfc_StationId_array, self.rfc_StationId_stringLengths, \
self.rfc_List_array, self.rfc_List_stringLengths, self.dateNull)


#############################
# Read Restart files:
Expand All @@ -329,13 +260,43 @@ def __init__(self, bmi_cfg_file=None):
self._waterbody_df = pd.DataFrame()

lite_restart_file = self._compute_parameters['restart_parameters']['lite_channel_restart_file']

if lite_restart_file:

self._q0, self._t0 = _read_lite_restart(lite_restart_file)


if not self._q0.empty:

(_q0_columnArray, _q0_columnLengthArray, _q0_nCol, \
_q0_indexArray, _q0_nIndex, _q0_Array) = \
_bmi_disassemble_lite_restart (self._q0,np.float32)

self.q0_columnArray = _q0_columnArray
self.q0_columnLengthArray = _q0_columnLengthArray
self.q0_nCol = _q0_nCol
self.q0_indexArray = _q0_indexArray
self.q0_nIndex = _q0_nIndex
self.q0_Array = _q0_Array

lite_restart_file = self._compute_parameters['restart_parameters']['lite_waterbody_restart_file']

if lite_restart_file:

self._waterbody_df, _ = _read_lite_restart(lite_restart_file)


if not self._waterbody_df.empty:

(_waterbodyLR_columnArray, _waterbodyLR_columnLengthArray, _waterbodyLR_nCol, \
_waterbodyLR_indexArray, _waterbodyLR_nIndex, _waterbodyLR_Array) = \
_bmi_disassemble_lite_restart (self._waterbody_df,np.float64)

self.waterbodyLR_columnArray = _waterbodyLR_columnArray
self.waterbodyLR_columnLengthArray = _waterbodyLR_columnLengthArray
self.waterbodyLR_nCol = _waterbodyLR_nCol
self.waterbodyLR_indexArray = _waterbodyLR_indexArray
self.waterbodyLR_nIndex = _waterbodyLR_nIndex
self.waterbodyLR_Array = _waterbodyLR_Array

else:

raise(RuntimeError("No config file provided."))
Expand Down Expand Up @@ -497,10 +458,10 @@ def _stringsToBMI(stringList):
return (stringArray, stringLengthArray)


def _flatten_array(dataFrame):
def _flatten_array(dataFrame, dataType):

# convert to numpy array first
array1 = dataFrame.to_numpy(copy=True, dtype=np.float16)
array1 = dataFrame.to_numpy(copy=True, dtype=dataType)
# flatten it
array2 = array1.reshape(-1)

Expand Down Expand Up @@ -654,6 +615,88 @@ def _bmi_reassemble_rfc_timeseries (rfc_da_timestep, rfc_totalCounts, \
return dataFrame


def _bmi_disassemble_lite_restart (dataFrame, dataType):
    """Decompose a lite-restart dataframe (q0 or waterbody) into BMI-compliant
    flat arrays.

    The index is int64, the column labels are strings, and the data proper is
    float32 or float64 (selected by dataType).

    Returns
    -------
    (columnArray, columnLengthArray, nCol, indexArray, nIndex, mainArray)
        encoded column-name characters and per-name lengths, the column
        count, the int64 index values and their count, and the flattened
        data array.
    """
    # encode the string column labels with the shared string helper
    columnNames = dataFrame.columns.tolist()
    columnArray, columnLengthArray = _stringsToBMI(columnNames)

    # the index is already BMI compliant once copied out as int64
    indexValues = dataFrame.index.to_numpy(dtype=np.int64, copy=True)

    # flatten the data proper at the requested precision
    flatData = _flatten_array(dataFrame, dataType)

    return (columnArray, columnLengthArray, len(columnNames), \
        indexValues, len(indexValues), flatData)


def _bmi_reassemble_lite_restart (columnArray, columnLengthArray, nCol, indexArray,\
                    nIndex, Array):
    """Rebuild a lite-restart dataframe from its BMI-compliant flat arrays.

    Inverse of _bmi_disassemble_lite_restart: restores the 2D data grid, the
    string column labels, and the int64 row index.
    """
    # recover the 2D grid from the flattened array
    dataFrame = _unflatten_array(Array, nCol, nIndex)

    # decode the column labels; attach them by labeling the index of the
    # transposed frame, then transposing back
    columnLabels = pd.Index(_BMI_toStrings(columnArray, columnLengthArray), \
        dtype=object)
    transposed = dataFrame.T
    transposed.index = columnLabels
    dataFrame = transposed.T

    # restore the int64 row index
    dataFrame.index = pd.Index(indexArray, dtype=np.int64)

    return dataFrame


def _bmi_disassemble_lastObs (lastobs_df):
    """Split the lastobs dataframe into BMI-compliant flat arrays.

    Returns the encoded gage-ID strings (flattened characters plus
    per-string lengths) and the two numeric columns copied out as
    float64 arrays.
    """
    # numeric columns copy straight out as float64
    timeSince = lastobs_df["time_since_lastobs"].to_numpy(dtype=np.float64, copy=True)
    discharge = lastobs_df["lastobs_discharge"].to_numpy(dtype=np.float64, copy=True)

    # gage IDs are strings: encode them with the shared string helper
    gageArray, gageStringLengthArray = _stringsToBMI(lastobs_df["gages"].tolist())

    return (gageArray, gageStringLengthArray, timeSince, discharge)


def _bmi_reassemble_lastObs (gageArray, gageStringLengthArray, \
        timeSinceArray, lastDischargeArray):
    """Rebuild the lastobs dataframe from its BMI-compliant arrays.

    Inverse of _bmi_disassemble_lastObs: decodes the gage-ID strings and
    recombines them with the numeric columns.

    Parameters
    ----------
    gageArray, gageStringLengthArray : encoded gage-ID characters and
        per-string lengths, as produced by _stringsToBMI
    timeSinceArray : float64 array for the 'time_since_lastobs' column
    lastDischargeArray : float64 array for the 'lastobs_discharge' column

    Returns
    -------
    pd.DataFrame with columns ['gages', 'time_since_lastobs',
    'lastobs_discharge'], in that order.
    """
    # Build the dataframe directly from a column dict: this preserves the
    # original column order and removes the per-column if/elif dispatch,
    # which would silently reuse a stale column if the list ever grew
    # without a matching branch.
    dataFrame = pd.DataFrame({
        'gages': _BMI_toStrings(gageArray, gageStringLengthArray),
        'time_since_lastobs': timeSinceArray,
        'lastobs_discharge': lastDischargeArray,
    })

    return dataFrame



# Utility functions -------
def _read_config_file(custom_input_file):
'''
Expand Down