diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 3f8b6bf2e6bce..77094269a1951 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -339,12 +339,6 @@ def reschedule(self): def get_serialized_fields(cls): return super().get_serialized_fields() | {"reschedule"} - def raise_failed_or_skiping_exception(self, *, failed_message: str, skipping_message: str = "") -> None: - """Raise AirflowSkipException if self.soft_fail is set to True. Otherwise raise AirflowException.""" - if self.soft_fail: - raise AirflowSkipException(skipping_message or failed_message) - raise AirflowException(failed_message) - def poke_mode_only(cls): """ diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index ffc2cc1313f0a..92562589b212b 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -222,8 +222,6 @@ def __init__( self.deferrable = deferrable self.poll_interval = poll_interval - self._skipping_message_postfix = " Skipping due to soft_fail." - def _get_dttm_filter(self, context): if self.execution_delta: dttm = context["logical_date"] - self.execution_delta @@ -276,28 +274,32 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool: # Fail if anything in the list has failed. if count_failed > 0: if self.external_task_ids: - failed_message = ( + if self.soft_fail: + raise AirflowSkipException( + f"Some of the external tasks {self.external_task_ids} " + f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail." + ) + raise AirflowException( f"Some of the external tasks {self.external_task_ids} " f"in DAG {self.external_dag_id} failed." ) - - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) elif self.external_task_group_id: - self.raise_failed_or_skiping_exception( - failed_message=( + if self.soft_fail: + raise AirflowSkipException( f"The external task_group '{self.external_task_group_id}' " - f"in DAG '{self.external_dag_id}' failed." + f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail." ) + raise AirflowException( + f"The external task_group '{self.external_task_group_id}' " + f"in DAG '{self.external_dag_id}' failed." ) + else: - failed_message = f"The external DAG {self.external_dag_id} failed." - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) + if self.soft_fail: + raise AirflowSkipException( + f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail." + ) + raise AirflowException(f"The external DAG {self.external_dag_id} failed.") count_skipped = -1 if self.skipped_states: @@ -349,20 +351,12 @@ def execute_complete(self, context, event=None): self.log.info("External task %s has executed successfully.", self.external_task_id) return None elif event["status"] == "timeout": - failed_message = "Dag was not started within 1 minute, assuming fail." - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) + raise AirflowException("Dag was not started within 1 minute, assuming fail.") else: - failed_message = ( + raise AirflowException( "Error occurred while trying to retrieve task status. Please, check the " "name of executed task and Dag." ) - self.raise_failed_or_skiping_exception( - failed_message=failed_message, - skipping_message=f"{failed_message}{self._skipping_message_postfix}", - ) def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session)