Adds fixes for new target_cols ingest mechanism (#512)
This includes a number of fixes required by the new "target_cols" concept tied to the "viz_db_ingest" lambda. Manually creating the ingest tables for new files is somewhat annoying - and in this case that even applies to files we've been ingesting all along but which now require additional columns (i.e. "target_cols"). We historically created those tables by hand because it didn't make sense to create them inside the "viz_db_ingest" map-items loop. I've now added an "iteration_index" argument to the map calls to that function, so the function can check whether iteration_index == 0 and, if so, set a create_table variable to True. If an UndefinedTable error (the table does not exist) or a BadCopyFileFormat error (a column is now being ingested that wasn't before) is thrown while create_table is True, the table schema is recreated to exactly match the DataFrame being ingested and the ingest is retried. If iteration_index is not 0 and one of those same errors is encountered, the error is caught at the Step Function level, which retries after waiting 5 seconds.
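
In code terms, the heart of the change looks roughly like the sketch below. This is a simplified, standalone rendering of the logic that actually lives inline in the viz_db_ingest lambda handler (see the lambda_function.py diff further down); the helper name and signature are illustrative only.

from io import StringIO
from psycopg2.errors import UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation


def copy_with_optional_recreate(df, target_table, viz_db, iteration_index):
    # Only the first map item (iteration_index == 0) is allowed to (re)create the
    # target table; every other item re-raises so the Step Function retrier can
    # wait 5 seconds and try again once the table exists.
    create_table = iteration_index == 0

    buffer = StringIO()
    df.to_csv(buffer, sep='\t', index=False, header=False)
    buffer.seek(0)

    try:
        with viz_db.get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", buffer)
            connection.commit()
    except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
        if not create_table:
            raise  # handled by the Step Function's retry block

        # Rebuild the table schema to exactly match the DataFrame, then retry the COPY.
        schema, table = target_table.split('.')
        df.head(0).to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
        buffer.seek(0)
        with viz_db.get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", buffer)
            connection.commit()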

With this new ingest methodology, a few column names changed slightly, because the actual DataFrame column names are now used instead of whatever names were previously assigned by hand. A few product SQL queries had to be updated accordingly.
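
As a concrete illustration (the connection string and sample value below are placeholders, not real credentials or data): because the table is now created straight from the DataFrame, a source column such as "issuedTime" ends up as a case-sensitive identifier in Postgres instead of the hand-assigned "issued_time", and the product SQL has to double-quote it.

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:pass@host:5432/vizdb")  # placeholder DSN

# Recreating ingest.ahps_metadata from the DataFrame keeps the camelCase name as-is.
metadata_df = pd.DataFrame({"nws_lid": ["ABCD1"], "issuedTime": ["2023-08-23 00:00:00"]})
metadata_df.head(0).to_sql(con=engine, schema="ingest", name="ahps_metadata",
                           index=False, if_exists="replace")

# Downstream queries must therefore reference the quoted, case-sensitive column:
#   SELECT metadata."issuedTime" FROM ingest.ahps_metadata AS metadata;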

There may be a few remaining issues I haven't caught yet - it's been less than an hour since I fixed the frequent offenders (e.g. lambda_rfc, sns_ana, and man_rnr). If things still look good in the morning, we should deploy this to TI immediately, and once we quickly re-confirm that everything is working there, deploy it to UAT.

-----------

* Adds fixes for new target_cols ingest mechanism

* Add "iteration_index" to viz_db_ingest map call

* Adds another "create_table" error for catching
shawncrawley authored Aug 23, 2023
1 parent 64d01d4 commit 694427f
Showing 7 changed files with 71 additions and 32 deletions.
40 changes: 31 additions & 9 deletions Core/LAMBDA/viz_functions/viz_db_ingest/lambda_function.py
@@ -16,12 +16,13 @@
import os
import boto3
import json
import re
from datetime import datetime
import numpy as np
import xarray as xr
import pandas as pd
import xarray as xr
from io import StringIO
import re
from psycopg2.errors import UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation
from viz_classes import database
from viz_lambda_shared_funcs import check_if_file_exists

@@ -40,6 +41,7 @@ def lambda_handler(event, context):
reference_time = event['reference_time']
keep_flows_at_or_above = event['keep_flows_at_or_above']
reference_time_dt = datetime.strptime(reference_time, '%Y-%m-%d %H:%M:%S')
create_table = event.get('iteration_index') == 0

print(f"Checking existance of {file} on S3/Google Cloud/Para Nomads.")
download_path = check_if_file_exists(bucket, file, download=True)
@@ -66,19 +68,27 @@
if not target_cols:
target_cols = ds_vars

ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
try:
ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
if 'forecast_hour' not in target_cols:
target_cols.append('forecast_hour')
except:
print("Regex pattern for the forecast hour didn't match the netcdf file")

try:
ds['nwm_vers'] = float(ds.NWM_version_number.replace("v",""))
if 'nwm_vers' not in target_cols:
target_cols.append('nwm_vers')
except:
print("NWM_version_number property is not available in the netcdf file")

drop_vars = [var for var in ds_vars if var not in target_cols]
df = ds.to_dataframe().reset_index()
df = df.drop(columns=drop_vars)
ds.close()
if 'streamflow' in ds_vars:
if 'streamflow' in target_cols:
df = df.loc[df['streamflow'] >= keep_flows_at_or_above].round({'streamflow': 2}).copy() # noqa
df = df[target_cols]

elif file.endswith('.csv'):
df = pd.read_csv(download_path)
@@ -93,11 +103,23 @@
f = StringIO() # Use StringIO to store the temporary text file in memory (faster than on disk)
df.to_csv(f, sep='\t', index=False, header=False)
f.seek(0)
with viz_db.get_db_connection() as connection:
cursor = connection.cursor()
#cursor.copy_from(f, target_table, sep='\t', null='') # This is the command that actual copies the data to db
cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
connection.commit()
try:
with viz_db.get_db_connection() as connection:
cursor = connection.cursor()
cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
connection.commit()
except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
if not create_table:
raise

print("Error encountered. Recreating table now and retrying import...")
create_table_df = df.head(0)
schema, table = target_table.split('.')
create_table_df.to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
with viz_db.get_db_connection() as connection:
cursor = connection.cursor()
cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
connection.commit()

print(f"--> Import of {len(df)} rows Complete. Removing {download_path} and closing db connection.")
os.remove(download_path)
@@ -99,8 +99,8 @@ SELECT
END AS record_forecast,
metadata.producer,
metadata.issuer,
to_char(metadata.issued_time::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS issued_time,
to_char(metadata.generation_time::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS generation_time,
to_char(metadata."issuedTime"::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS issued_time,
to_char(metadata."generationTime"::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS generation_time,
metadata.usgs_sitecode,
metadata.feature_id,
metadata.nws_name,
@@ -120,4 +120,4 @@ FROM ingest.ahps_metadata as metadata
JOIN max_stage ON max_stage.nws_lid = metadata.nws_lid
JOIN min_stage ON min_stage.nws_lid = metadata.nws_lid
JOIN initial_stage ON initial_stage.nws_lid = metadata.nws_lid
WHERE metadata.issued_time::timestamp without time zone > ('1900-01-01 00:00:00'::timestamp without time zone - INTERVAL '26 hours') AND metadata.nws_lid NOT IN (SELECT nws_lid FROM derived.ahps_restricted_sites);
WHERE metadata."issuedTime"::timestamp without time zone > ('1900-01-01 00:00:00'::timestamp without time zone - INTERVAL '26 hours') AND metadata.nws_lid NOT IN (SELECT nws_lid FROM derived.ahps_restricted_sites);
@@ -92,7 +92,7 @@ def setup_huc_inundation(event):
if sql.strip().endswith(';'):
sql = sql.replace(';', f' group by {alias}.feature_id, streamflow_cms')
else:
sql += " group by max_forecast.feature_id, streamflow_cms"
sql += f" group by {alias}.feature_id, streamflow_cms"

sql = sql.replace(";", "")

@@ -4,16 +4,16 @@ product_type: "vector" # Needed to not fail, but obviously there's nothing more
run: true

ingest_files:
- file_format: common/data/model/com/nwm/prod/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: None
file_window: None
target_table: ingest.nwm_channel_rt_ana
target_keys: (feature_id, streamflow)
target_cols: ['feature_id', 'streamflow', 'velocity', 'qBucket']

- file_format: common/data/model/com/nwm/prod/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.reservoir.tm00.conus.nc
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.reservoir.tm00.conus.nc
file_step: None
file_window: None
target_table: ingest.nwm_reservoir_ana_full
target_table: ingest.nwm_reservoir_ana
target_cols: ['feature_id', 'water_sfc_elev', 'outflow']
target_keys: (feature_id)
@@ -8,13 +8,8 @@ ingest_files:
file_step: None
file_window: None
target_table: ingest.rnr_wrf_hydro_outputs
target_cols: ['station_id', 'time', 'streamflow', 'forecast_hour']
target_keys: (station_id)

- file_format: replace_route/{{datetime:%Y%m%d}}/forecasts/{{datetime:%H}}Z_run_issue_times.csv
file_step: None
file_window: None
target_table: ingest.rnr_forecasts
target_keys: (lid)

db_max_flows:
- name: rnr_max_flows
16 changes: 8 additions & 8 deletions Core/StepFunctions/execute_replace_route.json.tftpl
@@ -1,8 +1,8 @@
{
"Comment": "A description of my state machine",
"StartAt": "Create Domain Tables",
"StartAt": "Create RnR Domain Tables",
"States": {
"Create Domain Tables": {
"Create RnR Domain Tables": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
@@ -34,9 +34,9 @@
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Create Domain-Specifc WRF-Hydro Input File",
"StartAt": "Create WRF-Hydro Domain File",
"States": {
"Create Domain-Specifc WRF-Hydro Input File": {
"Create WRF-Hydro Domain File": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
@@ -67,10 +67,10 @@
"run_time.$": "$.run_time",
"step.$": "$$.Map.Item.Value"
},
"Next": "Kick off WRF-Hydro",
"Next": "Run WRF-Hydro",
"ResultPath": null
},
"Kick off WRF-Hydro": {
"Run WRF-Hydro": {
"Type": "Task",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
@@ -85,9 +85,9 @@
},
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand.waitForTaskToken",
"TimeoutSeconds": 600,
"Next": "Initialize Pipeline"
"Next": "Initialize RnR Viz Pipeline"
},
"Initialize Pipeline": {
"Initialize RnR Viz Pipeline": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
26 changes: 24 additions & 2 deletions Core/StepFunctions/viz_processing_pipeline.json.tftpl
@@ -95,7 +95,17 @@
}
],
"Next": "Input Data Files",
"ResultPath": null
"ResultPath": null,
"Catch": [
{
"ErrorEquals": [
"UndefinedTable"
],
"Comment": "Table Does Not Exist",
"Next": "Input Data Files",
"ResultPath": null
}
]
},
"Input Data Files": {
"Type": "Map",
@@ -131,6 +141,17 @@
"MaxAttempts": 6,
"BackoffRate": 2,
"Comment": "Lambda Service Errors"
},
{
"ErrorEquals": [
"UndefinedTable",
"BadCopyFileFormat",
"InvalidTextRepresentation"
],
"BackoffRate": 2,
"IntervalSeconds": 5,
"MaxAttempts": 2,
"Comment": "Recreate Table Errors"
}
]
}
@@ -144,7 +165,8 @@
"target_cols.$": "$.db_ingest_group.target_cols",
"bucket.$": "$.db_ingest_group.bucket",
"reference_time.$": "$.reference_time",
"keep_flows_at_or_above.$": "$.db_ingest_group.keep_flows_at_or_above"
"keep_flows_at_or_above.$": "$.db_ingest_group.keep_flows_at_or_above",
"iteration_index.$": "$$.Map.Item.Index"
},
"ItemsPath": "$.db_ingest_group.ingest_datasets"
},
