diff --git a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py index 7b49d101..6c0eb0f7 100644 --- a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py +++ b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py @@ -2,10 +2,15 @@ import boto3 import os import json +import re import urllib.parse import inspect from botocore.exceptions import ClientError + +class RequiredTableNotUpdated(Exception): + """ This is a custom exception to report back to the AWS Step Function that a required table does not exist or has not yet been updated with the current reference time. """ + ################################################################################################################################################### ################################################################################################################################################### class database: #TODO: Should we be creating a connection/engine upon initialization, or within each method like we are now? @@ -178,6 +183,119 @@ def cache_data(self, table, reference_time, retention_days=30): db_engine.execute(f'SELECT * INTO {new_archive_table} FROM publish.{table};') db_engine.dispose() print(f"---> Wrote cache data into {new_archive_table} and dropped corresponding table from {retention_days} days ago, if it existed.") + + ########################################### + def required_tables_updated(self, sql_path_or_str, sql_replace={}, reference_time=None, stop_on_first_issue=True, raise_if_false=False): + """ Determines if tables required by provided SQL path or string are updated as expected + + Args: + sql_path_or_str (str): Path to SQL file or raw SQL string + sql_replace (dict): Dictionary containing find/replace values for SQL, if applicable + reference_time (str): The reference_time that should be compared against for tables that contain a + reference_time column. If the table does not contain that column, it is + considered to be up to date + stop_on_first_issue (bool): If True, the first issue encountered will cause the script to terminate + either returning false or raising an exception if raise_if_false is also True. If False, every + error will be explored before returning (only useful if raise_if_false is True since the error + message will thus contain all relevant failures, rather than just the first.) + raise_if_false (bool): If True, a custom RequiredTableNotUpdated exception will be raised + if either a table does not exist, or if the reference_time column + exists its current value does not match the provided reference_time. The specific + details of the failure will be included in the exception message, which will only + be the first failure encountered unless stop_on_first_issue is False. + + Raises: + RequiredTableNotUpdated if raise_if_false is True + + Returns: + Bool. True if no issues encountered, False otherwise. + """ + issues_encountered = [] + # Determine if arg is file or raw SQL string + if os.path.exists(sql_path_or_str): + sql = open(sql_path_or_str, 'r').read() + else: + sql = sql_path_or_str + + for word, replacement in sql_replace.items(): + sql = re.sub(word, replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC') + + required_tables = set(re.findall('(?<=FROM |JOIN )\w+\.\w+', sql, flags=re.IGNORECASE)) + if not required_tables: + return True + + # This next 3 lines were added specifically to abort checking cache.max_flows_ana when creating + # cache.max_flows_ana_past_hour since cache.max_flow_ana will always be an hour behind at the + # time of creating the past_hour table. Rather than hard-code it, I've left it more generalized + # in case other similar cases come up. But this could ideally be removed once We figure out a + # new method for storing the past hour of max_flows_ana. + tables_that_invalidate_check = set(re.findall('(?<=INTO )\w+\.\w*past\w*', sql, flags=re.IGNORECASE)) + if tables_that_invalidate_check: + return True + + # Required tables exist and should be checked + with self.connection as connection: + cur = connection.cursor() + for table in required_tables: + if issues_encountered and stop_on_first_issue: + break + schemaname, tablename = table.lower().split('.') + sql = f''' + SELECT EXISTS ( + SELECT FROM + pg_tables + WHERE + schemaname = '{schemaname}' AND + tablename = '{tablename}' + ); + ''' + cur.execute(sql) + table_exists = cur.fetchone()[0] + + if not table_exists: + issues_encountered.append(f'Table {table} does not exist.') + continue + + # Table exists. + + if not reference_time or any(x in table for x in ['past', 'ahps']): + continue + + # Reference time provided. + + # Check if reference_time column exists and if its entry matches + sql = f''' + SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_schema='{schemaname}' AND table_name='{tablename}' AND column_name='reference_time' + ); + ''' + cur.execute(sql) + reftime_col_exists = cur.fetchone()[0] + if not reftime_col_exists: + continue + + # Column 'reference_time' exists + + # Check if it matches + sql = f"SELECT reference_time FROM {table} LIMIT 1" + cur.execute(sql) + reftime_result = cur.fetchone() + if not reftime_result: # table is empty + issues_encountered.append(f'Table {table} is empty.') + continue + + data_reftime = reftime_result[0].replace(" UTC", "") + if data_reftime != reference_time: # table reference time matches current reference time + issues_encountered.append(f'Table {table} has unexpected reftime. Expected {reference_time} but found {data_reftime}.') + continue + + if issues_encountered: + if raise_if_false: + raise RequiredTableNotUpdated(' '.join(issues_encountered)) + return False + return True ################################################################################################################################################### ################################################################################################################################################### diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/lambda_function.py b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/lambda_function.py index 5621909c..bde03926 100644 --- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/lambda_function.py +++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/lambda_function.py @@ -2,9 +2,6 @@ import os from viz_classes import database -class RequiredTableNotUpdated(Exception): - """ This is a custom exception to report back to the AWS Step Function that a required table (dependent_on_table) has not yet been updated with the current reference time. """ - def lambda_handler(event, context): step = event['step'] folder = event['folder'] @@ -33,13 +30,6 @@ def lambda_handler(event, context): elif step == "products": folder = os.path.join(folder, event['args']['product']['configuration']) sql_file = event['args']['postprocess_sql']['sql_file'] - if event.get('args').get('postprocess_sql').get('dependent_on_db_tables'): - dependent_on_tables = event['args']['postprocess_sql']['dependent_on_db_tables'] - if len(dependent_on_tables) > 0: - print("Dependent DB Table Specified - Checking if complete.") - if check_required_tables(dependent_on_tables, reference_time, sql_replace) is False: - print("Reference Time not yet present in required table. Pausing and Retrying.") - raise RequiredTableNotUpdated # Summary elif step == 'summaries': folder = os.path.join(folder, event['args']['product']['product']) @@ -47,6 +37,12 @@ def lambda_handler(event, context): ### Run the Appropriate SQL File ### sql_path = f"{folder}/{sql_file}.sql" + + # Checks if all tables references in sql file exist and are updated (if applicable) + # Raises a custom RequiredTableNotUpdated if not, which will be caught by viz_pipline + # and invoke a retry + database(db_type="viz").required_tables_updated(sql_path, sql_replace, reference_time, raise_if_false=True) + run_sql(sql_path, sql_replace) return True @@ -100,20 +96,3 @@ def run_sql(sql_path_or_str, sql_replace=None): connection.commit() print(f"Finished executing the SQL statement above.") return result - -# Check if a specified reference time is present in a database table -def check_required_tables(dependent_on_tables, reference_time, sql_replace): - for table in dependent_on_tables: - sql = f"SELECT reference_time FROM {table} LIMIT 1" - for word, replacement in sql_replace.items(): - sql = re.sub(word, replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC') - try: - result = run_sql(sql) - except: # table doesn't exist - return False - if not result: # table is empty - return False - elif result[0].replace(" UTC", "") == reference_time: # table reference time matches current reference time - return True - else: # table reference time doesn't match current reference time - return False \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_fim_data_prep/lambda_function.py b/Core/LAMBDA/viz_functions/viz_fim_data_prep/lambda_function.py index 4f2007c3..d39c124a 100644 --- a/Core/LAMBDA/viz_functions/viz_fim_data_prep/lambda_function.py +++ b/Core/LAMBDA/viz_functions/viz_fim_data_prep/lambda_function.py @@ -53,6 +53,12 @@ def setup_huc_inundation(event): # Find the sql file, and replace any items in the dictionary sql_path = f'data_sql/{fim_config_sql}.sql' + + # Checks if all tables references in sql file exist and are updated (if applicable) + # Raises a custom RequiredTableNotUpdated if not, which will be caught by viz_pipline + # and invoke a retry + process_db.required_tables_updated(sql_path, sql_replace, reference_time, raise_if_false=True) + sql = open(sql_path, 'r').read().lower() setup_db_table(target_table, reference_time, viz_db, process_db, sql_replace) diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.yml index 536faaa9..b6b1913c 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.yml @@ -20,8 +20,6 @@ db_max_flows: postprocess_sql: - sql_file: mrf_gfs_10day_peak_flow_arrival_time target_table: publish.mrf_gfs_10day_peak_flow_arrival_time - dependent_on_db_tables: - - publish.mrf_gfs_10day_high_water_arrival_time services: - mrf_gfs_10day_peak_flow_arrival_time_noaa \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_18hr_peak_flow_arrival_time.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_18hr_peak_flow_arrival_time.yml index 6420d898..2d68dfad 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_18hr_peak_flow_arrival_time.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range/srf_18hr_peak_flow_arrival_time.yml @@ -20,8 +20,6 @@ db_max_flows: postprocess_sql: - sql_file: srf_18hr_peak_flow_arrival_time target_table: publish.srf_18hr_peak_flow_arrival_time - dependent_on_db_tables: - - publish.srf_18hr_high_water_arrival_time services: - srf_18hr_peak_flow_arrival_time_noaa \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.yml index 3a49ae74..765cd9a6 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.yml @@ -20,8 +20,6 @@ db_max_flows: postprocess_sql: - sql_file: srf_48hr_peak_flow_arrival_time_hi target_table: publish.srf_48hr_peak_flow_arrival_time_hi - dependent_on_db_tables: - - publish.srf_48hr_high_water_arrival_time_hi services: - srf_48hr_peak_flow_arrival_time_hi_noaa \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.yml index 4e588f81..4583908c 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.yml @@ -20,8 +20,6 @@ db_max_flows: postprocess_sql: - sql_file: srf_48hr_peak_flow_arrival_time_prvi target_table: publish.srf_48hr_peak_flow_arrival_time_prvi - dependent_on_db_tables: - - publish.srf_48hr_high_water_arrival_time_prvi services: - srf_48hr_peak_flow_arrival_time_prvi_noaa \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml index 462169cd..aab66110 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/template.yml @@ -55,8 +55,6 @@ fim_configs: # (OPTIONAL) List of fim configurations to run postprocess_sql: # (OPTIONAL) List of postprocess sql files to run in the postprocess sql lambda function - sql_file: postprocess_sql_file_name # (REQUIRED) Name of the ingest postprocess sql file target_table: publish.product # (REQUIRED) Name of the DB table where the fileset will be imported - dependent_on_db_tables: # (OPTIONAL) A list of other product database table(s) - i.e. publish schema tables - that the current product postprocessing is dependent on (will wait until this table has the current reference time). - - publish.dependent_on_db_tables # (REQUIRED) Name of the dependent DB table product_summaries: # (OPTIONAL) List of dictionaries which provides the names of the summary sql files to run in the postprocess sql lambda function and their table output results - sql_file: product_summary_sql_file_name # (REQUIRED) Name of the summary sql file ran in the postprocess sql lambda function diff --git a/Core/StepFunctions/viz_processing_pipeline.json.tftpl b/Core/StepFunctions/viz_processing_pipeline.json.tftpl index 8ea1584f..6ad1082c 100644 --- a/Core/StepFunctions/viz_processing_pipeline.json.tftpl +++ b/Core/StepFunctions/viz_processing_pipeline.json.tftpl @@ -73,6 +73,15 @@ "FunctionName": "${db_postprocess_sql_arn}" }, "Retry": [ + { + "ErrorEquals": [ + "RequiredTableNotUpdated" + ], + "BackoffRate": 1, + "IntervalSeconds": 30, + "MaxAttempts": 20, + "Comment": "Required table(s) not yet updated as expected" + }, { "ErrorEquals": [ "Lambda.ServiceException", @@ -150,6 +159,15 @@ "FunctionName": "${db_postprocess_sql_arn}" }, "Retry": [ + { + "ErrorEquals": [ + "RequiredTableNotUpdated" + ], + "BackoffRate": 1, + "IntervalSeconds": 30, + "MaxAttempts": 20, + "Comment": "Required table(s) not yet updated as expected" + }, { "ErrorEquals": [ "Lambda.ServiceException", @@ -194,6 +212,15 @@ } }, "Retry": [ + { + "ErrorEquals": [ + "RequiredTableNotUpdated" + ], + "BackoffRate": 1, + "IntervalSeconds": 30, + "MaxAttempts": 20, + "Comment": "Required table(s) not yet updated as expected" + }, { "ErrorEquals": [ "Lambda.ServiceException", @@ -350,7 +377,7 @@ "BackoffRate": 1, "IntervalSeconds": 30, "MaxAttempts": 20, - "Comment": "Required Table Not Ran Yet (postprocess_sql.dependent_on_db_tables)" + "Comment": "Required table(s) not yet updated as expected" }, { "ErrorEquals": [ @@ -470,6 +497,15 @@ } }, "Retry": [ + { + "ErrorEquals": [ + "RequiredTableNotUpdated" + ], + "BackoffRate": 1, + "IntervalSeconds": 30, + "MaxAttempts": 20, + "Comment": "Required table(s) not yet updated as expected" + }, { "ErrorEquals": [ "Lambda.ServiceException", @@ -581,6 +617,15 @@ } }, "Retry": [ + { + "ErrorEquals": [ + "RequiredTableNotUpdated" + ], + "BackoffRate": 1, + "IntervalSeconds": 30, + "MaxAttempts": 20, + "Comment": "Required table(s) not yet updated as expected" + }, { "ErrorEquals": [ "Lambda.ServiceException", @@ -652,6 +697,15 @@ } }, "Retry": [ + { + "ErrorEquals": [ + "RequiredTableNotUpdated" + ], + "BackoffRate": 1, + "IntervalSeconds": 30, + "MaxAttempts": 20, + "Comment": "Required table(s) not yet updated as expected" + }, { "ErrorEquals": [ "Lambda.ServiceException",