From 1ac1c74f96c1f3bdcc168451aecc0982877bf7cd Mon Sep 17 00:00:00 2001
From: Corey Krewson
Date: Fri, 19 May 2023 11:02:54 -0500
Subject: [PATCH 1/6] updated script to run some python scripts as the arcgis user

These python scripts were failing due to access issues to the fileshare.
---
 Core/EC2/viz/scripts/viz_ec2_setup.ps1 | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/Core/EC2/viz/scripts/viz_ec2_setup.ps1 b/Core/EC2/viz/scripts/viz_ec2_setup.ps1
index 057dec28..874d9498 100644
--- a/Core/EC2/viz/scripts/viz_ec2_setup.ps1
+++ b/Core/EC2/viz/scripts/viz_ec2_setup.ps1
@@ -253,9 +253,15 @@ LogWrite "-->TRANFERRING PRISTINE DATA"
 $s3_pristine = "s3://" + $DEPLOYMENT_DATA_BUCKET + "/" + $DEPLOY_FILES_PREFIX + "pristine_data/"
 aws s3 cp $s3_pristine $PRISTINE_ROOT --recursive
 
+Install-Module -Name Invoke-CommandAs -force
+$ec2host = hostname
+$strScriptUser = "$ec2host\$PIPELINE_USER"
+$PSS = ConvertTo-SecureString $PIPELINE_USER_ACCOUNT_PASSWORD -AsPlainText -Force
+$cred = new-object system.management.automation.PSCredential $strScriptUser,$PSS
+
 LogWrite "CREATING CONNECTION FILES FOR $FIM_DATA_BUCKET"
-Set-Location -Path $AWS_SERVICE_REPO
-& "C:\Program Files\ArcGIS\Pro\bin\Python\envs\viz\python.exe" "aws_loosa\deploy\create_s3_connection_files.py"
+$python_file = "$AWS_SERVICE_REPO\aws_loosa\deploy\create_s3_connection_files.py"
+Invoke-CommandAs -ScriptBlock { param($python_file) & "C:\Program Files\ArcGIS\Pro\bin\Python\envs\viz\python.exe" $python_file } -ArgumentList $python_file -AsUser $cred
 
 LogWrite "UPDATING PYTHON PERMISSIONS FOR $PIPELINE_USER"
 $ACL = Get-ACL -Path "C:\Program Files\ArcGIS\Pro\bin\Python"
@@ -270,8 +276,8 @@ $ACL.SetAccessRule($AccessRule)
 $ACL | Set-Acl -Path "D:\"
 
 LogWrite "ADDING $PUBLISHED_ROOT TO $EGIS_HOST"
-Set-Location -Path $AWS_SERVICE_REPO
-& "C:\Program Files\ArcGIS\Pro\bin\Python\envs\viz\python.exe" "aws_loosa\deploy\update_data_stores_and_sd_files.py"
+$python_file = "$AWS_SERVICE_REPO\aws_loosa\deploy\update_data_stores_and_sd_files.py"
+Invoke-CommandAs -ScriptBlock { param($python_file) & "C:\Program Files\ArcGIS\Pro\bin\Python\envs\viz\python.exe" $python_file } -ArgumentList $python_file -AsUser $cred
 
 LogWrite "DELETING PUBLISHED FLAGS IF THEY EXIST"
 $EXISTING_PUBLISHED_FLAGS = aws s3 ls $FLAGS_ROOT
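The fix above runs the ArcGIS-dependent deploy scripts under the pipeline user's credentials via Invoke-CommandAs, because the default account could not reach the fileshare. When chasing this class of failure, it helps if the script itself reports which identity it is running as and whether the share is reachable. A minimal diagnostic sketch (not part of the patch; the UNC path is a hypothetical placeholder):

    import getpass
    import os

    def report_share_access(share_path):
        """Log the identity this process runs as and its basic access to a path."""
        user = getpass.getuser()  # the account actually executing the script
        readable = os.access(share_path, os.R_OK)
        writable = os.access(share_path, os.W_OK)
        print(f"Running as {user}: read={readable}, write={writable} for {share_path}")
        return readable and writable

    if __name__ == "__main__":
        # Hypothetical share path; the real location is environment-specific.
        report_share_access(r"\\fileshare\published")

Note that os.access is only a rough check on Windows, where NTFS ACLs can still deny operations it reports as allowed, so treat a True result as suggestive rather than definitive.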
From f6aada1491e7dc18357cf4579c54419257252b9f Mon Sep 17 00:00:00 2001
From: Corey Krewson
Date: Fri, 19 May 2023 12:41:39 -0500
Subject: [PATCH 2/6] fixed peak flow joins to new names
---
 .../mrf_gfs_10day_peak_flow_arrival_time.sql  |  2 +-
 .../mrf_gfs_high_water_arrival_time.sql       | 40 -----------
 .../mrf_gfs_max_high_flow_magnitude.sql       | 71 -------------------
 .../mrf_gfs_peak_flow_arrival_time.sql        | 36 ----------
 .../srf_18hr_peak_flow_arrival_time.sql       |  2 +-
 .../srf_48hr_peak_flow_arrival_time_hi.sql    |  2 +-
 .../srf_48hr_peak_flow_arrival_time_prvi.sql  |  2 +-
 7 files changed, 4 insertions(+), 151 deletions(-)
 delete mode 100644 Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_high_water_arrival_time.sql
 delete mode 100644 Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_max_high_flow_magnitude.sql
 delete mode 100644 Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_peak_flow_arrival_time.sql

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.sql
index 0810f7e2..e3471fd5 100644
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.sql
+++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_10day_peak_flow_arrival_time.sql
@@ -30,7 +30,7 @@ JOIN derived.channels_conus AS channels ON forecasts.feature_id = channels.featu
 JOIN derived.recurrence_flows_conus AS rf ON forecasts.feature_id = rf.feature_id
 
 -- Join in high water arrival time for return time (the yaml config file ensures that arrival time finishes first for this, but we'll join on reference_time as well to ensure)
-JOIN publish.mrf_high_water_arrival_time AS arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
+JOIN publish.mrf_gfs_10day_high_water_arrival_time AS arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
 
 WHERE round((forecasts.streamflow*35.315)::numeric, 2) >= rf.high_water_threshold
 GROUP BY forecasts.feature_id, forecasts.reference_time, forecasts.nwm_vers, forecasts.streamflow, channels.name, channels.strm_order, channels.huc6, channels.state, rf.high_water_threshold, max_flows.maxflow_10day_cfs, arrival_time.t_normal, channels.geom;
\ No newline at end of file

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_high_water_arrival_time.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_high_water_arrival_time.sql
deleted file mode 100644
index 9a5ca39a..00000000
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_high_water_arrival_time.sql
+++ /dev/null
@@ -1,40 +0,0 @@
-DROP TABLE IF EXISTS publish.mrf_gfs_high_water_arrival_time;
-WITH arrival_time AS (
-    SELECT forecasts.feature_id,
-        min(forecasts.forecast_hour) AS t_high_water_threshold,
-        forecasts.nwm_vers,
-        forecasts.reference_time,
-        CASE WHEN max(forecasts.forecast_hour) >= 240 THEN '> 10 days'::text
-            ELSE (max(forecasts.forecast_hour)+3)::text
-        END AS t_normal,
-        CASE
-            WHEN max(forecasts.forecast_hour) >= 240 THEN 'Outside MRF Forecast Window'::text
-            ELSE ((max(forecasts.forecast_hour)+3) - min(forecasts.forecast_hour))::text
-        END AS duration,
-        thresholds.high_water_threshold AS high_water_threshold,
-        round((max(forecasts.streamflow) * 35.315::double precision)::numeric, 2) AS max_flow,
-        to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time
-    FROM ingest.nwm_channel_rt_mrf_gfs AS forecasts
-    JOIN derived.recurrence_flows_conus thresholds ON forecasts.feature_id = thresholds.feature_id
-    WHERE thresholds.high_water_threshold > 0::double precision AND (forecasts.streamflow * 35.315::double precision) >= thresholds.high_water_threshold
-    GROUP BY forecasts.feature_id, forecasts.reference_time, forecasts.nwm_vers, thresholds.high_water_threshold
-)
-
-SELECT channels.feature_id,
-channels.feature_id::TEXT AS feature_id_str,
-channels.name,
-channels.strm_order,
-channels.huc6,
-channels.state,
-arrival_time.nwm_vers,
-arrival_time.reference_time,
-arrival_time.t_high_water_threshold,
-arrival_time.t_normal,
-arrival_time.duration,
-arrival_time.high_water_threshold,
-arrival_time.max_flow,
-arrival_time.update_time,
-channels.geom
-INTO publish.mrf_gfs_high_water_arrival_time
-FROM derived.channels_conus channels
-    JOIN arrival_time ON channels.feature_id = arrival_time.feature_id;
\ No newline at end of file

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_max_high_flow_magnitude.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_max_high_flow_magnitude.sql
deleted file mode 100644
index 9c696c83..00000000
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_max_high_flow_magnitude.sql
+++ /dev/null
@@ -1,71 +0,0 @@
-DROP TABLE IF EXISTS publish.mrf_gfs_max_high_flow_magnitude;
-WITH high_flow_mag AS (
-    SELECT maxflows.feature_id,
-        maxflows.maxflow_3day_cfs,
-        maxflows.maxflow_5day_cfs,
-        maxflows.maxflow_10day_cfs,
-        maxflows.nwm_vers,
-        maxflows.reference_time,
-        CASE
-            WHEN maxflows.maxflow_3day_cfs >= thresholds.rf_50_0_17c THEN '2'::text
-            WHEN maxflows.maxflow_3day_cfs >= thresholds.rf_25_0_17c THEN '4'::text
-            WHEN maxflows.maxflow_3day_cfs >= thresholds.rf_10_0_17c THEN '10'::text
-            WHEN maxflows.maxflow_3day_cfs >= thresholds.rf_5_0_17c THEN '20'::text
-            WHEN maxflows.maxflow_3day_cfs >= thresholds.rf_2_0_17c THEN '50'::text
-            WHEN maxflows.maxflow_3day_cfs >= thresholds.high_water_threshold THEN '>50'::text
-            ELSE NULL::text
-        END AS recur_cat_3day,
-        CASE
-            WHEN maxflows.maxflow_5day_cfs >= thresholds.rf_50_0_17c THEN '2'::text
-            WHEN maxflows.maxflow_5day_cfs >= thresholds.rf_25_0_17c THEN '4'::text
-            WHEN maxflows.maxflow_5day_cfs >= thresholds.rf_10_0_17c THEN '10'::text
-            WHEN maxflows.maxflow_5day_cfs >= thresholds.rf_5_0_17c THEN '20'::text
-            WHEN maxflows.maxflow_5day_cfs >= thresholds.rf_2_0_17c THEN '50'::text
-            WHEN maxflows.maxflow_5day_cfs >= thresholds.high_water_threshold THEN '>50'::text
-            ELSE NULL::text
-        END AS recur_cat_5day,
-        CASE
-            WHEN maxflows.maxflow_10day_cfs >= thresholds.rf_50_0_17c THEN '2'::text
-            WHEN maxflows.maxflow_10day_cfs >= thresholds.rf_25_0_17c THEN '4'::text
-            WHEN maxflows.maxflow_10day_cfs >= thresholds.rf_10_0_17c THEN '10'::text
-            WHEN maxflows.maxflow_10day_cfs >= thresholds.rf_5_0_17c THEN '20'::text
-            WHEN maxflows.maxflow_10day_cfs >= thresholds.rf_2_0_17c THEN '50'::text
-            WHEN maxflows.maxflow_10day_cfs >= thresholds.high_water_threshold THEN '>50'::text
-            ELSE NULL::text
-        END AS recur_cat_10day,
-        thresholds.high_water_threshold AS high_water_threshold,
-        thresholds.rf_2_0_17c AS flow_2yr,
-        thresholds.rf_5_0_17c AS flow_5yr,
-        thresholds.rf_10_0_17c AS flow_10yr,
-        thresholds.rf_25_0_17c AS flow_25yr,
-        thresholds.rf_50_0_17c AS flow_50yr
-    FROM cache.mrf_gfs_max_flows maxflows
-    JOIN derived.recurrence_flows_conus thresholds ON maxflows.feature_id = thresholds.feature_id
-    WHERE (thresholds.high_water_threshold > 0::double precision) AND maxflows.maxflow_10day_cfs >= thresholds.high_water_threshold
-    )
-
-SELECT channels.feature_id,
-    channels.feature_id::TEXT AS feature_id_str,
-    channels.strm_order,
-    channels.name,
-    channels.huc6,
-    channels.state,
-    high_flow_mag.nwm_vers,
-    high_flow_mag.reference_time,
-    high_flow_mag.maxflow_3day_cfs,
-    high_flow_mag.maxflow_5day_cfs,
-    high_flow_mag.maxflow_10day_cfs,
-    high_flow_mag.recur_cat_3day,
-    high_flow_mag.recur_cat_5day,
-    high_flow_mag.recur_cat_10day,
-    high_flow_mag.high_water_threshold,
-    high_flow_mag.flow_2yr,
-    high_flow_mag.flow_5yr,
-    high_flow_mag.flow_10yr,
-    high_flow_mag.flow_25yr,
-    high_flow_mag.flow_50yr,
-    to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
-    channels.geom
-INTO publish.mrf_gfs_max_high_flow_magnitude
-FROM derived.channels_conus channels
-    JOIN high_flow_mag ON channels.feature_id = high_flow_mag.feature_id;
\ No newline at end of file

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_peak_flow_arrival_time.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_peak_flow_arrival_time.sql
deleted file mode 100644
index f21d1f8a..00000000
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/medium_range_mem1/mrf_gfs_peak_flow_arrival_time.sql
+++ /dev/null
@@ -1,36 +0,0 @@
-DROP TABLE IF EXISTS publish.mrf_gfs_peak_flow_arrival_time;
-
-SELECT
-    forecasts.feature_id,
-    forecasts.feature_id::TEXT AS feature_id_str,
-    channels.name,
-    (channels.strm_order)::integer,
-    min(forecasts.forecast_hour) AS peak_flow_arrival_hour,
-    channels.huc6,
-    channels.state,
-    forecasts.nwm_vers,
-    forecasts.reference_time,
-    max_flows.maxflow_10day_cfs AS max_flow_cfs,
-    rf.high_water_threshold,
-    arrival_time.t_normal AS below_bank_return_time,
-    to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
-    channels.geom
-
-INTO publish.mrf_gfs_peak_flow_arrival_time
-FROM ingest.nwm_channel_rt_mrf_gfs AS forecasts
-
--- Join in max flows on max streamflow to only get peak flows
-JOIN cache.mrf_gfs_max_flows AS max_flows
-    ON forecasts.feature_id = max_flows.feature_id AND round((forecasts.streamflow*35.315)::numeric, 2) = max_flows.maxflow_10day_cfs
-
--- Join in channels data to get reach metadata and geometry
-JOIN derived.channels_conus AS channels ON forecasts.feature_id = channels.feature_id
-
--- Join in recurrence flows to get high water threshold
-JOIN derived.recurrence_flows_conus AS rf ON forecasts.feature_id = rf.feature_id
-
--- Join in high water arrival time for return time (the yaml config file ensures that arrival time finishes first for this, but we'll join on reference_time as well to ensure)
-JOIN publish.mrf_gfs_high_water_arrival_time AS arrival_time ON forecasts.feature_id = arrival_time.feature_id AND forecasts.reference_time = arrival_time.reference_time
-
-WHERE round((forecasts.streamflow*35.315)::numeric, 2) >= rf.high_water_threshold
-GROUP BY forecasts.feature_id, forecasts.reference_time, forecasts.nwm_vers, forecasts.streamflow, channels.name, channels.strm_order, channels.huc6, rf.high_water_threshold, max_flows.maxflow_10day_cfs, arrival_time.t_normal, channels.geom;
\ No newline at end of file

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range/srf_18hr_peak_flow_arrival_time.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range/srf_18hr_peak_flow_arrival_time.sql
index edf84e0e..400d796d 100644
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range/srf_18hr_peak_flow_arrival_time.sql
+++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range/srf_18hr_peak_flow_arrival_time.sql
@@ -30,7 +30,7 @@ JOIN derived.channels_conus AS channels ON forecasts.feature_id = channels.featu
 JOIN derived.recurrence_flows_conus AS rf ON forecasts.feature_id = rf.feature_id
 
 -- Join in high water arrival time for return time (the yaml config file ensures that arrival time finishes first for this, but we'll join on reference_time as well to ensure)
-JOIN publish.srf_high_water_arrival_time AS arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
+JOIN publish.srf_18hr_high_water_arrival_time AS arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
 
 WHERE round((forecasts.streamflow*35.315)::numeric, 2) >= rf.high_water_threshold
 GROUP BY forecasts.feature_id, forecasts.reference_time, forecasts.nwm_vers, channels.name, channels.strm_order, channels.huc6, channels.state, rf.high_water_threshold, arrival_time.t_normal, max_flows.maxflow_18hour_cms, channels.geom
\ No newline at end of file

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.sql
index 29909b2a..a33d9bed 100644
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.sql
+++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_hawaii/srf_48hr_peak_flow_arrival_time_hi.sql
@@ -30,7 +30,7 @@ JOIN derived.channels_hi as channels ON forecasts.feature_id = channels.feature_
 JOIN derived.recurrence_flows_hi as rf ON forecasts.feature_id = rf.feature_id
 
 -- Join in high water arrival time for return time (the yaml config file ensures that arrival time finishes first for this, but we'll join on reference_time as well to ensure)
-JOIN publish.srf_high_water_arrival_time_hi as arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
+JOIN publish.srf_48hr_high_water_arrival_time_hi as arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
 
 WHERE (rf.high_water_threshold > 0 OR rf.high_water_threshold = '-9999') AND forecasts.streamflow * 35.315::double precision >= rf.high_water_threshold
 GROUP BY forecasts.feature_id, forecasts.reference_time, forecasts.nwm_vers, channels.name, channels.strm_order, channels.huc6, rf.high_water_threshold, arrival_time.t_normal, max_flows.maxflow_48hour_cms, channels.geom

diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.sql
index 28a1ed2a..6c40aa07 100644
--- a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.sql
+++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/short_range_puertorico/srf_48hr_peak_flow_arrival_time_prvi.sql
@@ -30,7 +30,7 @@ JOIN derived.channels_prvi as channels ON forecasts.feature_id = channels.featur
 JOIN derived.recurrence_flows_prvi as rf ON forecasts.feature_id = rf.feature_id
 
 -- Join in high water arrival time for return time (the yaml config file ensures that arrival time finishes first for this, but we'll join on reference_time as well to ensure)
-JOIN publish.srf_high_water_arrival_time_prvi as arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
+JOIN publish.srf_48hr_high_water_arrival_time_prvi as arrival_time ON forecasts.feature_id = arrival_time.feature_id and forecasts.reference_time = arrival_time.reference_time
 
 WHERE round((forecasts.streamflow*35.315)::numeric, 2) >= rf.high_water_threshold
 GROUP BY forecasts.feature_id, forecasts.reference_time, forecasts.nwm_vers, channels.name, channels.strm_order, channels.huc6, rf.high_water_threshold, arrival_time.t_normal, max_flows.maxflow_48hour_cms, channels.geom
\ No newline at end of file
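Renames like the ones in this patch have to be applied consistently across every product file that joins these tables; a stale reference to an old name (like the publish.mrf_high_water_arrival_time join fixed above) only fails at pipeline run time. A throwaway check such as the following can scan the products tree for leftovers; the rename map is reconstructed from this diff and may not be exhaustive:

    import pathlib
    import re

    # Old publish table names replaced in this patch -> their new names.
    RENAMES = {
        "publish.mrf_high_water_arrival_time": "publish.mrf_gfs_10day_high_water_arrival_time",
        "publish.srf_high_water_arrival_time": "publish.srf_18hr_high_water_arrival_time",
        "publish.srf_high_water_arrival_time_hi": "publish.srf_48hr_high_water_arrival_time_hi",
        "publish.srf_high_water_arrival_time_prvi": "publish.srf_48hr_high_water_arrival_time_prvi",
    }

    def stale_references(products_root):
        """Yield (file, old_name) for any SQL file still using an old table name."""
        for sql_file in pathlib.Path(products_root).rglob("*.sql"):
            text = sql_file.read_text()
            for old_name in RENAMES:
                # \b keeps e.g. ..._arrival_time from also matching ..._arrival_time_hi
                if re.search(re.escape(old_name) + r"\b", text):
                    yield sql_file, old_name

    for path, name in stale_references("Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products"):
        print(f"{path}: still references {name}")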
From 925bfe9f520efd9f37c9d515eff4376ce6e9557c Mon Sep 17 00:00:00 2001
From: Corey Krewson
Date: Fri, 19 May 2023 12:46:47 -0500
Subject: [PATCH 3/6] checking table existence using information schema so that foreign tables work
---
 .../layers/viz_lambda_shared_funcs/python/viz_classes.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py
index 59433aaf..98737f26 100644
--- a/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py
+++ b/Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py
@@ -243,10 +243,10 @@ def check_required_tables_updated(self, sql_path_or_str, sql_replace={}, referen
                 sql = f'''
                 SELECT EXISTS (
                     SELECT FROM
-                        pg_tables
+                        information_schema.tables
                     WHERE
-                        schemaname = '{schemaname}' AND
-                        tablename = '{tablename}'
+                        table_schema = '{schemaname}' AND
+                        table_name = '{tablename}'
                 );
                 '''
                 cur.execute(sql)
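For context on the change above: pg_tables only lists ordinary tables, whereas information_schema.tables also includes foreign tables, so existence checks against foreign tables always came back false before this fix. A standalone sketch of the same check (assuming psycopg2, which the viz database class presumably wraps; connection details are placeholders), using bind parameters rather than the f-string interpolation in the original:

    import psycopg2  # assumed driver behind the viz database class

    def table_exists(cur, schema, table):
        """True if schema.table exists, including foreign tables."""
        cur.execute(
            """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            );
            """,
            (schema, table),
        )
        return cur.fetchone()[0]

    # Placeholder connection settings; real values come from the environment.
    with psycopg2.connect(host="localhost", dbname="vizdb", user="viz", password="...") as conn:
        with conn.cursor() as cur:
            print(table_exists(cur, "publish", "mrf_gfs_10day_high_water_arrival_time"))

Bind parameters are a side benefit here: they avoid quoting bugs if a schema or table name ever contains unexpected characters.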
From a5fe7eea1f71d35504b29d9332efa203033b766d Mon Sep 17 00:00:00 2001
From: Corey Krewson
Date: Fri, 19 May 2023 12:57:24 -0500
Subject: [PATCH 4/6] fixed srf coastal inundation sql file for atlgulf
---
 .../short_range_coastal/srf_18hr_max_coastal_inundation.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_coastal/srf_18hr_max_coastal_inundation.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_coastal/srf_18hr_max_coastal_inundation.yml
index 08cc4e0e..8f1efc98 100644
--- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_coastal/srf_18hr_max_coastal_inundation.yml
+++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/short_range_coastal/srf_18hr_max_coastal_inundation.yml
@@ -16,7 +16,7 @@ fim_configs:
   - name: srf_18hr_max_coastal_inundation_atlgulf
     target_table: ingest.srf_18hr_max_coastal_inundation_atlgulf
    fim_type: coastal
-    sql_file: coastal_pacific
+    sql_file: coastal_atlgulf
     preprocess:
       file_format: common/data/model/com/nwm/para/nwm.{{datetime:%Y%m%d}}/short_range_coastal_atlgulf/nwm.t{{datetime:%H}}z.short_range_coastal.total_water.f{{range:1,19,1,%03d}}.atlgulf.nc
       file_step: None

From 97eb1f828a5a8602ffd88a4699978b00b6cb8083 Mon Sep 17 00:00:00 2001
From: Corey Krewson
Date: Fri, 19 May 2023 15:17:17 -0500
Subject: [PATCH 5/6] altered update_egis_data to run in a parallel/map state

Unstaging tables now happens concurrently with unstaging rasters, and
unstaging rasters is a map state that iterates through workspaces.
---
 .../viz_raster_processing/lambda_function.py  |   3 +-
 .../lambda_function.py                        |   3 +-
 .../lambda_function.py                        |   7 +-
 .../viz_update_egis_data/lambda_function.py   | 109 +++++------
 .../viz_processing_pipeline.json.tftpl        | 133 ++++++++++++----
 5 files changed, 165 insertions(+), 90 deletions(-)

diff --git a/Core/LAMBDA/viz_functions/image_based/viz_raster_processing/lambda_function.py b/Core/LAMBDA/viz_functions/image_based/viz_raster_processing/lambda_function.py
index a28829ef..a7f7801a 100644
--- a/Core/LAMBDA/viz_functions/image_based/viz_raster_processing/lambda_function.py
+++ b/Core/LAMBDA/viz_functions/image_based/viz_raster_processing/lambda_function.py
@@ -14,8 +14,9 @@ def lambda_handler(event, context):
     file_window = event['product']['raster_input_files']['file_window']
     product_file = event['product']['raster_input_files']['product_file']
     input_bucket = event['product']['raster_input_files']['bucket']
+    output_bucket = event['product']['raster_outputs']['output_bucket']
-    output_workspace = event['product']['raster_outputs']['output_raster_workspaces'][product_name]
+    output_workspace = event['product']['raster_outputs']['output_raster_workspaces'][0][product_name]
 
     reference_time = event['reference_time']
     reference_date = datetime.strptime(reference_time, "%Y-%m-%d %H:%M:%S")

diff --git a/Core/LAMBDA/viz_functions/image_based/viz_schism_fim_processing/lambda_function.py b/Core/LAMBDA/viz_functions/image_based/viz_schism_fim_processing/lambda_function.py
index 21874495..20811f7c 100644
--- a/Core/LAMBDA/viz_functions/image_based/viz_schism_fim_processing/lambda_function.py
+++ b/Core/LAMBDA/viz_functions/image_based/viz_schism_fim_processing/lambda_function.py
@@ -55,7 +55,8 @@ def lambda_handler(event, context):
     max_elevs_file = event['args']['fim_config']['max_file']
 
     output_bucket = event['args']['product']['raster_outputs']['output_bucket']
-    output_workspace = event['args']['product']['raster_outputs']['output_raster_workspaces'][fim_config]
+    output_workspaces = event['args']['product']['raster_outputs']['output_raster_workspaces']
+    output_workspace = next(list(workspace.values())[0] for workspace in output_workspaces if list(workspace.keys())[0] == fim_config)
 
     schism_fim_s3_uri = f's3://{max_elevs_file_bucket}/{max_elevs_file}'

diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py
index 0e7a3530..31cc29af 100644
--- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py
+++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py
@@ -472,10 +472,11 @@ def get_product_metadata(self, specific_products=None, run_only=True):
                 raster_output_bucket = os.environ['RASTER_OUTPUT_BUCKET']
                 raster_output_prefix = os.environ['RASTER_OUTPUT_PREFIX']
                 product_metadata['raster_outputs'] = {}
-                product_metadata['raster_outputs']['output_raster_workspaces'] = {}
+                product_metadata['raster_outputs']['output_bucket'] = ""
+                product_metadata['raster_outputs']['output_raster_workspaces'] = []
                 if product_metadata['product_type'] == "raster":
                     product_metadata['raster_outputs']['output_bucket'] = raster_output_bucket
-                    product_metadata['raster_outputs']['output_raster_workspaces'][product_name] = f"{raster_output_prefix}/{product_name}/{pipeline_run_date}/{pipeline_run_hour}/workspace"
+                    product_metadata['raster_outputs']['output_raster_workspaces'].append({product_name: f"{raster_output_prefix}/{product_name}/{pipeline_run_date}/{pipeline_run_hour}/workspace"})
 
                 if not product_metadata.get("fim_configs"):
                     product_metadata['fim_configs'] = []
@@ -492,7 +493,7 @@ def get_product_metadata(self, specific_products=None, run_only=True):
                     if fim_config['fim_type'] == "coastal":
                         if not product_metadata['raster_outputs'].get('output_bucket'):
                             product_metadata['raster_outputs']['output_bucket'] = raster_output_bucket
-                        product_metadata['raster_outputs']['output_raster_workspaces'][fim_config_name] = f"{raster_output_prefix}/{product_name}/{fim_config_name}/{pipeline_run_date}/{pipeline_run_hour}/workspace"
+                        product_metadata['raster_outputs']['output_raster_workspaces'].append({fim_config_name: f"{raster_output_prefix}/{product_name}/{fim_config_name}/{pipeline_run_date}/{pipeline_run_hour}/workspace"})
 
                 if not product_metadata.get("postprocess_sql"):
                     product_metadata['postprocess_sql'] = []

diff --git a/Core/LAMBDA/viz_functions/viz_update_egis_data/lambda_function.py b/Core/LAMBDA/viz_functions/viz_update_egis_data/lambda_function.py
index a82c0a29..2da18a81 100644
--- a/Core/LAMBDA/viz_functions/viz_update_egis_data/lambda_function.py
+++ b/Core/LAMBDA/viz_functions/viz_update_egis_data/lambda_function.py
@@ -18,70 +18,69 @@ def lambda_handler(event, context):
         return
 
     ################### Unstage EGIS Tables ###################
-    if step == "unstage" and job_type != "past_event":
-        print(f"Unstaging tables for {event['args']['product']['product']}")
-        target_tables = list(gen_dict_extract("target_table", event['args']))
-        all_single_tables = [table for table in target_tables if type(table) is not list]
-        all_list_tables = [table for table in target_tables if type(table) is list]
-        all_list_tables = [table for table_list in all_list_tables for table in table_list]
-
-        all_tables = all_single_tables + all_list_tables
-        publish_tables = [table for table in all_tables if table.startswith("publish")]
-        dest_tables = [f"services.{table.split('.')[1]}" for table in publish_tables]
-
-        egis_db = database(db_type="egis")
-        unstage_db_tables(egis_db, dest_tables)
+    if "unstage" in step and job_type != "past_event":
+        if step == "unstage_db_tables":
+            print(f"Unstaging tables for {event['args']['product']['product']}")
+            target_tables = list(gen_dict_extract("target_table", event['args']))
+            all_single_tables = [table for table in target_tables if type(table) is not list]
+            all_list_tables = [table for table in target_tables if type(table) is list]
+            all_list_tables = [table for table_list in all_list_tables for table in table_list]
+
+            all_tables = all_single_tables + all_list_tables
+            publish_tables = [table for table in all_tables if table.startswith("publish")]
+            dest_tables = [f"services.{table.split('.')[1]}" for table in publish_tables]
 
-        ################### Move Rasters ###################
-        if event['args']['product']['raster_outputs'].get('output_raster_workspaces'):
+            egis_db = database(db_type="egis")
+            unstage_db_tables(egis_db, dest_tables)
+        elif step == "unstage_rasters":
+            ################### Move Rasters ###################
             print(f"Moving and caching rasters for {event['args']['product']['product']}")
             s3 = boto3.resource('s3')
-            output_raster_workspaces = [workspace for config, workspace in event['args']['product']['raster_outputs']['output_raster_workspaces'].items()]
-            mrf_extensions = ["idx", "til", "mrf", "mrf.aux.xml"]
+            s3_bucket = event['args']['raster_output_bucket']
+            output_raster_workspace = list(event['args']['raster_output_workspace'].values())[0]
+            mrf_extensions = ["idx", "til", "mrf", "mrf.aux.xml"]
             product_name = event['args']['product']['product']
-            s3_bucket = event['args']['product']['raster_outputs']['output_bucket']
-            for output_raster_workspace in output_raster_workspaces:
-                print(f"Moving and caching rasters in {output_raster_workspace}")
-                output_raster_workspace = f"{output_raster_workspace}/tif"
-
-                # Getting any sub configs such as fim_configs
-                product_sub_config = output_raster_workspace.split(product_name,1)[1]
-                product_sub_config = product_sub_config.split(reference_date,1)[0][1:-1]
-                processing_prefix = output_raster_workspace.split(reference_date,1)[0][:-1]
+            print(f"Moving and caching rasters in {output_raster_workspace}")
+            output_raster_workspace = f"{output_raster_workspace}/tif"
+
+            # Getting any sub configs such as fim_configs
+            product_sub_config = output_raster_workspace.split(product_name,1)[1]
+            product_sub_config = product_sub_config.split(reference_date,1)[0][1:-1]
+            processing_prefix = output_raster_workspace.split(reference_date,1)[0][:-1]
 
-                if product_sub_config:
-                    cache_path = f"viz_cache/{reference_date}/{reference_hour_min}/{product_name}/{product_sub_config}"
-                else:
-                    cache_path = f"viz_cache/{reference_date}/{reference_hour_min}/{product_name}"
+            if product_sub_config:
+                cache_path = f"viz_cache/{reference_date}/{reference_hour_min}/{product_name}/{product_sub_config}"
+            else:
+                cache_path = f"viz_cache/{reference_date}/{reference_hour_min}/{product_name}"
+
+            workspace_rasters = list_s3_files(s3_bucket, output_raster_workspace)
+            for s3_key in workspace_rasters:
+                s3_object = {"Bucket": s3_bucket, "Key": s3_key}
+                s3_filename = os.path.basename(s3_key)
+                cache_key = f"{cache_path}/{s3_filename}"
 
-                workspace_rasters = list_s3_files(s3_bucket, output_raster_workspace)
-                for s3_key in workspace_rasters:
-                    s3_object = {"Bucket": s3_bucket, "Key": s3_key}
-                    s3_filename = os.path.basename(s3_key)
-                    cache_key = f"{cache_path}/{s3_filename}"
-
-                    print(f"Caching {s3_key} at {cache_key}")
-                    s3.meta.client.copy(s3_object, s3_bucket, cache_key)
-
-                    print("Deleting tif workspace raster")
-                    s3.Object(s3_bucket, s3_key).delete()
-
-                    raster_name = s3_filename.replace(".tif", "")
-                    mrf_workspace_prefix = s3_key.replace("/tif/", "/mrf/").replace(".tif", "")
-                    published_prefix = f"{processing_prefix}/published/{raster_name}"
-
-                    for extension in mrf_extensions:
-                        mrf_workspace_raster = {"Bucket": s3_bucket, "Key": f"{mrf_workspace_prefix}.{extension}"}
-                        mrf_published_raster = f"{published_prefix}.{extension}"
-
-                        if job_type == 'auto':
-                            print(f"Moving {mrf_workspace_prefix}.{extension} to published location at {mrf_published_raster}")
-                            s3.meta.client.copy(mrf_workspace_raster, s3_bucket, mrf_published_raster)
+                print(f"Caching {s3_key} at {cache_key}")
+                s3.meta.client.copy(s3_object, s3_bucket, cache_key)
+
+                print("Deleting tif workspace raster")
+                s3.Object(s3_bucket, s3_key).delete()
+
+                raster_name = s3_filename.replace(".tif", "")
+                mrf_workspace_prefix = s3_key.replace("/tif/", "/mrf/").replace(".tif", "")
+                published_prefix = f"{processing_prefix}/published/{raster_name}"
+
+                for extension in mrf_extensions:
+                    mrf_workspace_raster = {"Bucket": s3_bucket, "Key": f"{mrf_workspace_prefix}.{extension}"}
+                    mrf_published_raster = f"{published_prefix}.{extension}"
 
-                        print("Deleting a mrf workspace raster")
-                        s3.Object(s3_bucket, f"{mrf_workspace_prefix}.{extension}").delete()
+                    if job_type == 'auto':
+                        print(f"Moving {mrf_workspace_prefix}.{extension} to published location at {mrf_published_raster}")
+                        s3.meta.client.copy(mrf_workspace_raster, s3_bucket, mrf_published_raster)
+
+                    print("Deleting a mrf workspace raster")
+                    s3.Object(s3_bucket, f"{mrf_workspace_prefix}.{extension}").delete()
 
     return True

diff --git a/Core/StepFunctions/viz_processing_pipeline.json.tftpl b/Core/StepFunctions/viz_processing_pipeline.json.tftpl
index cd20698c..37be26e3 100644
--- a/Core/StepFunctions/viz_processing_pipeline.json.tftpl
+++ b/Core/StepFunctions/viz_processing_pipeline.json.tftpl
@@ -685,7 +685,7 @@
     },
     "Parallelize Summaries": {
       "Type": "Map",
-      "Next": "Update EGIS Data - Unstage",
+      "Next": "Parallel",
       "Iterator": {
         "StartAt": "Postprocess SQL - Summary",
         "States": {
@@ -763,40 +763,113 @@
       },
      "ResultPath": null
    },
-    "Update EGIS Data - Unstage": {
-      "Type": "Task",
-      "Resource": "arn:aws:states:::lambda:invoke",
-      "Parameters": {
-        "FunctionName": "${update_egis_data_arn}",
-        "Payload": {
-          "args.$": "$",
-          "step": "unstage"
-        }
-      },
-      "Retry": [
+    "Parallel": {
+      "Type": "Parallel",
+      "Next": "Auto vs. Past Event Run",
+      "Branches": [
         {
-          "ErrorEquals": [
-            "Lambda.ServiceException",
-            "Lambda.AWSLambdaException",
-            "Lambda.SdkClientException"
-          ],
-          "IntervalSeconds": 2,
-          "MaxAttempts": 6,
-          "BackoffRate": 2,
-          "Comment": "Lambda Service Errors"
+          "StartAt": "Update EGIS Data - Unstage DB Tables",
+          "States": {
+            "Update EGIS Data - Unstage DB Tables": {
+              "Type": "Task",
+              "Resource": "arn:aws:states:::lambda:invoke",
+              "Parameters": {
+                "FunctionName": "${update_egis_data_arn}",
+                "Payload": {
+                  "args.$": "$",
+                  "step": "unstage_db_tables"
+                }
+              },
+              "Retry": [
+                {
+                  "ErrorEquals": [
+                    "Lambda.ServiceException",
+                    "Lambda.AWSLambdaException",
+                    "Lambda.SdkClientException"
+                  ],
+                  "IntervalSeconds": 2,
+                  "MaxAttempts": 6,
+                  "BackoffRate": 2,
+                  "Comment": "Lambda Service Errors"
+                },
+                {
+                  "ErrorEquals": [
+                    "UndefinedTable"
+                  ],
+                  "BackoffRate": 2,
+                  "IntervalSeconds": 5,
+                  "MaxAttempts": 3,
+                  "Comment": "Stage Table Doesn't Exist"
+                }
+              ],
+              "ResultPath": null,
+              "End": true
+            }
+          }
         },
         {
-          "ErrorEquals": [
-            "UndefinedTable"
-          ],
-          "BackoffRate": 2,
-          "IntervalSeconds": 5,
-          "MaxAttempts": 3,
-          "Comment": "Stage Table Doesn't Exist"
+          "StartAt": "Unstage Rasters",
+          "States": {
+            "Unstage Rasters": {
+              "Type": "Map",
+              "ItemProcessor": {
+                "ProcessorConfig": {
+                  "Mode": "INLINE"
+                },
+                "StartAt": "Update EGIS Data - Unstage Rasters",
+                "States": {
+                  "Update EGIS Data - Unstage Rasters": {
+                    "Type": "Task",
+                    "Resource": "arn:aws:states:::lambda:invoke",
+                    "Parameters": {
+                      "FunctionName": "${update_egis_data_arn}",
+                      "Payload": {
+                        "args.$": "$",
+                        "step": "unstage_rasters"
+                      }
+                    },
+                    "Retry": [
+                      {
+                        "ErrorEquals": [
+                          "Lambda.ServiceException",
+                          "Lambda.AWSLambdaException",
+                          "Lambda.SdkClientException"
+                        ],
+                        "IntervalSeconds": 2,
+                        "MaxAttempts": 6,
+                        "BackoffRate": 2,
+                        "Comment": "Lambda Service Errors"
+                      },
+                      {
+                        "ErrorEquals": [
+                          "UndefinedTable"
+                        ],
+                        "BackoffRate": 2,
+                        "IntervalSeconds": 5,
+                        "MaxAttempts": 3,
+                        "Comment": "Stage Table Doesn't Exist"
+                      }
+                    ],
+                    "ResultPath": null,
+                    "End": true
+                  }
+                }
+              },
+              "End": true,
+              "ItemSelector": {
+                "product.$": "$.product",
+                "reference_time.$": "$.reference_time",
+                "job_type.$": "$.job_type",
+                "sql_rename_dict.$": "$.sql_rename_dict",
+                "raster_output_bucket.$": "$.product.raster_outputs.output_bucket",
+                "raster_output_workspace.$": "$$.Map.Item.Value"
+              },
+              "ItemsPath": "$.product.raster_outputs.output_raster_workspaces"
+            }
+          }
        }
      ],
-      "ResultPath": null,
-      "Next": "Auto vs. Past Event Run"
+      "ResultPath": null
     },
     "Auto vs. Past Event Run": {
       "Type": "Choice",
From 055a2f670f2d3d8df867b58d59ee36347a04ae88 Mon Sep 17 00:00:00 2001
From: Corey Krewson
Date: Mon, 22 May 2023 09:03:47 -0500
Subject: [PATCH 6/6] moved deleting publish flag to update SD file script

It makes more sense to delete the publish flag after we confirm that the
new SD file is created.
---
 Core/EC2/viz/scripts/viz_ec2_setup.ps1        |  7 -------
 .../deploy/update_data_stores_and_sd_files.py | 15 +++++++++++----
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/Core/EC2/viz/scripts/viz_ec2_setup.ps1 b/Core/EC2/viz/scripts/viz_ec2_setup.ps1
index 874d9498..c6178204 100644
--- a/Core/EC2/viz/scripts/viz_ec2_setup.ps1
+++ b/Core/EC2/viz/scripts/viz_ec2_setup.ps1
@@ -279,13 +279,6 @@ LogWrite "ADDING $PUBLISHED_ROOT TO $EGIS_HOST"
 $python_file = "$AWS_SERVICE_REPO\aws_loosa\deploy\update_data_stores_and_sd_files.py"
 Invoke-CommandAs -ScriptBlock { param($python_file) & "C:\Program Files\ArcGIS\Pro\bin\Python\envs\viz\python.exe" $python_file } -ArgumentList $python_file -AsUser $cred
 
-LogWrite "DELETING PUBLISHED FLAGS IF THEY EXIST"
-$EXISTING_PUBLISHED_FLAGS = aws s3 ls $FLAGS_ROOT
-if ($EXISTING_PUBLISHED_FLAGS) {
-    LogWrite "DELETING PUBLISHED FLAGS"
-    aws s3 rm $FLAGS_ROOT --recursive
-}
-
 Set-Location HKCU:\Software\ESRI\ArcGISPro
 Remove-Item -Recurse -Force -Confirm:$false Licensing

diff --git a/Source/Visualizations/aws_loosa/deploy/update_data_stores_and_sd_files.py b/Source/Visualizations/aws_loosa/deploy/update_data_stores_and_sd_files.py
index 2f4c1980..6f0a6fbb 100644
--- a/Source/Visualizations/aws_loosa/deploy/update_data_stores_and_sd_files.py
+++ b/Source/Visualizations/aws_loosa/deploy/update_data_stores_and_sd_files.py
@@ -9,7 +9,7 @@ import xml.dom.minidom as DOM
 
 from aws_loosa.consts import paths
-from aws_loosa.utils.viz_lambda_shared_funcs import get_service_metadata, get_mapx_files
+from aws_loosa.utils.viz_lambda_shared_funcs import get_service_metadata, get_mapx_files, check_s3_file_existence
 
 s3 = boto3.resource('s3')
 s3_client = boto3.client('s3')
@@ -92,6 +92,7 @@ def update_db_sd_files():
     print("Updating mapx files and creating SD files")
     sd_folder = os.path.join(paths.AUTHORITATIVE_ROOT, "sd_files")
     deployment_bucket = os.environ['DEPLOYMENT_DATA_BUCKET']
+    fim_output_bucket = os.environ['FIM_OUTPUT_BUCKET']
 
     print("Creating connection string to DB")
     conn_str = arcpy.management.CreateDatabaseConnectionString(
@@ -139,16 +140,22 @@ def update_db_sd_files():
             ExtraArgs={"ServerSideEncryption": "aws:kms"}
         )
 
+        print(f"Deleting publish flag for {service_name}")
+        publish_folder = service_data['egis_folder']
+        publish_server = service_data['egis_server']
+        publish_flag = f"published_flags/{publish_server}/{publish_folder}/{service_name}/{service_name}"
+        if check_s3_file_existence(fim_output_bucket, publish_flag):
+            s3_client.delete_object(Bucket=fim_output_bucket, Key=publish_flag)
 
 def create_sd_file(aprx, service_name, sd_folder, conn_str, service_data):
     sd_service_name = f"{service_name}{consts.SERVICE_NAME_TAG}"
     sd_creation_folder = "C:\\Users\\arcgis\\sd_creation"
-    sd_file = os.path.join(sd_creation_folder, service_name)
+    sd_creation_file = os.path.join(sd_creation_folder, service_name)
 
     if not os.path.exists(sd_creation_folder):
         os.makedirs(sd_creation_folder)
 
-    if os.path.exists(sd_file):
+    if os.path.exists(sd_creation_file):
         print(f"SD file already created for {service_name}")
         return
 
@@ -339,7 +346,7 @@ def create_sd_file(aprx, service_name, sd_folder, conn_str, service_data):
             print(e)
             return
 
-    file2 = open(sd_file,"w+")
+    file2 = open(sd_creation_file,"w+")
     file2.close()
 
     os.remove(sddraft_output_filename)