Dynamic check dependent tables #447

Merged (8 commits, May 18, 2023)
118 changes: 118 additions & 0 deletions Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py
@@ -2,10 +2,15 @@
import boto3
import os
import json
import re
import urllib.parse
import inspect
from botocore.exceptions import ClientError


class RequiredTableNotUpdated(Exception):
""" This is a custom exception to report back to the AWS Step Function that a required table does not exist or has not yet been updated with the current reference time. """

###################################################################################################################################################
###################################################################################################################################################
class database: #TODO: Should we be creating a connection/engine upon initialization, or within each method like we are now?
@@ -178,6 +183,119 @@ def cache_data(self, table, reference_time, retention_days=30):
db_engine.execute(f'SELECT * INTO {new_archive_table} FROM publish.{table};')
db_engine.dispose()
print(f"---> Wrote cache data into {new_archive_table} and dropped corresponding table from {retention_days} days ago, if it existed.")

###########################################
def required_tables_updated(self, sql_path_or_str, sql_replace={}, reference_time=None, stop_on_first_issue=True, raise_if_false=False):
""" Determines if tables required by provided SQL path or string are updated as expected

Args:
sql_path_or_str (str): Path to SQL file or raw SQL string
sql_replace (dict): Dictionary containing find/replace values for SQL, if applicable
reference_time (str): The reference_time to compare against for tables that contain a
                      reference_time column. If a table does not contain that column, it is
                      considered up to date.
stop_on_first_issue (bool): If True, the first issue encountered ends the check, either
    returning False or raising an exception if raise_if_false is also True. If False, every
    required table is checked before returning (only useful when raise_if_false is True,
    since the exception message will then contain all relevant failures, rather than just the first).
raise_if_false (bool): If True, a custom RequiredTableNotUpdated exception will be raised
                       if either a table does not exist, or its reference_time column
                       exists and its current value does not match the provided reference_time.
                       The exception message includes the specific details of the failure, covering
                       only the first failure encountered unless stop_on_first_issue is False.

Raises:
RequiredTableNotUpdated if raise_if_false is True

Returns:
Bool. True if no issues encountered, False otherwise.
"""
issues_encountered = []
# Determine if arg is file or raw SQL string
if os.path.exists(sql_path_or_str):
with open(sql_path_or_str, 'r') as f:
    sql = f.read()
else:
sql = sql_path_or_str

for word, replacement in sql_replace.items():
sql = re.sub(word, replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC')

required_tables = set(re.findall(r'(?<=FROM |JOIN )\w+\.\w+', sql, flags=re.IGNORECASE))
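# For example, a (hypothetical) statement like "SELECT * FROM ingest.nwm_channel_rt a JOIN derived.channels b ON ..."
# would yield required_tables == {'ingest.nwm_channel_rt', 'derived.channels'}.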
if not required_tables:
return True

# The next 3 lines were added specifically to abort checking cache.max_flows_ana when creating
# cache.max_flows_ana_past_hour, since cache.max_flows_ana will always be an hour behind at the
# time of creating the past_hour table. Rather than hard-code it, I've left it more generalized
# in case other similar cases come up. But this could ideally be removed once we figure out a
# new method for storing the past hour of max_flows_ana.
tables_that_invalidate_check = set(re.findall(r'(?<=INTO )\w+\.\w*past\w*', sql, flags=re.IGNORECASE))
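# E.g. a statement like "SELECT ... INTO cache.max_flows_ana_past_hour" (the case described above) matches here,
# so the check is skipped entirely and the function reports success.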
if tables_that_invalidate_check:
return True

# Required tables exist and should be checked
with self.connection as connection:
cur = connection.cursor()
for table in required_tables:
if issues_encountered and stop_on_first_issue:
break
schemaname, tablename = table.lower().split('.')
sql = f'''
SELECT EXISTS (
SELECT FROM
pg_tables
WHERE
schemaname = '{schemaname}' AND
tablename = '{tablename}'
);
'''
cur.execute(sql)
table_exists = cur.fetchone()[0]

if not table_exists:
issues_encountered.append(f'Table {table} does not exist.')
continue

# Table exists.

if not reference_time or any(x in table for x in ['past', 'ahps']):
continue

# Reference time provided.

# Check if reference_time column exists and if its entry matches
sql = f'''
SELECT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema='{schemaname}' AND table_name='{tablename}' AND column_name='reference_time'
);
'''
cur.execute(sql)
reftime_col_exists = cur.fetchone()[0]
if not reftime_col_exists:
continue

# Column 'reference_time' exists

# Check if it matches
sql = f"SELECT reference_time FROM {table} LIMIT 1"
cur.execute(sql)
reftime_result = cur.fetchone()
if not reftime_result: # table is empty
issues_encountered.append(f'Table {table} is empty.')
continue

data_reftime = reftime_result[0].replace(" UTC", "")
if data_reftime != reference_time: # table reference time doesn't match the current reference time
issues_encountered.append(f'Table {table} has unexpected reftime. Expected {reference_time} but found {data_reftime}.')
continue

if issues_encountered:
if raise_if_false:
raise RequiredTableNotUpdated(' '.join(issues_encountered))
return False
return True

###################################################################################################################################################
###################################################################################################################################################
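A minimal usage sketch of the new method (the table name, reference_time value, and error text here are illustrative; the database(db_type="viz") call mirrors the lambda handler below):

    from viz_classes import database, RequiredTableNotUpdated

    sql = "SELECT * FROM publish.srf_18hr_high_water_arrival_time;"  # hypothetical query
    try:
        database(db_type="viz").required_tables_updated(
            sql, sql_replace={}, reference_time="2023-05-18 12:00:00", raise_if_false=True
        )
    except RequiredTableNotUpdated as e:
        # In the pipeline, this exception propagates to the Step Function, which retries the state.
        print(e)  # e.g. "Table publish.srf_18hr_high_water_arrival_time does not exist."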
@@ -2,9 +2,6 @@
import os
from viz_classes import database

class RequiredTableNotUpdated(Exception):
""" This is a custom exception to report back to the AWS Step Function that a required table (dependent_on_table) has not yet been updated with the current reference time. """

def lambda_handler(event, context):
step = event['step']
folder = event['folder']
@@ -33,20 +30,19 @@ def lambda_handler(event, context):
elif step == "products":
folder = os.path.join(folder, event['args']['product']['configuration'])
sql_file = event['args']['postprocess_sql']['sql_file']
if event.get('args').get('postprocess_sql').get('dependent_on_db_tables'):
dependent_on_tables = event['args']['postprocess_sql']['dependent_on_db_tables']
if len(dependent_on_tables) > 0:
print("Dependent DB Table Specified - Checking if complete.")
if check_required_tables(dependent_on_tables, reference_time, sql_replace) is False:
print("Reference Time not yet present in required table. Pausing and Retrying.")
raise RequiredTableNotUpdated
# Summary
elif step == 'summaries':
folder = os.path.join(folder, event['args']['product']['product'])
sql_file = event['args']['postprocess_summary']['sql_file']

### Run the Appropriate SQL File ###
sql_path = f"{folder}/{sql_file}.sql"

# Checks if all tables referenced in the sql file exist and are updated (if applicable)
# Raises a custom RequiredTableNotUpdated if not, which will be caught by viz_pipeline
# and invoke a retry
database(db_type="viz").required_tables_updated(sql_path, sql_replace, reference_time, raise_if_false=True)

run_sql(sql_path, sql_replace)

return True
@@ -100,20 +96,3 @@ def run_sql(sql_path_or_str, sql_replace=None):
connection.commit()
print(f"Finished executing the SQL statement above.")
return result

# Check if a specified reference time is present in a database table
def check_required_tables(dependent_on_tables, reference_time, sql_replace):
for table in dependent_on_tables:
sql = f"SELECT reference_time FROM {table} LIMIT 1"
for word, replacement in sql_replace.items():
sql = re.sub(word, replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC')
try:
result = run_sql(sql)
except: # table doesn't exist
return False
if not result: # table is empty
return False
elif result[0].replace(" UTC", "") == reference_time: # table reference time matches current reference time
return True
else: # table reference time doesn't match current reference time
return False
@@ -53,6 +53,12 @@ def setup_huc_inundation(event):

# Find the sql file, and replace any items in the dictionary
sql_path = f'data_sql/{fim_config_sql}.sql'

# Checks if all tables referenced in the sql file exist and are updated (if applicable)
# Raises a custom RequiredTableNotUpdated if not, which will be caught by viz_pipeline
# and invoke a retry
process_db.required_tables_updated(sql_path, sql_replace, reference_time, raise_if_false=True)

sql = open(sql_path, 'r').read().lower()

setup_db_table(target_table, reference_time, viz_db, process_db, sql_replace)
@@ -20,8 +20,6 @@ db_max_flows:
postprocess_sql:
- sql_file: mrf_gfs_10day_peak_flow_arrival_time
target_table: publish.mrf_gfs_10day_peak_flow_arrival_time
dependent_on_db_tables:
- publish.mrf_gfs_10day_high_water_arrival_time

services:
- mrf_gfs_10day_peak_flow_arrival_time_noaa
@@ -20,8 +20,6 @@ db_max_flows:
postprocess_sql:
- sql_file: srf_18hr_peak_flow_arrival_time
target_table: publish.srf_18hr_peak_flow_arrival_time
dependent_on_db_tables:
- publish.srf_18hr_high_water_arrival_time

services:
- srf_18hr_peak_flow_arrival_time_noaa
@@ -20,8 +20,6 @@ db_max_flows:
postprocess_sql:
- sql_file: srf_48hr_peak_flow_arrival_time_hi
target_table: publish.srf_48hr_peak_flow_arrival_time_hi
dependent_on_db_tables:
- publish.srf_48hr_high_water_arrival_time_hi

services:
- srf_48hr_peak_flow_arrival_time_hi_noaa
@@ -20,8 +20,6 @@ db_max_flows:
postprocess_sql:
- sql_file: srf_48hr_peak_flow_arrival_time_prvi
target_table: publish.srf_48hr_peak_flow_arrival_time_prvi
dependent_on_db_tables:
- publish.srf_48hr_high_water_arrival_time_prvi

services:
- srf_48hr_peak_flow_arrival_time_prvi_noaa
@@ -55,8 +55,6 @@ fim_configs: # (OPTIONAL) List of fim configurations to run
postprocess_sql: # (OPTIONAL) List of postprocess sql files to run in the postprocess sql lambda function
- sql_file: postprocess_sql_file_name # (REQUIRED) Name of the ingest postprocess sql file
target_table: publish.product # (REQUIRED) Name of the DB table where the fileset will be imported
dependent_on_db_tables: # (OPTIONAL) A list of other product database table(s) - i.e. publish schema tables - that the current product postprocessing is dependent on (will wait until this table has the current reference time).
- publish.dependent_on_db_tables # (REQUIRED) Name of the dependent DB table

product_summaries: # (OPTIONAL) List of dictionaries which provides the names of the summary sql files to run in the postprocess sql lambda function and their table output results
- sql_file: product_summary_sql_file_name # (REQUIRED) Name of the summary sql file ran in the postprocess sql lambda function
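Note that the per-product dependent_on_db_tables option is removed from the template (and from the service configs above): required tables are now inferred dynamically from the FROM/JOIN clauses of each product's SQL by required_tables_updated, so no explicit dependency configuration is needed.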
56 changes: 55 additions & 1 deletion Core/StepFunctions/viz_processing_pipeline.json.tftpl
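Each of the postprocess states below gains the same new retry rule. Mechanically (this is standard Lambda/Step Functions behavior), an unhandled Python exception surfaces to the state machine with its class name as the error type, which is what "ErrorEquals": ["RequiredTableNotUpdated"] matches. With a BackoffRate of 1, IntervalSeconds of 30, and MaxAttempts of 20, a state re-invokes every 30 seconds for up to 10 minutes before the error propagates.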
@@ -73,6 +73,15 @@
"FunctionName": "${db_postprocess_sql_arn}"
},
"Retry": [
{
"ErrorEquals": [
"RequiredTableNotUpdated"
],
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
"Lambda.ServiceException",
@@ -150,6 +159,15 @@
"FunctionName": "${db_postprocess_sql_arn}"
},
"Retry": [
{
"ErrorEquals": [
"RequiredTableNotUpdated"
],
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
"Lambda.ServiceException",
@@ -194,6 +212,15 @@
}
},
"Retry": [
{
"ErrorEquals": [
"RequiredTableNotUpdated"
],
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
"Lambda.ServiceException",
@@ -350,7 +377,7 @@
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required Table Not Ran Yet (postprocess_sql.dependent_on_db_tables)"
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
@@ -470,6 +497,15 @@
}
},
"Retry": [
{
"ErrorEquals": [
"RequiredTableNotUpdated"
],
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
"Lambda.ServiceException",
@@ -581,6 +617,15 @@
}
},
"Retry": [
{
"ErrorEquals": [
"RequiredTableNotUpdated"
],
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
"Lambda.ServiceException",
@@ -652,6 +697,15 @@
}
},
"Retry": [
{
"ErrorEquals": [
"RequiredTableNotUpdated"
],
"BackoffRate": 1,
"IntervalSeconds": 30,
"MaxAttempts": 20,
"Comment": "Required table(s) not yet updated as expected"
},
{
"ErrorEquals": [
"Lambda.ServiceException",