Viz EC2 Services to Lambda - Part 1 - High Water Probability (#543)
* A couple of shared-function tweaks to work with high water probability

* Changing the viz_max_values lambda function to viz_python_preprocessing, with associated changes

* High water probability product configuration and SQL files

* Remove high water probability services from Viz EC2

* Adding a 'product' key to python_preprocessing configurations for better conditional logic (see the sketch below).

* Increasing ephemeral storage on the python_preprocessing lambda to 6.5 GB
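On the 'product' key bullet above: the configuration files themselves are not among the diffs shown below, so what follows is a purely hypothetical Python sketch of the kind of conditional dispatch such a key enables — every name and value here is assumed, none are taken from this commit.

```python
# Hypothetical illustration only -- keys, names, and values are assumed.
# A "product" key lets one preprocessing lambda branch explicitly instead
# of inferring the product from file paths or service names.
config = {
    "product": "high_water_probability",  # the new key (value assumed)
    "fileset_bucket": "example-bucket",   # assumed field
    "fileset": ["example/forecast.nc"],   # assumed field
}

if config["product"] == "high_water_probability":
    print("run the high water probability preprocessing steps")
elif config["product"] == "max_values":
    print("run the legacy max values preprocessing steps")
else:
    raise ValueError(f"unrecognized product: {config['product']}")
```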
TylerSchrag-NOAA authored Sep 28, 2023
1 parent 27e31da commit 206e616
Showing 39 changed files with 433 additions and 444 deletions.
4 changes: 2 additions & 2 deletions Core/EC2/viz/main.tf
@@ -46,7 +46,7 @@ variable "fim_output_bucket" {
  type = string
}

-variable "nwm_max_values_data_bucket" {
+variable "python_preprocessing_bucket" {
  description = "S3 bucket for NWM max flows data"
  type = string
}
@@ -206,7 +206,7 @@ data "cloudinit_config" "pipeline_setup" {
      NWM_DATA_BUCKET             = var.nwm_data_bucket
      FIM_DATA_BUCKET             = var.fim_data_bucket
      FIM_OUTPUT_BUCKET           = var.fim_output_bucket
-      NWM_MAX_VALUES_DATA_BUCKET  = var.nwm_max_values_data_bucket
+      PYTHON_PREPROCESSING_BUCKET = var.python_preprocessing_bucket
      RNR_DATA_BUCKET             = var.rnr_data_bucket
      DEPLOYMENT_DATA_BUCKET      = var.deployment_data_bucket
      DEPLOYMENT_DATA_OBJECT      = aws_s3_object.setup_upload.key
4 changes: 2 additions & 2 deletions Core/EC2/viz/scripts/viz_ec2_setup.ps1
@@ -121,7 +121,7 @@ $env:PRIMARY_SERVER = "server"
$env:VIZ_ENVIRONMENT = $VIZ_ENVIRONMENT
$env:VIZ_USER = $PIPELINE_USER
$env:DEPLOYMENT_DATA_BUCKET = $DEPLOYMENT_DATA_BUCKET
-$env:NWM_MAX_VALUES_DATA_BUCKET = $NWM_MAX_VALUES_DATA_BUCKET
+$env:PYTHON_PREPROCESSING_BUCKET = $PYTHON_PREPROCESSING_BUCKET
$env:RNR_DATA_BUCKET = $RNR_DATA_BUCKET
$env:NWM_DATA_BUCKET = $NWM_DATA_BUCKET
$env:NWM_DATAFLOW_VERSION = $NWM_DATAFLOW_VERSION
@@ -146,7 +146,7 @@ $env:EGIS_DB_PASSWORD = $EGIS_DB_PASSWORD
[Environment]::SetEnvironmentVariable("VIZ_ENVIRONMENT", $env:VIZ_ENVIRONMENT, "2")
[Environment]::SetEnvironmentVariable("VIZ_USER", $env:VIZ_USER, "2")
[Environment]::SetEnvironmentVariable("DEPLOYMENT_DATA_BUCKET", $env:DEPLOYMENT_DATA_BUCKET, "2")
-[Environment]::SetEnvironmentVariable("NWM_MAX_VALUES_DATA_BUCKET", $env:NWM_MAX_VALUES_DATA_BUCKET, "2")
+[Environment]::SetEnvironmentVariable("PYTHON_PREPROCESSING_BUCKET", $env:PYTHON_PREPROCESSING_BUCKET, "2")
[Environment]::SetEnvironmentVariable("RNR_DATA_BUCKET", $env:RNR_DATA_BUCKET, "2")
[Environment]::SetEnvironmentVariable("NWM_DATA_BUCKET", $env:NWM_DATA_BUCKET, "2")
[Environment]::SetEnvironmentVariable("NWM_DATAFLOW_VERSION", $env:NWM_DATAFLOW_VERSION, "2")
2 changes: 1 addition & 1 deletion Core/EC2/viz/templates/prc_setup.ps1.tftpl
@@ -13,7 +13,7 @@ $NWM_DATA_BUCKET = '${NWM_DATA_BUCKET}'
$NWM_DATAFLOW_VERSION = '${NWM_DATAFLOW_VERSION}'
$FIM_DATA_BUCKET = '${FIM_DATA_BUCKET}'
$FIM_OUTPUT_BUCKET = '${FIM_OUTPUT_BUCKET}'
-$NWM_MAX_VALUES_DATA_BUCKET = '${NWM_MAX_VALUES_DATA_BUCKET}'
+$PYTHON_PREPROCESSING_BUCKET = '${PYTHON_PREPROCESSING_BUCKET}'
$RNR_DATA_BUCKET = '${RNR_DATA_BUCKET}'
$WINDOWS_SERVICE_STATUS = '${WINDOWS_SERVICE_STATUS}'
$WINDOWS_SERVICE_STARTUP = '${WINDOWS_SERVICE_STARTUP}'
@@ -538,7 +538,7 @@ def move_data_to_another_db(origin_db, dest_db, origin_table, dest_table, stage=
    dest_engine.execute(f'DROP TABLE IF EXISTS {dest_final_table};')  # Drop the published table if it exists
    dest_engine.execute(f'ALTER TABLE {dest_table} RENAME TO {dest_final_table_name};')  # Rename the staged table

-def check_if_file_exists(bucket, file, download=False):
+def check_if_file_exists(bucket, file, download=False, download_subfolder=None):
    import requests
    from viz_classes import s3_file
    import xarray as xr
@@ -548,7 +548,13 @@ def check_if_file_exists(bucket, file, download=False):
    file_exists = False

    tempdir = tempfile.mkdtemp()
-    download_path = os.path.join(tempdir, os.path.basename(file))
+    if download_subfolder:
+        download_folder = os.path.join(tempdir, download_subfolder)
+        if not os.path.exists(download_folder):
+            os.mkdir(download_folder)
+        download_path = os.path.join(download_folder, os.path.basename(file))
+    else:
+        download_path = os.path.join(tempdir, os.path.basename(file))
    https_file = None

    if "https" in file:
@@ -615,7 +621,7 @@ def check_if_file_exists(bucket, file, download=False):

    return file

-def parse_range_token_value(reference_date_file, range_token):
+def parse_range_token_value(reference_date_file, range_token, existing_list = []):
    range_min = 0
    range_step = 1
    number_format = '%01d'
@@ -642,10 +648,14 @@ def parse_range_token_value(reference_date_file, range_token):
        raise ValueError("Ranges must be integers")

    new_input_files = []
-    for i in range(range_min, range_max, range_step):
-        range_value = number_format % i
-        new_input_file = reference_date_file.replace(f"{{{{range:{range_token}}}}}", range_value)
-        new_input_files.append(new_input_file)
+    if existing_list == []:
+        existing_list = [reference_date_file]
+
+    for item in existing_list:
+        for i in range(range_min, range_max, range_step):
+            range_value = number_format % i
+            new_input_file = item.replace(f"{{{{range:{range_token}}}}}", range_value)
+            new_input_files.append(new_input_file)

    return new_input_files

@@ -707,8 +717,9 @@ def get_formatted_files(file_pattern, token_dict, reference_date):
        reference_date_file = parse_datetime_token_value(reference_date_file, reference_date, datetime_token)

    if token_dict['range']:
-        for range_token in token_dict['range']:
-            reference_date_files = parse_range_token_value(reference_date_file, range_token)
+        unique_range_tokens = list(set(token_dict['range']))
+        for range_token in unique_range_tokens:
+            reference_date_files = parse_range_token_value(reference_date_file, range_token, existing_list=reference_date_files)
    else:
        reference_date_files = [reference_date_file]
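The reworked parse_range_token_value threads the partially expanded file list back in through existing_list, so a pattern containing several distinct {{range:...}} tokens now expands to every combination of values rather than only the last token's. A minimal standalone sketch of that expansion, with a simplified signature and hard-coded ranges (the real function also parses min/max/step and a number format out of the token itself):

```python
def expand_range_token(template_files, token, start, stop, fmt="%01d"):
    # Replace every {{range:token}} in each input string with each value in
    # [start, stop), multiplying the list -- a cartesian expansion.
    expanded = []
    for item in template_files:
        for i in range(start, stop):
            expanded.append(item.replace(f"{{{{range:{token}}}}}", fmt % i))
    return expanded

files = ["day{{range:d}}/f{{range:f}}.nc"]  # hypothetical pattern, two tokens
for token, start, stop in [("d", 1, 3), ("f", 1, 3)]:
    files = expand_range_token(files, token, start, stop)
print(files)  # ['day1/f1.nc', 'day1/f2.nc', 'day2/f1.nc', 'day2/f2.nc']
```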
40 changes: 20 additions & 20 deletions Core/LAMBDA/viz_functions/image_based/main.tf
@@ -14,7 +14,7 @@ variable "deployment_bucket" {
  type = string
}

-variable "max_values_bucket" {
+variable "python_preprocessing_bucket" {
  type = string
}
@@ -501,25 +501,25 @@ data "archive_file" "schism_processing_zip" {

  source {
    content = templatefile("${path.module}/viz_schism_fim_processing/serverless.yml.tmpl", {
-      SERVICE_NAME       = replace(local.viz_schism_fim_processing_lambda_name, "_", "-")
-      LAMBDA_TAGS        = jsonencode(merge(var.default_tags, { Name = local.viz_schism_fim_processing_lambda_name }))
-      DEPLOYMENT_BUCKET  = var.deployment_bucket
-      AWS_DEFAULT_REGION = var.region
-      LAMBDA_NAME        = local.viz_schism_fim_processing_lambda_name
-      AWS_ACCOUNT_ID     = var.account_id
-      IMAGE_REPO_NAME    = aws_ecr_repository.viz_schism_fim_processing_image.name
-      IMAGE_TAG          = var.ecr_repository_image_tag
-      LAMBDA_ROLE_ARN    = var.lambda_role
-      MAX_VALS_BUCKET    = var.max_values_bucket
-      INPUTS_BUCKET      = var.deployment_bucket
-      INPUTS_PREFIX      = "schism_fim"
-      VIZ_DB_DATABASE    = var.viz_db_name
-      VIZ_DB_HOST        = var.viz_db_host
-      VIZ_DB_PASSWORD    = jsondecode(var.viz_db_user_secret_string)["password"]
-      VIZ_DB_USERNAME    = jsondecode(var.viz_db_user_secret_string)["username"]
-      SECURITY_GROUP_1   = var.hand_fim_processing_sgs[0]
-      SUBNET_1           = var.hand_fim_processing_subnets[0]
-      SUBNET_2           = var.hand_fim_processing_subnets[1]
+      SERVICE_NAME                = replace(local.viz_schism_fim_processing_lambda_name, "_", "-")
+      LAMBDA_TAGS                 = jsonencode(merge(var.default_tags, { Name = local.viz_schism_fim_processing_lambda_name }))
+      DEPLOYMENT_BUCKET           = var.deployment_bucket
+      AWS_DEFAULT_REGION          = var.region
+      LAMBDA_NAME                 = local.viz_schism_fim_processing_lambda_name
+      AWS_ACCOUNT_ID              = var.account_id
+      IMAGE_REPO_NAME             = aws_ecr_repository.viz_schism_fim_processing_image.name
+      IMAGE_TAG                   = var.ecr_repository_image_tag
+      LAMBDA_ROLE_ARN             = var.lambda_role
+      PYTHON_PREPROCESSING_BUCKET = var.python_preprocessing_bucket
+      INPUTS_BUCKET               = var.deployment_bucket
+      INPUTS_PREFIX               = "schism_fim"
+      VIZ_DB_DATABASE             = var.viz_db_name
+      VIZ_DB_HOST                 = var.viz_db_host
+      VIZ_DB_PASSWORD             = jsondecode(var.viz_db_user_secret_string)["password"]
+      VIZ_DB_USERNAME             = jsondecode(var.viz_db_user_secret_string)["username"]
+      SECURITY_GROUP_1            = var.hand_fim_processing_sgs[0]
+      SUBNET_1                    = var.hand_fim_processing_subnets[0]
+      SUBNET_2                    = var.hand_fim_processing_subnets[1]
    })
    filename = "serverless.yml"
  }
@@ -15,7 +15,7 @@ functions:
    timeout: 900
    ephemeralStorageSize: 2048
    environment:
-      MAX_VALS_BUCKET: ${MAX_VALS_BUCKET}
+      PYTHON_PREPROCESSING_BUCKET: ${PYTHON_PREPROCESSING_BUCKET}
      INPUTS_BUCKET: ${INPUTS_BUCKET}
      INPUTS_PREFIX: ${INPUTS_PREFIX}
      VIZ_DB_DATABASE: ${VIZ_DB_DATABASE}
82 changes: 43 additions & 39 deletions Core/LAMBDA/viz_functions/main.tf
@@ -42,7 +42,7 @@ variable "deployment_bucket" {
  type = string
}

-variable "max_values_bucket" {
+variable "python_preprocessing_bucket" {
  description = "S3 bucket where the outputted max flows will live."
  type = string
}
@@ -236,7 +236,7 @@ resource "aws_lambda_function" "viz_wrds_api_handler" {
  environment {
    variables = {
      DATASERVICES_HOST            = var.dataservices_host
-      MAX_VALS_BUCKET              = var.max_values_bucket
+      PYTHON_PREPROCESSING_BUCKET  = var.python_preprocessing_bucket
      PROCESSED_OUTPUT_PREFIX      = "max_stage/ahps"
      INITIALIZE_PIPELINE_FUNCTION = aws_lambda_function.viz_initialize_pipeline.arn
    }
@@ -371,30 +371,30 @@ resource "aws_cloudwatch_metric_alarm" "egis_healthcheck_errors" {
  }
}

-#########################
-## Max Values Function ##
-#########################
-data "archive_file" "max_values_zip" {
+###################################
+## Python Preprocessing Function ##
+###################################
+data "archive_file" "python_preprocessing_zip" {
  type = "zip"

-  source_file = "${path.module}/viz_max_values/lambda_function.py"
+  source_file = "${path.module}/viz_python_preprocessing/lambda_function.py"

-  output_path = "${path.module}/temp/viz_max_values_${var.environment}_${var.region}.zip"
+  output_path = "${path.module}/temp/viz_python_preprocessing_${var.environment}_${var.region}.zip"
}

-resource "aws_s3_object" "max_values_zip_upload" {
+resource "aws_s3_object" "python_preprocessing_zip_upload" {
  bucket      = var.deployment_bucket
-  key         = "terraform_artifacts/${path.module}/viz_max_values.zip"
-  source      = data.archive_file.max_values_zip.output_path
-  source_hash = filemd5(data.archive_file.max_values_zip.output_path)
+  key         = "terraform_artifacts/${path.module}/viz_python_preprocessing.zip"
+  source      = data.archive_file.python_preprocessing_zip.output_path
+  source_hash = filemd5(data.archive_file.python_preprocessing_zip.output_path)
}

-resource "aws_lambda_function" "viz_max_values" {
+resource "aws_lambda_function" "viz_python_preprocessing" {
  function_name = "hv-vpp-${var.environment}-viz-max-values"
  description   = "Lambda function to create max streamflow files for NWM data"
  memory_size   = 2048
  ephemeral_storage {
-    size = 1024
+    size = 6656
  }
  timeout = 900

@@ -407,12 +407,16 @@ resource "aws_lambda_function" "viz_max_values" {
    variables = {
      CACHE_DAYS           = 1
      DATA_BUCKET_UPLOAD   = var.fim_output_bucket
+      VIZ_DB_DATABASE      = var.viz_db_name
+      VIZ_DB_HOST          = var.viz_db_host
+      VIZ_DB_USERNAME      = jsondecode(var.viz_db_user_secret_string)["username"]
+      VIZ_DB_PASSWORD      = jsondecode(var.viz_db_user_secret_string)["password"]
+      NWM_DATAFLOW_VERSION = var.nwm_dataflow_version
    }
  }
-  s3_bucket        = aws_s3_object.max_values_zip_upload.bucket
-  s3_key           = aws_s3_object.max_values_zip_upload.key
-  source_code_hash = filebase64sha256(data.archive_file.max_values_zip.output_path)
+  s3_bucket        = aws_s3_object.python_preprocessing_zip_upload.bucket
+  s3_key           = aws_s3_object.python_preprocessing_zip_upload.key
+  source_code_hash = filebase64sha256(data.archive_file.python_preprocessing_zip.output_path)

  runtime = "python3.9"
  handler = "lambda_function.lambda_handler"
@@ -460,18 +464,18 @@ resource "aws_lambda_function" "viz_initialize_pipeline" {
  }
  environment {
    variables = {
-      STEP_FUNCTION_ARN     = var.viz_pipeline_step_function_arn
-      DATA_BUCKET_UPLOAD    = var.fim_output_bucket
-      MAX_VALS_DATA_BUCKET  = var.max_values_bucket
-      RNR_DATA_BUCKET       = var.rnr_data_bucket
-      RASTER_OUTPUT_BUCKET  = var.fim_output_bucket
-      RASTER_OUTPUT_PREFIX  = local.raster_output_prefix
-      INGEST_FLOW_THRESHOLD = local.ingest_flow_threshold
-      VIZ_DB_DATABASE       = var.viz_db_name
-      VIZ_DB_HOST           = var.viz_db_host
-      VIZ_DB_USERNAME       = jsondecode(var.viz_db_user_secret_string)["username"]
-      VIZ_DB_PASSWORD       = jsondecode(var.viz_db_user_secret_string)["password"]
-      NWM_DATAFLOW_VERSION  = var.nwm_dataflow_version
+      STEP_FUNCTION_ARN           = var.viz_pipeline_step_function_arn
+      DATA_BUCKET_UPLOAD          = var.fim_output_bucket
+      PYTHON_PREPROCESSING_BUCKET = var.python_preprocessing_bucket
+      RNR_DATA_BUCKET             = var.rnr_data_bucket
+      RASTER_OUTPUT_BUCKET        = var.fim_output_bucket
+      RASTER_OUTPUT_PREFIX        = local.raster_output_prefix
+      INGEST_FLOW_THRESHOLD       = local.ingest_flow_threshold
+      VIZ_DB_DATABASE             = var.viz_db_name
+      VIZ_DB_HOST                 = var.viz_db_host
+      VIZ_DB_USERNAME             = jsondecode(var.viz_db_user_secret_string)["username"]
+      VIZ_DB_PASSWORD             = jsondecode(var.viz_db_user_secret_string)["password"]
+      NWM_DATAFLOW_VERSION        = var.nwm_dataflow_version
    }
  }
  s3_bucket = aws_s3_object.initialize_pipeline_zip_upload.bucket
@@ -873,7 +877,7 @@ resource "aws_lambda_function" "viz_publish_service" {
      GIS_PASSWORD        = var.egis_portal_password
      GIS_HOST            = local.egis_host
      GIS_USERNAME        = "hydrovis.proc"
-      PUBLISH_FLAG_BUCKET = var.max_values_bucket
+      PUBLISH_FLAG_BUCKET = var.python_preprocessing_bucket
      S3_BUCKET           = var.viz_authoritative_bucket
      SD_S3_PATH          = "viz_sd_files/"
      SERVICE_TAG         = local.service_suffix
@@ -912,13 +916,13 @@ resource "aws_lambda_function_event_invoke_config" "viz_publish_service_destinat
module "image-based-lambdas" {
  source = "./image_based"

-  environment             = var.environment
-  account_id              = var.account_id
-  region                  = var.region
-  deployment_bucket       = var.deployment_bucket
-  max_values_bucket       = var.max_values_bucket
-  lambda_role             = var.lambda_role
-  hand_fim_processing_sgs = var.db_lambda_security_groups
+  environment                 = var.environment
+  account_id                  = var.account_id
+  region                      = var.region
+  deployment_bucket           = var.deployment_bucket
+  python_preprocessing_bucket = var.python_preprocessing_bucket
+  lambda_role                 = var.lambda_role
+  hand_fim_processing_sgs     = var.db_lambda_security_groups
  hand_fim_processing_subnets = var.db_lambda_subnets
  ecr_repository_image_tag    = local.ecr_repository_image_tag
  fim_version                 = var.fim_version
@@ -936,8 +940,8 @@ module "image-based-lambdas" {
########################################################################################################################################
########################################################################################################################################

-output "max_values" {
-  value = aws_lambda_function.viz_max_values
+output "python_preprocessing" {
+  value = aws_lambda_function.viz_python_preprocessing
}

output "initialize_pipeline" {
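A small arithmetic note on the ephemeral_storage change above: the size argument is given in MB, and the commit message's "6.5 GB" is being treated as 6.5 × 1024 = 6656. A quick check (the 512–10240 MB bounds are Lambda's documented ephemeral storage limits):

```python
# ephemeral_storage.size is specified in MB; the commit's "6.5 GB" -> 6656.
def gb_to_lambda_ephemeral_mb(gb: float) -> int:
    mb = int(gb * 1024)
    if not 512 <= mb <= 10240:  # Lambda's allowed ephemeral storage range
        raise ValueError(f"{mb} MB is outside Lambda's 512-10240 MB range")
    return mb

print(gb_to_lambda_ephemeral_mb(6.5))  # 6656
```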
@@ -0,0 +1,21 @@
+DROP TABLE IF EXISTS publish.mrf_gfs_5day_max_high_water_probability;
+SELECT
+    channels.feature_id,
+    channels.feature_id::TEXT AS feature_id_str,
+    channels.name,
+    channels.strm_order,
+    channels.huc6,
+    channels.state,
+    hwp.nwm_vers,
+    hwp.reference_time,
+    hwp.hours_3_to_24,
+    hwp.hours_27_to_48,
+    hwp.hours_51_to_72,
+    hwp.hours_75_to_120,
+    hwp.hours_3_to_120,
+    hwp.high_water_threshold,
+    to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
+    channels.geom
+INTO publish.mrf_gfs_5day_max_high_water_probability
+FROM ingest.mrf_gfs_5day_max_high_water_probability as hwp
+JOIN derived.channels_conus channels ON hwp.feature_id = channels.feature_id;
@@ -0,0 +1,17 @@
+DROP TABLE IF EXISTS publish.srf_12hr_max_high_water_probability;
+SELECT
+    channels.feature_id,
+    channels.feature_id::TEXT AS feature_id_str,
+    channels.name,
+    channels.strm_order,
+    channels.huc6,
+    channels.state,
+    hwp.nwm_vers,
+    hwp.reference_time,
+    hwp.srf_prob,
+    hwp.high_water_threshold,
+    to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
+    channels.geom
+INTO publish.srf_12hr_max_high_water_probability
+FROM ingest.srf_12hr_max_high_water_probability as hwp
+JOIN derived.channels_conus channels ON hwp.feature_id = channels.feature_id;
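The hours_* columns in the MRF table and srf_prob in the SRF table publish high water probabilities per forecast window, but only the publish SQL appears in this commit — the probability computation itself happens upstream in the python_preprocessing lambda and is not shown here. A loudly-hypothetical Python sketch of the usual ensemble formulation (member count, flows, and threshold all invented for illustration; not necessarily the lambda's exact method):

```python
# Hypothetical: probability of high water in a window = fraction of
# ensemble members whose peak flow meets the reach's high water threshold.
def window_exceedance_probability(member_peak_flows, high_water_threshold):
    exceeding = sum(1 for flow in member_peak_flows if flow >= high_water_threshold)
    return exceeding / len(member_peak_flows)

# e.g. seven ensemble members and an invented 350 cfs threshold:
peaks = [120.0, 410.5, 380.2, 90.1, 500.0, 360.9, 200.3]
print(round(window_exceedance_probability(peaks, 350.0), 2))  # 0.57
```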
@@ -0,0 +1,16 @@
+-- HUC8 Hotspot Layer
+DROP TABLE IF EXISTS publish.mrf_gfs_5day_max_high_water_probability_hucs;
+SELECT
+    hucs.huc8,
+    TO_CHAR(hucs.huc8, 'fm00000000') AS huc8_str,
+    hucs.total_nwm_features,
+    count(hwp.feature_id)::double precision / hucs.total_nwm_features::double precision AS nwm_features_flooded_percent,
+    avg(hours_3_to_120) as avg_prob,
+    to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS reference_time,
+    to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
+    hucs.geom
+INTO publish.mrf_gfs_5day_max_high_water_probability_hucs
+FROM derived.huc8s_conus AS hucs
+JOIN derived.featureid_huc_crosswalk AS crosswalk ON hucs.huc8 = crosswalk.huc8
+JOIN publish.mrf_gfs_5day_max_high_water_probability AS hwp ON crosswalk.feature_id = hwp.feature_id
+GROUP BY hucs.huc8, total_nwm_features, hucs.geom;