From 106944df61bed82e804e5a73a4e87cd5adffd95d Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 23 May 2023 19:15:41 +0200 Subject: [PATCH] Handle the case when LogUri is missing in EMR response When EMR describe-cluster, does not have LogURI information, we should silently skip persisting the link to XCom rather than fail on KeyError. Fixes: #31480 --- airflow/providers/amazon/aws/operators/emr.py | 80 +++++++++++-------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 3e36231993c12..b0402df336d41 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -122,14 +122,18 @@ def execute(self, context: Context) -> list[str]: aws_partition=emr_hook.conn_partition, job_flow_id=job_flow_id, ) - EmrLogsLink.persist( - context=context, - operator=self, - region_name=emr_hook.conn_region_name, - aws_partition=emr_hook.conn_partition, - job_flow_id=self.job_flow_id, - log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=job_flow_id), - ) + try: + EmrLogsLink.persist( + context=context, + operator=self, + region_name=emr_hook.conn_region_name, + aws_partition=emr_hook.conn_partition, + job_flow_id=self.job_flow_id, + log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=job_flow_id), + ) + except KeyError: + # There is no logURI in the response, so we can't create a link. + pass self.log.info("Adding steps to %s", job_flow_id) @@ -686,14 +690,18 @@ def execute(self, context: Context) -> str | None: job_flow_id=self._job_flow_id, ) if self._job_flow_id: - EmrLogsLink.persist( - context=context, - operator=self, - region_name=self._emr_hook.conn_region_name, - aws_partition=self._emr_hook.conn_partition, - job_flow_id=self._job_flow_id, - log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id), - ) + try: + EmrLogsLink.persist( + context=context, + operator=self, + region_name=self._emr_hook.conn_region_name, + aws_partition=self._emr_hook.conn_partition, + job_flow_id=self._job_flow_id, + log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id), + ) + except KeyError: + # There is no logURI in the response, so we can't create a link. + pass if self.wait_for_completion: self._emr_hook.get_waiter("job_flow_waiting").wait( @@ -762,14 +770,18 @@ def execute(self, context: Context) -> int: aws_partition=emr_hook.conn_partition, job_flow_id=self.cluster_id, ) - EmrLogsLink.persist( - context=context, - operator=self, - region_name=emr_hook.conn_region_name, - aws_partition=emr_hook.conn_partition, - job_flow_id=self.cluster_id, - log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=self.cluster_id), - ) + try: + EmrLogsLink.persist( + context=context, + operator=self, + region_name=emr_hook.conn_region_name, + aws_partition=emr_hook.conn_partition, + job_flow_id=self.cluster_id, + log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=self.cluster_id), + ) + except KeyError: + # There is no logURI in the response, so we can't create a link. + pass self.log.info("Modifying cluster %s", self.cluster_id) response = emr.modify_cluster( @@ -819,14 +831,18 @@ def execute(self, context: Context) -> None: aws_partition=emr_hook.conn_partition, job_flow_id=self.job_flow_id, ) - EmrLogsLink.persist( - context=context, - operator=self, - region_name=emr_hook.conn_region_name, - aws_partition=emr_hook.conn_partition, - job_flow_id=self.job_flow_id, - log_uri=get_log_uri(emr_client=emr, job_flow_id=self.job_flow_id), - ) + try: + EmrLogsLink.persist( + context=context, + operator=self, + region_name=emr_hook.conn_region_name, + aws_partition=emr_hook.conn_partition, + job_flow_id=self.job_flow_id, + log_uri=get_log_uri(emr_client=emr, job_flow_id=self.job_flow_id), + ) + except KeyError: + # There is no logURI in the response, so we can't create a link. + pass self.log.info("Terminating JobFlow %s", self.job_flow_id) response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])