Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
satish-chinthanippu authored May 28, 2024
2 parents 1b1e1cc + 729bb64 commit c235f87
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
20 changes: 15 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import tenacity
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.client.exceptions import ApiException
from kubernetes.stream import stream
from urllib3.exceptions import HTTPError

Expand Down Expand Up @@ -788,9 +789,18 @@ def _clean(self, event: dict[str, Any]) -> None:
# Skip await_pod_completion when the event is 'timeout' due to the pod can hang
# on the ErrImagePull or ContainerCreating step and it will never complete
if event["status"] != "timeout":
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
try:
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
except ApiException as e:
if e.status == 404:
self.pod = None
self.log.warning(
"Pod not found while waiting for completion. The last status was %r", event["status"]
)
else:
raise e
if self.pod is not None:
self.post_complete_action(
pod=self.pod,
Expand All @@ -817,11 +827,11 @@ def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
if line:
self.log.info("[%s] logs: %s", self.base_container_name, line)
except HTTPError as e:
except (HTTPError, ApiException) as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
"Set log level to DEBUG for traceback.",
e,
e if not isinstance(e, ApiException) else e.reason,
)

def post_complete_action(self, *, pod, remote_pod, **kwargs) -> None:
Expand Down
13 changes: 12 additions & 1 deletion tests/integration/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
from datetime import datetime
from time import sleep
from unittest import mock

# leave this it is used by the test worker
Expand All @@ -42,6 +43,8 @@
from airflow.utils.state import State
from tests.test_utils import db

logger = logging.getLogger(__name__)


def _prepare_test_bodies():
if "CELERY_BROKER_URLS" in os.environ:
Expand Down Expand Up @@ -145,7 +148,15 @@ def fake_execute_command(command):
executor.task_publish_retries[key] = 1

executor._process_tasks(task_tuples_to_send)

for _ in range(20):
num_tasks = len(executor.tasks.keys())
if num_tasks == 2:
break
logger.info(
"Waiting 0.1 s for tasks to be processed asynchronously. Processed so far %d",
num_tasks,
)
sleep(0.4)
assert list(executor.tasks.keys()) == [
("success", "fake_simple_ti", execute_date, 0),
("fail", "fake_simple_ti", execute_date, 0),
Expand Down
24 changes: 23 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pendulum
import pytest
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
from kubernetes.client.rest import ApiException
from kubernetes.client.exceptions import ApiException
from urllib3 import HTTPResponse

from airflow.exceptions import (
Expand Down Expand Up @@ -2062,6 +2062,28 @@ def test_async_write_logs_should_execute_successfully(
else:
mock_manager.return_value.read_pod_logs.assert_not_called()

@patch(KUB_OP_PATH.format("post_complete_action"))
@patch(KUB_OP_PATH.format("client"))
@patch(KUB_OP_PATH.format("extract_xcom"))
@patch(HOOK_CLASS)
@patch(KUB_OP_PATH.format("pod_manager"))
def test_async_write_logs_handler_api_exception(
self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, mocked_client
):
mocked_client.read_namespaced_pod_log.side_effect = ApiException(status=404)
mock_manager.await_pod_completion.side_effect = ApiException(status=404)
mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
)
mock_extract_xcom.return_value = "{}"
k = KubernetesPodOperator(
task_id="task",
get_logs=True,
deferrable=True,
)
self.run_pod_async(k)
post_complete_action.assert_not_called()

@pytest.mark.parametrize(
"log_pod_spec_on_failure,expect_match",
[
Expand Down

0 comments on commit c235f87

Please sign in to comment.