Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Viz - Dependent products postprocess, add states to attribute table, peak flow arrival optimization #425

Merged
merged 22 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
95ead92
postprocess sql edits for dependent_on_db_tables + peak flow optimiza…
TylerSchrag-NOAA Apr 26, 2023
966ff18
initialize pipeline updates for dependent_on_db_tables + peak flow ya…
TylerSchrag-NOAA Apr 26, 2023
ee3471c
viz step function updates for dependent_on_db_tables in product postp…
TylerSchrag-NOAA Apr 26, 2023
6fa080f
first pass - adding state to sql files
TylerSchrag-NOAA Apr 26, 2023
aa2d5af
Addition of state columns to main service mapx files
TylerSchrag-NOAA Apr 26, 2023
faf1f9a
Fixing group by error in peak flow sql
TylerSchrag-NOAA Apr 26, 2023
d0fc184
Removing state from drop column list in wrds api handler, and adding …
TylerSchrag-NOAA Apr 26, 2023
bf59aee
Reduced mapx string field lengths from 60000 max
TylerSchrag-NOAA Apr 27, 2023
2f193f3
Fixing config files to allow for multiple dependent on tables
TylerSchrag-NOAA Apr 27, 2023
60b249f
Fix to postprocess_sql check
TylerSchrag-NOAA May 2, 2023
bbe4c49
removal of max_flows_already_processed function in postprocess_sql fu…
TylerSchrag-NOAA May 3, 2023
10d46eb
minor sql change for inundation - didn't like my new state field with…
TylerSchrag-NOAA May 3, 2023
93db11a
Updated derived db dump file in viz database
TylerSchrag-NOAA May 3, 2023
76da03b
Tweak to sd creation script and fix to one mapx file.
TylerSchrag-NOAA May 3, 2023
6087753
fix to readme to specify ti instead of uat branch
TylerSchrag-NOAA May 3, 2023
b621e43
Update srf_peak_flow_arrival_time_hi.yml
CoreyKrewson-NOAA May 5, 2023
3fe0ebf
Update mrf_peak_flow_arrival_time.yml
CoreyKrewson-NOAA May 5, 2023
4f513bb
Update srf_peak_flow_arrival_time.yml
CoreyKrewson-NOAA May 5, 2023
042413a
Update srf_peak_flow_arrival_time_hi.yml
CoreyKrewson-NOAA May 5, 2023
bbc8954
Update srf_peak_flow_arrival_time_prvi.yml
CoreyKrewson-NOAA May 5, 2023
676202f
Update template.yml
CoreyKrewson-NOAA May 5, 2023
2c8dd1c
Update viz_processing_pipeline.json.tftpl
CoreyKrewson-NOAA May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Core/EC2/RDSBastion/scripts/viz/postgresql_setup.sh.tftpl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ egisDB_fim_catchments="egisDB_fim_catchments_2023_0406.dump"
vizDB_admin="vizDB_admin_2023_0424.dump"
vizDB_archive="vizDB_archive_2023_0214.dump"
vizDB_cache="vizDB_cache_2023_0424.dump"
vizDB_derived="vizDB_derived_2023_0424.dump"
vizDB_derived="vizDB_derived_2023_0426.dump"
vizDB_ingest="vizDB_ingest_2023_0424.dump"
vizDB_publish="vizDB_publish_2023_0224.dump"
vizDB_external="vizDB_external_2023_0214.dump"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name
channels.strm_order,
channels.name,
channels.state
INTO publish.ana_inundation
FROM ingest.ana_inundation as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON channels.feature_id = inun.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_hi.strm_order,
derived.channels_hi.name
channels.strm_order,
channels.name,
'HI' as state
INTO publish.ana_inundation_hi
FROM ingest.ana_inundation_hi as inun
left join derived.channels_hi ON derived.channels_hi.feature_id = inun.feature_id_str
left join derived.channels_hi AS channels ON channels.feature_id = inun.feature_id_str
--Add an empty row so that service monitor will pick up a reference and update time in the event of no fim features.
UNION SELECT -9999, '-9999', 'NA', -9999, '-9999', -9999, -9999, -9999, -9999, 'NA', to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC'),
to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC'), '-9999', NULL,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL;
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL, 'HI';
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_prvi.strm_order,
derived.channels_prvi.name
channels.strm_order,
channels.name,
'PRVI' AS state
INTO publish.ana_inundation_prvi
FROM ingest.ana_inundation_prvi as inun
left join derived.channels_prvi ON derived.channels_prvi.feature_id = inun.feature_id
left join derived.channels_prvi as channels ON channels.feature_id = inun.feature_id
--Add an empty row so that service monitor will pick up a reference and update time in the event of no fim features.
UNION SELECT -9999, '-9999', 'NA', -9999, '-9999', -9999, -9999, -9999, -9999, 'NA', to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC'),
to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC'), '-9999', NULL,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL;
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL, 'PRVI';
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name,
channels.strm_order,
channels.name,
channels.state,
'ana_past_14day' AS config
INTO publish.ana_past_14day_max_inundation
FROM ingest.ana_past_14day_max_inundation as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON channels.feature_id = inun.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name,
channels.strm_order,
channels.name,
channels.state,
'ana_past_7day' AS config
INTO publish.ana_past_7day_max_inundation
FROM ingest.ana_past_7day_max_inundation as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON channels.feature_id = inun.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name,
channels.strm_order,
channels.name,
channels.state,
'mrf_10day' AS config
INTO publish.mrf_max_inundation_10day
FROM ingest.mrf_max_inundation_10day as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON inun.feature_id = channels.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name,
channels.strm_order,
channels.name,
channels.state,
'mrf_3day' AS config
INTO publish.mrf_max_inundation_3day
FROM ingest.mrf_max_inundation_3day as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON inun.feature_id = channels.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name,
channels.strm_order,
channels.name,
channels.state,
'mrf_5day' AS config
INTO publish.mrf_max_inundation_5day
FROM ingest.mrf_max_inundation_5day as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON inun.feature_id = channels.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name,
channels.strm_order,
channels.name,
channels.state,
agg_status.inherited_rfc_forecasts,
agg_status.max_status
INTO publish.rfc_5day_max_downstream_inundation
FROM ingest.rfc_5day_max_downstream_inundation as inun
JOIN agg_status ON inun.feature_id = agg_status.feature_id
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON channels.feature_id = inun.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_conus.strm_order,
derived.channels_conus.name
channels.strm_order,
channels.name,
channels.state
INTO publish.srf_max_inundation
FROM ingest.srf_max_inundation as inun
left join derived.channels_conus ON derived.channels_conus.feature_id = inun.feature_id;
left join derived.channels_conus as channels ON channels.feature_id = inun.feature_id;
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_hi.strm_order,
derived.channels_hi.name
channels.strm_order,
channels.name,
'HI' AS state
INTO publish.srf_max_inundation_hi
FROM ingest.srf_max_inundation_hi as inun
left join derived.channels_hi ON derived.channels_hi.feature_id = inun.feature_id
left join derived.channels_hi as channels ON channels.feature_id = inun.feature_id
--Add an empty row so that service monitor will pick up a reference and update time in the event of no fim features.
UNION SELECT -9999, '-9999', 'NA', -9999, '-9999', -9999, -9999, -9999, -9999, 'NA', to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC'),
'-9999', NULL, to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL;
'-9999', NULL, to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL, 'HI';
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ SELECT
inun.huc8,
inun.geom,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
derived.channels_prvi.strm_order,
derived.channels_prvi.name
channels.strm_order,
channels.name,
'PRVI' AS state
INTO publish.srf_max_inundation_prvi
FROM ingest.srf_max_inundation_prvi as inun
left join derived.channels_prvi ON derived.channels_prvi.feature_id = inun.feature_id
left join derived.channels_conus as channels ON channels.feature_id = inun.feature_id
--Add an empty row so that service monitor will pick up a reference and update time in the event of no fim features.
UNION SELECT -9999, '-9999', 'NA', -9999, '-9999', -9999, -9999, -9999, -9999, 'NA', to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC'),
'-9999', NULL, to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL;
'-9999', NULL, to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, -9999, NULL, 'PRVI';
69 changes: 38 additions & 31 deletions Core/LAMBDA/viz_functions/viz_db_postprocess_sql/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,51 @@
import os
from viz_classes import database

