Lambda improvements #84

Merged · 2 commits · Feb 7, 2024
9 changes: 4 additions & 5 deletions src/pds/ingress/pds-nucleus-datasync-completion.py
@@ -31,7 +31,6 @@
 
 rds_data = boto3.client('rds-data')
 
-
 def lambda_handler(event, context):
     """ Lambda Handler """
 
@@ -43,7 +42,7 @@ def lambda_handler(event, context):
     task_id = resource_list[1]
     exec_id = resource_list[3]
 
-    prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-transferred-"
+    prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-"
 
     datasync_reports_s3_bucket = s3.Bucket(datasync_reports_s3_bucket_name)

Expand All @@ -53,11 +52,11 @@ def lambda_handler(event, context):

transfer_report_file_content = transfer_report.get()['Body'].read().decode('utf-8')
transfer_report_json_content = json.loads(transfer_report_file_content)
trasfered_file_obj_list = transfer_report_json_content['Transferred']
verified_file_obj_list = transfer_report_json_content['Verified']

logger.debug(f"trasfered_file_obj_list: {trasfered_file_obj_list}")
logger.debug(f"verified_file_obj_list: {verified_file_obj_list}")

for file_obj in trasfered_file_obj_list:
for file_obj in verified_file_obj_list:

obj_name = file_obj['RelativePath']
obj_type = file_obj['SrcMetadata']['Type']
Expand Down
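For context on the rename above: DataSync detailed reports carry separate arrays per phase, and the handler now iterates the 'Verified' array of the files-verified report instead of the 'Transferred' array. A minimal sketch of the report shape the loop expects — the field names ('Verified', 'RelativePath', 'SrcMetadata'/'Type') come from the diff, while the values and the overall layout are illustrative assumptions:

import json

# Illustrative excerpt of a DataSync files-verified report; only the fields
# the handler reads are shown, the rest of a real report is omitted.
sample_report = '''
{
    "Verified": [
        {"RelativePath": "/bundle1/product1.xml", "SrcMetadata": {"Type": "Regular"}},
        {"RelativePath": "/bundle1/product1.dat", "SrcMetadata": {"Type": "Regular"}}
    ]
}
'''

report = json.loads(sample_report)
for file_obj in report['Verified']:
    print(file_obj['RelativePath'], file_obj['SrcMetadata']['Type'])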
37 changes: 22 additions & 15 deletions src/pds/ingress/pds-nucleus-product-completion-checker.py
@@ -1,7 +1,7 @@
"""
==============================================
pds-nucleus-product-completion-checker-batch.py
==============================================
============================================================
pds-nucleus-product-completion-checker.py (batch processing)
============================================================ =

Lambda function to check if the staging S3 bucket has received a complete product
with all required files. This lambda function is triggered periodically.
@@ -18,6 +18,7 @@
 import http.client
 import base64
 import ast
+import uuid
 
 from xml.dom import minidom
 
@@ -29,10 +30,10 @@
 rds_data = boto3.client('rds-data')
 
 mwaa_env_name = 'PDS-Nucleus-Airflow-Env'
-dag_name = 'PDS_Registry_Use_Case_61_Messenger_Batch-logs'
 mwaa_cli_command = 'dags trigger'
 
 # Read environment variables from lambda configurations
+dag_name = os.environ.get('AIRFLOW_DAG_NAME')
 node_name = os.environ.get('NODE_NAME')
 es_url = os.environ.get('ES_URL')
 replace_prefix_with = os.environ.get('REPLACE_PREFIX_WITH')
@@ -47,7 +48,7 @@
 def lambda_handler(event, context):
     """ Main lambda handler """
 
-    logger.setLevel(logging.INFO)
+    logger.setLevel(logging.DEBUG)
     logger.addHandler(logging.StreamHandler())
 
     logger.info(f"Lambda Request ID: {context.aws_request_id}")
@@ -65,11 +66,13 @@ def process_completed_products():
 
     logger.debug("Checking completed products...")
 
-    sql = """
-        select distinct s3_url_of_product_label from product where processing_status = 'INCOMPLETE' and s3_url_of_product_label
-        NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping
-        where s3_url_of_data_file
-        NOT IN (select s3_url_of_data_file from data_file));
+    sql = """
+        SELECT DISTINCT s3_url_of_product_label from product
+        WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
+        NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
+        where s3_url_of_data_file
+        NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
+        IN (SELECT s3_url_of_product_label from product_data_file_mapping);
     """
 
     response = rds_data.execute_statement(
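The execute_statement call is cut off in this view. The reworked query keeps the original completeness check (no mapped data file may be missing from data_file) and additionally requires the product label to appear in product_data_file_mapping at all, so products whose mappings have not yet been registered are no longer reported as complete. A minimal sketch of how the RDS Data API call typically completes — the ARNs and database name below are hypothetical placeholders, not values from this repository:

# Hypothetical completion of the truncated call; the real ARNs come from
# the Lambda's configuration.
response = rds_data.execute_statement(
    resourceArn='arn:aws:rds:us-east-1:123456789012:cluster:pds-nucleus-db',
    secretArn='arn:aws:secretsmanager:us-east-1:123456789012:secret:pds-db',
    database='pds_nucleus',
    sql=sql)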
@@ -136,15 +139,15 @@ def submit_data_to_nucleus(list_of_product_labels_to_process):
 
 
 def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
-    """ Creates harvest manifest file and harvest config file and trigger Nucleus workflow """
+    """ Creates harvest manifest file and harvest config file and trigger Nucleus workflow """
 
     logger.debug('List of product labels to process:' + str(list_of_product_labels_to_process))
 
     efs_mount_path = os.environ.get('EFS_MOUNT_PATH')
 
     harvest_config_dir = efs_mount_path + '/harvest-configs'
 
-    file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1))
+    file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1) )
 
     harvest_manifest_content = ""
     list_of_product_labels_to_process_with_file_paths = []
@@ -154,10 +157,13 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
         harvest_manifest_content = harvest_manifest_content + efs_product_label_file_location + '\n'
         list_of_product_labels_to_process_with_file_paths.append(efs_product_label_file_location)
 
+    # Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
+    random_suffix = uuid.uuid4().hex
+
     try:
         os.makedirs(harvest_config_dir, exist_ok=True)
-        harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '.cfg'
-        harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '.txt'
+        harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg'
+        harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt'
 
         logger.debug(f"Manifest content: {str(harvest_manifest_content)}")
 
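On the new suffix: uuid.uuid4().hex yields a 32-character lowercase hex string, so concurrent invocations that happen to process products with identical label file names write to distinct config and manifest paths. A quick illustration — the 'product.xml' file name is a hypothetical stand-in:

import uuid

suffix = uuid.uuid4().hex          # e.g. '4f7f4ab22c6e4f1d9c0b8f1e2a3b4c5d'
print(len(suffix))                 # 32
print('harvest_' + 'product.xml' + '_' + suffix + '.cfg')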
@@ -189,6 +195,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
         logger.info(f"Created harvest config XML file: {harvest_config_file_path}")
     except Exception as e:
         logger.error(f"Error creating harvest config files in : {harvest_config_dir}. Exception: {str(e)}")
+        return
 
     trigger_nucleus_workflow(harvest_manifest_file_path, harvest_config_file_path,
                              list_of_product_labels_to_process_with_file_paths)
@@ -197,7 +204,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
 
 
 def trigger_nucleus_workflow(harvest_manifest_file_path, pds_harvest_config_file, list_of_product_labels_to_process):
-    """ Triggers Nucleus workflow with parameters """
+    """ Triggers Nucleus workflow with parameters """
 
     # Convert list to comma seperated list
     delim = ","
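The body of trigger_nucleus_workflow is not part of this diff, but the module's imports (http.client, base64) and the mwaa_cli_command = 'dags trigger' constant point at the standard MWAA CLI-token pattern. A sketch of that pattern under those assumptions — the conf payload is a hypothetical stand-in for the real workflow parameters:

import base64
import http.client
import json

import boto3

def trigger_dag(mwaa_env_name, dag_name, conf):
    """ Trigger an Airflow DAG through the MWAA CLI endpoint (sketch) """
    mwaa = boto3.client('mwaa')

    # Exchange AWS credentials for a short-lived CLI token plus the
    # environment's web server hostname.
    token = mwaa.create_cli_token(Name=mwaa_env_name)

    conn = http.client.HTTPSConnection(token['WebServerHostname'])
    payload = f"dags trigger {dag_name} --conf '{json.dumps(conf)}'"
    headers = {'Authorization': 'Bearer ' + token['CliToken'],
               'Content-Type': 'text/plain'}
    conn.request('POST', '/aws_mwaa/cli', payload, headers)

    # The CLI endpoint returns base64-encoded stdout/stderr from the Airflow CLI.
    result = json.loads(conn.getresponse().read())
    return base64.b64decode(result['stdout']).decode('utf-8')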