Adds fixes for new target_cols ingest mechanism #512

Merged · 3 commits · Aug 23, 2023
40 changes: 31 additions & 9 deletions Core/LAMBDA/viz_functions/viz_db_ingest/lambda_function.py
@@ -16,12 +16,13 @@
import os
import boto3
import json
import re
from datetime import datetime
import numpy as np
import xarray as xr
import pandas as pd
import xarray as xr
from io import StringIO
import re
from psycopg2.errors import UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation
from viz_classes import database
from viz_lambda_shared_funcs import check_if_file_exists

@@ -40,6 +41,7 @@ def lambda_handler(event, context):
reference_time = event['reference_time']
keep_flows_at_or_above = event['keep_flows_at_or_above']
reference_time_dt = datetime.strptime(reference_time, '%Y-%m-%d %H:%M:%S')
create_table = event.get('iteration_index') == 0

print(f"Checking existance of {file} on S3/Google Cloud/Para Nomads.")
download_path = check_if_file_exists(bucket, file, download=True)
@@ -66,19 +68,27 @@
if not target_cols:
target_cols = ds_vars

ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
try:
ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
if 'forecast_hour' not in target_cols:
target_cols.append('forecast_hour')
except:
print("Regex pattern for the forecast hour didn't match the netcdf file")

try:
ds['nwm_vers'] = float(ds.NWM_version_number.replace("v",""))
if 'nwm_vers' not in target_cols:
target_cols.append('nwm_vers')
except:
print("NWM_version_number property is not available in the netcdf file")

drop_vars = [var for var in ds_vars if var not in target_cols]
df = ds.to_dataframe().reset_index()
df = df.drop(columns=drop_vars)
ds.close()
if 'streamflow' in ds_vars:
if 'streamflow' in target_cols:
df = df.loc[df['streamflow'] >= keep_flows_at_or_above].round({'streamflow': 2}).copy() # noqa
df = df[target_cols]

elif file.endswith('.csv'):
df = pd.read_csv(download_path)
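
The forecast-hour parse above is the subtle part of this hunk. A quick sketch of what the regex actually captures, run against a made-up but representative NWM key (real keys are built from the file_format templates further down):

```python
import re

# Hypothetical S3 key, shaped like the keys the file_format templates resolve to.
key = ("common/data/model/com/nwm/prod/nwm.20230823/short_range/"
       "nwm.t06z.short_range.channel_rt.f018.conus.nc")

# Same pattern as the handler: date directory, configuration directory,
# cycle hour ("t06z"), then the trailing hour token ("f018", "tm00", ...).
matches = re.findall(r"(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", key)
# matches == [('20230823', '06', '018')]
forecast_hour = int(matches[0][-1])  # 18; an analysis_assim "tm00" key gives 0
```

With the new try/except, a key carrying no recognizable hour token no longer kills the ingest; the file simply loads without a forecast_hour column.
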
@@ -93,11 +103,23 @@
f = StringIO() # Use StringIO to store the temporary text file in memory (faster than on disk)
df.to_csv(f, sep='\t', index=False, header=False)
f.seek(0)
with viz_db.get_db_connection() as connection:
cursor = connection.cursor()
#cursor.copy_from(f, target_table, sep='\t', null='') # This is the command that actual copies the data to db
cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
connection.commit()
try:
with viz_db.get_db_connection() as connection:
cursor = connection.cursor()
cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
connection.commit()
except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
if not create_table:
raise

print("Error encountered. Recreating table now and retrying import...")
create_table_df = df.head(0)
schema, table = target_table.split('.')
create_table_df.to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
with viz_db.get_db_connection() as connection:
cursor = connection.cursor()
cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
connection.commit()

print(f"--> Import of {len(df)} rows Complete. Removing {download_path} and closing db connection.")
os.remove(download_path)
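
Taken together, the new ingest path is: stream the frame into Postgres with COPY, and if that fails because the target table is missing or its shape has drifted, let only the first Map iteration rebuild the table and retry. A condensed sketch of the pattern, assuming viz_db exposes an engine and get_db_connection() as in the handler (copy_tsv and ingest_dataframe are hypothetical names):

```python
from io import StringIO

import pandas as pd
from psycopg2.errors import (BadCopyFileFormat, InvalidTextRepresentation,
                             UndefinedTable)


def copy_tsv(viz_db, target_table: str, f: StringIO) -> None:
    # COPY FROM STDIN is the fast bulk path; tabs delimit, '' means NULL.
    with viz_db.get_db_connection() as connection:
        cursor = connection.cursor()
        cursor.copy_expert(
            f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
        connection.commit()


def ingest_dataframe(viz_db, df: pd.DataFrame, target_table: str,
                     create_table: bool) -> None:
    f = StringIO()  # in-memory TSV beats a temp file on disk
    df.to_csv(f, sep='\t', index=False, header=False)
    f.seek(0)
    try:
        copy_tsv(viz_db, target_table, f)
    except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
        if not create_table:
            raise  # later iterations surface the error to Step Functions
        # df.head(0) keeps column names and dtypes but no rows, so to_sql
        # emits only the DDL; if_exists='replace' drops the stale table.
        schema, table = target_table.split('.')
        df.head(0).to_sql(con=viz_db.engine, schema=schema, name=table,
                          index=False, if_exists='replace')
        f.seek(0)  # rewind: the failed COPY may have consumed the stream
        copy_tsv(viz_db, target_table, f)
```

Gating the rebuild on iteration_index == 0 keeps concurrent Map iterations from racing to recreate the same table; every other iteration re-raises and leans on the state machine's retry, shown at the bottom of this PR.
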
@@ -99,8 +99,8 @@ SELECT
END AS record_forecast,
metadata.producer,
metadata.issuer,
to_char(metadata.issued_time::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS issued_time,
to_char(metadata.generation_time::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS generation_time,
to_char(metadata."issuedTime"::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS issued_time,
to_char(metadata."generationTime"::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS generation_time,
metadata.usgs_sitecode,
metadata.feature_id,
metadata.nws_name,
@@ -120,4 +120,4 @@ FROM ingest.ahps_metadata as metadata
JOIN max_stage ON max_stage.nws_lid = metadata.nws_lid
JOIN min_stage ON min_stage.nws_lid = metadata.nws_lid
JOIN initial_stage ON initial_stage.nws_lid = metadata.nws_lid
WHERE metadata.issued_time::timestamp without time zone > ('1900-01-01 00:00:00'::timestamp without time zone - INTERVAL '26 hours') AND metadata.nws_lid NOT IN (SELECT nws_lid FROM derived.ahps_restricted_sites);
WHERE metadata."issuedTime"::timestamp without time zone > ('1900-01-01 00:00:00'::timestamp without time zone - INTERVAL '26 hours') AND metadata.nws_lid NOT IN (SELECT nws_lid FROM derived.ahps_restricted_sites);
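
The issued_time to "issuedTime" switch is about identifier folding, not a schema whim: Postgres lower-cases unquoted names, so once the metadata table carries camelCase columns (pandas.DataFrame.to_sql preserves source field names verbatim), they are only reachable inside double quotes. If this SQL were built from Python, psycopg2's sql module would handle the quoting; a small illustrative sketch, not code from this repo:

```python
from psycopg2 import sql

# sql.Identifier renders the double quotes a case-sensitive name requires.
query = sql.SQL("SELECT {col} FROM {tbl} WHERE {col} > %s").format(
    col=sql.Identifier("issuedTime"),
    tbl=sql.Identifier("ingest", "ahps_metadata"),
)
# With a live connection, query.as_string(conn) yields:
#   SELECT "issuedTime" FROM "ingest"."ahps_metadata" WHERE "issuedTime" > %s
```
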
@@ -92,7 +92,7 @@ def setup_huc_inundation(event):
if sql.strip().endswith(';'):
sql = sql.replace(';', f' group by {alias}.feature_id, streamflow_cms')
else:
sql += " group by max_forecast.feature_id, streamflow_cms"
sql += f" group by {alias}.feature_id, streamflow_cms"

sql = sql.replace(";", "")

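
That small change fixes a real mismatch: the if branch already interpolated the parsed alias, while the else branch hard-coded max_forecast, so any service SQL using a different alias got a GROUP BY against a relation that is not in the query. A toy illustration, with a hypothetical alias and table:

```python
alias = "rf_max_flows"  # hypothetical alias parsed from a service's SQL
sql = ("SELECT rf_max_flows.feature_id, streamflow_cms "
       "FROM cache.rf_max_flows AS rf_max_flows")

# Old: sql += " group by max_forecast.feature_id, streamflow_cms"  -> broken SQL
# New: the GROUP BY follows whatever alias the statement actually uses.
sql += f" group by {alias}.feature_id, streamflow_cms"
```
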
@@ -4,16 +4,16 @@ product_type: "vector" # Needed to not fail, but obviously there's nothing more
run: true

ingest_files:
- file_format: common/data/model/com/nwm/prod/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: None
file_window: None
target_table: ingest.nwm_channel_rt_ana
target_keys: (feature_id, streamflow)
target_cols: ['feature_id', 'streamflow', 'velocity', 'qBucket']

- file_format: common/data/model/com/nwm/prod/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.reservoir.tm00.conus.nc
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.reservoir.tm00.conus.nc
file_step: None
file_window: None
target_table: ingest.nwm_reservoir_ana_full
target_table: ingest.nwm_reservoir_ana
target_cols: ['feature_id', 'water_sfc_elev', 'outflow']
target_keys: (feature_id)
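
The prod path segment is now the {{variable:NWM_DATAFLOW_VERSION}} token, so the same config can point at prod or para NWM output without edits. The real resolver lives elsewhere in the pipeline; a minimal sketch of how such tokens could expand, with the environment variable as an assumed source:

```python
import os
import re
from datetime import datetime


def render_file_format(template: str, ref_time: datetime) -> str:
    """Hypothetical expansion of {{datetime:...}} and {{variable:...}} tokens."""
    def expand(match: re.Match) -> str:
        kind, arg = match.group(1), match.group(2)
        if kind == "datetime":
            return ref_time.strftime(arg)   # e.g. %Y%m%d -> 20230823
        if kind == "variable":
            return os.environ[arg]          # e.g. NWM_DATAFLOW_VERSION=prod
        raise ValueError(f"unknown token type: {kind}")
    return re.sub(r"\{\{(\w+):([^}]+)\}\}", expand, template)


# With NWM_DATAFLOW_VERSION=prod this reproduces the old hard-coded path:
# common/data/model/com/nwm/prod/nwm.20230823/analysis_assim/nwm.t06z...
```
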
@@ -8,13 +8,8 @@ ingest_files:
file_step: None
file_window: None
target_table: ingest.rnr_wrf_hydro_outputs
target_cols: ['station_id', 'time', 'streamflow', 'forecast_hour']
target_keys: (station_id)

- file_format: replace_route/{{datetime:%Y%m%d}}/forecasts/{{datetime:%H}}Z_run_issue_times.csv
file_step: None
file_window: None
target_table: ingest.rnr_forecasts
target_keys: (lid)

db_max_flows:
- name: rnr_max_flows
16 changes: 8 additions & 8 deletions Core/StepFunctions/execute_replace_route.json.tftpl
@@ -1,8 +1,8 @@
{
"Comment": "A description of my state machine",
"StartAt": "Create Domain Tables",
"StartAt": "Create RnR Domain Tables",
"States": {
"Create Domain Tables": {
"Create RnR Domain Tables": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
@@ -34,9 +34,9 @@
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Create Domain-Specifc WRF-Hydro Input File",
"StartAt": "Create WRF-Hydro Domain File",
"States": {
"Create Domain-Specifc WRF-Hydro Input File": {
"Create WRF-Hydro Domain File": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
@@ -67,10 +67,10 @@
"run_time.$": "$.run_time",
"step.$": "$$.Map.Item.Value"
},
"Next": "Kick off WRF-Hydro",
"Next": "Run WRF-Hydro",
"ResultPath": null
},
"Kick off WRF-Hydro": {
"Run WRF-Hydro": {
"Type": "Task",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
@@ -85,9 +85,9 @@
},
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand.waitForTaskToken",
"TimeoutSeconds": 600,
"Next": "Initialize Pipeline"
"Next": "Initialize RnR Viz Pipeline"
},
"Initialize Pipeline": {
"Initialize RnR Viz Pipeline": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
26 changes: 24 additions & 2 deletions Core/StepFunctions/viz_processing_pipeline.json.tftpl
@@ -95,7 +95,17 @@
}
],
"Next": "Input Data Files",
"ResultPath": null
"ResultPath": null,
"Catch": [
{
"ErrorEquals": [
"UndefinedTable"
],
"Comment": "Table Does Not Exist",
"Next": "Input Data Files",
"ResultPath": null
}
]
},
"Input Data Files": {
"Type": "Map",
@@ -131,6 +141,17 @@
"MaxAttempts": 6,
"BackoffRate": 2,
"Comment": "Lambda Service Errors"
},
{
"ErrorEquals": [
"UndefinedTable",
"BadCopyFileFormat",
"InvalidTextRepresentation"
],
"BackoffRate": 2,
"IntervalSeconds": 5,
"MaxAttempts": 2,
"Comment": "Recreate Table Errors"
}
]
}
@@ -144,7 +165,8 @@
"target_cols.$": "$.db_ingest_group.target_cols",
"bucket.$": "$.db_ingest_group.bucket",
"reference_time.$": "$.reference_time",
"keep_flows_at_or_above.$": "$.db_ingest_group.keep_flows_at_or_above"
"keep_flows_at_or_above.$": "$.db_ingest_group.keep_flows_at_or_above",
"iteration_index.$": "$$.Map.Item.Index"
},
"ItemsPath": "$.db_ingest_group.ingest_datasets"
},
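
The bare class names in ErrorEquals work because of how Lambda reports unhandled Python exceptions: the errorType field carries the exception class name, and Step Functions matches ErrorEquals against it literally. The "iteration_index.$": "$$.Map.Item.Index" line closes the loop, telling the ingest Lambda whether it is iteration zero and therefore allowed to rebuild the table. A sketch of that contract from the Lambda side, with hypothetical helper names:

```python
from psycopg2.errors import UndefinedTable


def ingest(event):
    """Hypothetical stand-in for the ingest body shown earlier."""
    raise UndefinedTable("relation does not exist")


def recreate_table_and_retry(event):
    """Hypothetical stand-in for the df.head(0).to_sql rebuild path."""


def lambda_handler(event, context):
    try:
        ingest(event)
    except UndefinedTable:
        if event.get("iteration_index") == 0:  # fed from "$$.Map.Item.Index"
            recreate_table_and_retry(event)
        else:
            # Re-raised, this reaches Step Functions as errorType
            # "UndefinedTable", matching the new Retry/Catch entries above.
            raise
```
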