class RequiredTableNotUpdated(Exception):
""" This is a custom exception to report back to the AWS Step Function that a required table (dependent_on_table) has not yet been updated with the current reference time. """

def lambda_handler(event, context):
step = event['step']
folder = event['folder']
reference_time = event['args']['reference_time']
sql_replace = event['args']['sql_rename_dict']
sql_replace.update({'1900-01-01 00:00:00': reference_time}) #setup a replace dictionary, starting with the reference time of the current pipeline.

# Don't run any SQL if it's a reference service
if step in ["products", "fim_config"]:
if event['args']['product']['configuration'] == "reference":
return

# Admin tasks
if folder == 'admin':
run_admin_tasks(event, folder, step, sql_replace)
else:
# TODO: Clean up this conditional logic to be more readable.
if step == 'summaries':
folder = os.path.join(folder, event['args']['product']['product'])
sql_file = event['args']['postprocess_summary']['sql_file']

elif step == "max_flows":
# Max Flow
if step == "max_flows":
sql_file = event['args']['db_max_flow']['max_flows_sql_file']
# FIM Config
elif step == 'fim_config':
if not event['args']['fim_config'].get('postprocess'):
return

sql_file = event['args']['fim_config']['postprocess']['sql_file']
else:
# Product
elif step == "products":
folder = os.path.join(folder, event['args']['product']['configuration'])
sql_file = event['args']['postprocess_sql']['sql_file']
if event.get('args').get('postprocess_sql').get('dependent_on_db_tables'):
dependent_on_tables = event['args']['postprocess_sql']['dependent_on_db_tables']
if len(dependent_on_tables) > 0:
print("Dependent DB Table Specified - Checking if complete.")
if check_required_tables(dependent_on_tables, reference_time, sql_replace) is False:
print("Reference Time not yet present in required table. Pausing and Retrying.")
raise RequiredTableNotUpdated
# Summary
elif step == 'summaries':
folder = os.path.join(folder, event['args']['product']['product'])
sql_file = event['args']['postprocess_summary']['sql_file']

