Skip to content

Commit

Permalink
Handle the case when LogUri is missing in EMR response
Browse files Browse the repository at this point in the history
When EMR describe-cluster, does not have LogURI information, we
should silently skip persisting the link to XCom rather than fail
on KeyError.

Fixes: apache#31480
  • Loading branch information
potiuk committed May 23, 2023
1 parent c082aec commit 106944d
Showing 1 changed file with 48 additions and 32 deletions.
80 changes: 48 additions & 32 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,18 @@ def execute(self, context: Context) -> list[str]:
aws_partition=emr_hook.conn_partition,
job_flow_id=job_flow_id,
)
EmrLogsLink.persist(
context=context,
operator=self,
region_name=emr_hook.conn_region_name,
aws_partition=emr_hook.conn_partition,
job_flow_id=self.job_flow_id,
log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=job_flow_id),
)
try:
EmrLogsLink.persist(
context=context,
operator=self,
region_name=emr_hook.conn_region_name,
aws_partition=emr_hook.conn_partition,
job_flow_id=self.job_flow_id,
log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=job_flow_id),
)
except KeyError:
# There is no logURI in the response, so we can't create a link.
pass

self.log.info("Adding steps to %s", job_flow_id)

Expand Down Expand Up @@ -686,14 +690,18 @@ def execute(self, context: Context) -> str | None:
job_flow_id=self._job_flow_id,
)
if self._job_flow_id:
EmrLogsLink.persist(
context=context,
operator=self,
region_name=self._emr_hook.conn_region_name,
aws_partition=self._emr_hook.conn_partition,
job_flow_id=self._job_flow_id,
log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id),
)
try:
EmrLogsLink.persist(
context=context,
operator=self,
region_name=self._emr_hook.conn_region_name,
aws_partition=self._emr_hook.conn_partition,
job_flow_id=self._job_flow_id,
log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id),
)
except KeyError:
# There is no logURI in the response, so we can't create a link.
pass

if self.wait_for_completion:
self._emr_hook.get_waiter("job_flow_waiting").wait(
Expand Down Expand Up @@ -762,14 +770,18 @@ def execute(self, context: Context) -> int:
aws_partition=emr_hook.conn_partition,
job_flow_id=self.cluster_id,
)
EmrLogsLink.persist(
context=context,
operator=self,
region_name=emr_hook.conn_region_name,
aws_partition=emr_hook.conn_partition,
job_flow_id=self.cluster_id,
log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=self.cluster_id),
)
try:
EmrLogsLink.persist(
context=context,
operator=self,
region_name=emr_hook.conn_region_name,
aws_partition=emr_hook.conn_partition,
job_flow_id=self.cluster_id,
log_uri=get_log_uri(emr_client=emr_hook.conn, job_flow_id=self.cluster_id),
)
except KeyError:
# There is no logURI in the response, so we can't create a link.
pass

self.log.info("Modifying cluster %s", self.cluster_id)
response = emr.modify_cluster(
Expand Down Expand Up @@ -819,14 +831,18 @@ def execute(self, context: Context) -> None:
aws_partition=emr_hook.conn_partition,
job_flow_id=self.job_flow_id,
)
EmrLogsLink.persist(
context=context,
operator=self,
region_name=emr_hook.conn_region_name,
aws_partition=emr_hook.conn_partition,
job_flow_id=self.job_flow_id,
log_uri=get_log_uri(emr_client=emr, job_flow_id=self.job_flow_id),
)
try:
EmrLogsLink.persist(
context=context,
operator=self,
region_name=emr_hook.conn_region_name,
aws_partition=emr_hook.conn_partition,
job_flow_id=self.job_flow_id,
log_uri=get_log_uri(emr_client=emr, job_flow_id=self.job_flow_id),
)
except KeyError:
# There is no logURI in the response, so we can't create a link.
pass

self.log.info("Terminating JobFlow %s", self.job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
Expand Down

0 comments on commit 106944d

Please sign in to comment.