diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index d530f86bcc3ae..ef5366d6c4030 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -36,6 +36,7 @@ import tenacity from deprecated import deprecated from kubernetes.client import CoreV1Api, V1Pod, models as k8s +from kubernetes.client.exceptions import ApiException from kubernetes.stream import stream from urllib3.exceptions import HTTPError @@ -788,9 +789,18 @@ def _clean(self, event: dict[str, Any]) -> None: # Skip await_pod_completion when the event is 'timeout' due to the pod can hang # on the ErrImagePull or ContainerCreating step and it will never complete if event["status"] != "timeout": - self.pod = self.pod_manager.await_pod_completion( - self.pod, istio_enabled, self.base_container_name - ) + try: + self.pod = self.pod_manager.await_pod_completion( + self.pod, istio_enabled, self.base_container_name + ) + except ApiException as e: + if e.status == 404: + self.pod = None + self.log.warning( + "Pod not found while waiting for completion. The last status was %r", event["status"] + ) + else: + raise e if self.pod is not None: self.post_complete_action( pod=self.pod, @@ -817,11 +827,11 @@ def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") if line: self.log.info("[%s] logs: %s", self.base_container_name, line) - except HTTPError as e: + except (HTTPError, ApiException) as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. " "Set log level to DEBUG for traceback.", - e, + e if not isinstance(e, ApiException) else e.reason, ) def post_complete_action(self, *, pod, remote_pod, **kwargs) -> None: diff --git a/tests/integration/executors/test_celery_executor.py b/tests/integration/executors/test_celery_executor.py index 72e9ca9e10b1f..2b72d2e91d1a2 100644 --- a/tests/integration/executors/test_celery_executor.py +++ b/tests/integration/executors/test_celery_executor.py @@ -23,6 +23,7 @@ import os import sys from datetime import datetime +from time import sleep from unittest import mock # leave this it is used by the test worker @@ -42,6 +43,8 @@ from airflow.utils.state import State from tests.test_utils import db +logger = logging.getLogger(__name__) + def _prepare_test_bodies(): if "CELERY_BROKER_URLS" in os.environ: @@ -145,7 +148,15 @@ def fake_execute_command(command): executor.task_publish_retries[key] = 1 executor._process_tasks(task_tuples_to_send) - + for _ in range(20): + num_tasks = len(executor.tasks.keys()) + if num_tasks == 2: + break + logger.info( + "Waiting 0.1 s for tasks to be processed asynchronously. Processed so far %d", + num_tasks, + ) + sleep(0.4) assert list(executor.tasks.keys()) == [ ("success", "fake_simple_ti", execute_date, 0), ("fail", "fake_simple_ti", execute_date, 0), diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 8b7c238e9c143..f7a8e24d6973d 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -25,7 +25,7 @@ import pendulum import pytest from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s -from kubernetes.client.rest import ApiException +from kubernetes.client.exceptions import ApiException from urllib3 import HTTPResponse from airflow.exceptions import ( @@ -2062,6 +2062,28 @@ def test_async_write_logs_should_execute_successfully( else: mock_manager.return_value.read_pod_logs.assert_not_called() + @patch(KUB_OP_PATH.format("post_complete_action")) + @patch(KUB_OP_PATH.format("client")) + @patch(KUB_OP_PATH.format("extract_xcom")) + @patch(HOOK_CLASS) + @patch(KUB_OP_PATH.format("pod_manager")) + def test_async_write_logs_handler_api_exception( + self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, mocked_client + ): + mocked_client.read_namespaced_pod_log.side_effect = ApiException(status=404) + mock_manager.await_pod_completion.side_effect = ApiException(status=404) + mocked_hook.return_value.get_pod.return_value = k8s.V1Pod( + metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE) + ) + mock_extract_xcom.return_value = "{}" + k = KubernetesPodOperator( + task_id="task", + get_logs=True, + deferrable=True, + ) + self.run_pod_async(k) + post_complete_action.assert_not_called() + @pytest.mark.parametrize( "log_pod_spec_on_failure,expect_match", [