### Run the Appropriate SQL File ###
sql_path = f"{folder}/{sql_file}.sql"

if step == 'max_flows' and max_flows_already_processed(sql_path, reference_time, sql_replace):
return True

run_sql(sql_path, sql_replace)

return True
Expand Down Expand Up @@ -94,19 +101,19 @@ def run_sql(sql_path_or_str, sql_replace=None):
print(f"Finished executing the SQL statement above.")
return result

def max_flows_already_processed(sql_path, reference_time, sql_replace):
sql = open(sql_path, 'r').read().lower()
for word, replacement in sql_replace.items():
sql = re.sub(word, replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC')
schema, table = re.search('into (\w+)\.(\w+)', sql).groups()
sql = f'SELECT reference_time FROM {schema}.{table} LIMIT 1;'
result = run_sql(sql)

if not result:
return False

if result[0] == reference_time:
print(f"NOTE: {sql_path} was already executed for reference time {reference_time}")
return True
else:
return False
# Check if a specified reference time is present in a database table
def check_required_tables(dependent_on_tables, reference_time, sql_replace):
for table in dependent_on_tables:
sql = f"SELECT reference_time FROM {table} LIMIT 1"
for word, replacement in sql_replace.items():
sql = re.sub(word, replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC')
try:
result = run_sql(sql)
except: # table doesn't exist
return False
if not result: # table is empty
return False
elif result[0].replace(" UTC", "") == reference_time: # table reference time matches current reference time
return True
else: # table reference time doesn't match current reference time
return False
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SELECT channels.feature_id,
channels.feature_id::TEXT AS feature_id_str,
channels.strm_order,
channels.name,
channels.state,
channels.huc6,
high_flow_mag.nwm_vers,
high_flow_mag.reference_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SELECT channels.feature_id,
channels.strm_order,
channels.name,
channels.huc6,
channels.state,
hfm_14day.nwm_vers,
hfm_14day.reference_time,
hfm_14day.reference_time AS valid_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SELECT channels.feature_id,
channels.strm_order,
channels.name,
channels.huc6,
channels.state,
hfm_7day.nwm_vers,
hfm_7day.reference_time,
hfm_7day.reference_time AS valid_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SELECT
channels.strm_order::integer,
channels.name,
channels.huc6,
channels.state,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.ana_streamflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ SELECT
channels.strm_order::integer,
channels.name,
channels.huc6,
'AK' AS state,
channels.geom
INTO publish.ana_streamflow_ak
FROM cache.max_flows_ana_ak as ana
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SELECT channels.feature_id,
channels.strm_order,
channels.name,
channels.huc6,
'HI' as state,
high_flow_mag.nwm_vers,
high_flow_mag.reference_time,
high_flow_mag.reference_time AS valid_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SELECT
channels.strm_order::integer,
channels.name,
channels.huc6,
'HI' as state,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.ana_streamflow_hi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SELECT channels.feature_id,
channels.strm_order,
name,
huc6,
'PRVI' as state,
high_flow_mag.nwm_vers,
high_flow_mag.reference_time,
high_flow_mag.reference_time AS valid_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SELECT
channels.strm_order::integer,
channels.name,
channels.huc6,
'PRVI' AS state,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.ana_streamflow_prvi
Expand Down
Loading