
Commit

Fixed hanged KubernetesPodOperator
moiseenkov authored and potiuk committed Feb 27, 2023
1 parent 0db38ad commit 6007df5
Showing 3 changed files with 343 additions and 25 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -225,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):
BASE_CONTAINER_NAME = "base"

POD_CHECKED_KEY = "already_checked"
POST_TERMINATION_TIMEOUT = 120

template_fields: Sequence[str] = (
"image",
@@ -533,6 +534,7 @@ def execute_sync(self, context: Context):
pod=self.pod,
container_name=self.base_container_name,
follow=True,
post_termination_timeout=self.POST_TERMINATION_TIMEOUT,
)
else:
self.pod_manager.await_container_completion(
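
Since POST_TERMINATION_TIMEOUT is a plain class attribute, the new cut-off can be tuned without patching the provider. A minimal sketch, assuming you want a longer window in your own DAG code (the subclass name is hypothetical):

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

class PatientKubernetesPodOperator(KubernetesPodOperator):
    # Keep streaming logs for up to 5 minutes after the base container terminates.
    POST_TERMINATION_TIMEOUT = 300
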
140 changes: 129 additions & 11 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -23,23 +23,26 @@
import warnings
from contextlib import closing, suppress
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Iterable, cast
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Generator, cast

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError as BaseHTTPError
from urllib3.response import HTTPResponse

from airflow.exceptions import AirflowException
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.timezone import utcnow

if TYPE_CHECKING:
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
@@ -70,15 +73,24 @@ class PodPhase:
terminal_states = {FAILED, SUCCEEDED}


def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
"""Retrieves container status"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if container_statuses:
# In general the variable container_statuses can store multiple items matching different containers.
# The following generator expression yields all items that have name equal to the container_name.
# The function next() here calls the generator to get only the first value. If there's nothing found
# then None is returned.
return next((x for x in container_statuses if x.name == container_name), None)
return None


def container_is_running(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
If that container is present and running, returns True. Returns False otherwise.
"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if not container_statuses:
return False
container_status = next((x for x in container_statuses if x.name == container_name), None)
container_status = get_container_status(pod, container_name)
if not container_status:
return False
return container_status.state.running is not None
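
As a hedged illustration of how the new helper can be consumed (assuming remote_pod is a V1Pod already fetched from the API):

status = get_container_status(pod=remote_pod, container_name="base")
if status is None:
    print("container status not reported yet")
elif status.state.running is not None:
    print("container is running")
elif status.state.terminated is not None:
    print("container finished at", status.state.terminated.finished_at)
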
@@ -91,6 +103,92 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
return container_status.state.terminated.message if container_status else None


class PodLogsConsumer:
"""
PodLogsConsumer is responsible for pulling pod logs from a stream, checking the container status before
reading data.
This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
:param response: HTTP response with logs
:param pod: Pod instance from Kubernetes client
:param pod_manager: Pod manager instance
:param container_name: Name of the container that we're reading logs from
:param post_termination_timeout: (Optional) The period of time in seconds for which logs remain
available after the container has terminated.
:param read_pod_cache_timeout: (Optional) The container's status cache lifetime.
The container status is cached to reduce API calls.
:meta private:
"""

def __init__(
self,
response: HTTPResponse,
pod: V1Pod,
pod_manager: PodManager,
container_name: str,
post_termination_timeout: int = 120,
read_pod_cache_timeout: int = 120,
):
self.response = response
self.pod = pod
self.pod_manager = pod_manager
self.container_name = container_name
self.post_termination_timeout = post_termination_timeout
self.last_read_pod_at = None
self.read_pod_cache = None
self.read_pod_cache_timeout = read_pod_cache_timeout

def __iter__(self) -> Generator[bytes, None, None]:
r"""The generator yields log items divided by the '\n' symbol."""
incomplete_log_item: list[bytes] = []
if self.logs_available():
for data_chunk in self.response.stream(amt=None, decode_content=True):
if b"\n" in data_chunk:
log_items = data_chunk.split(b"\n")
yield from self._extract_log_items(incomplete_log_item, log_items)
incomplete_log_item = self._save_incomplete_log_item(log_items[-1])
else:
incomplete_log_item.append(data_chunk)
if not self.logs_available():
break
if incomplete_log_item:
yield b"".join(incomplete_log_item)

@staticmethod
def _extract_log_items(incomplete_log_item: list[bytes], log_items: list[bytes]):
yield b"".join(incomplete_log_item) + log_items[0] + b"\n"
for x in log_items[1:-1]:
yield x + b"\n"

@staticmethod
def _save_incomplete_log_item(sub_chunk: bytes):
return [sub_chunk] if sub_chunk else []

def logs_available(self):
remote_pod = self.read_pod()
if container_is_running(pod=remote_pod, container_name=self.container_name):
return True
container_status = get_container_status(pod=remote_pod, container_name=self.container_name)
state = container_status.state if container_status else None
terminated = state.terminated if state else None
if terminated:
termination_time = terminated.finished_at
if termination_time:
return termination_time + timedelta(seconds=self.post_termination_timeout) > utcnow()
return False

def read_pod(self):
_now = utcnow()
if (
self.read_pod_cache is None
or self.last_read_pod_at + timedelta(seconds=self.read_pod_cache_timeout) < _now
):
self.read_pod_cache = self.pod_manager.read_pod(self.pod)
self.last_read_pod_at = _now
return self.read_pod_cache
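
To make the cut-off in logs_available concrete, a minimal sketch of the same arithmetic with hypothetical timestamps (not part of the change itself):

from datetime import datetime, timedelta, timezone

finished_at = datetime(2023, 2, 27, 12, 0, 0, tzinfo=timezone.utc)  # terminated.finished_at
deadline = finished_at + timedelta(seconds=120)                     # post_termination_timeout
print(deadline > datetime(2023, 2, 27, 12, 1, 30, tzinfo=timezone.utc))  # True: logs still considered available
print(deadline > datetime(2023, 2, 27, 12, 3, 0, tzinfo=timezone.utc))   # False: consumer stops reading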


@dataclass
class PodLoggingStatus:
"""Used for returning the status of the pod and last log time when exiting from `fetch_container_logs`"""
@@ -203,14 +301,22 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
self,
pod: V1Pod,
container_name: str,
*,
follow=False,
since_time: DateTime | None = None,
post_termination_timeout: int = 120,
) -> PodLoggingStatus:
"""
Follows the logs of the container and streams them to Airflow logging.
Returns when the container exits.
"""

def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
def consume_logs(
*, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
) -> DateTime | None:
"""
Tries to follow container logs until container completes.
For a long-running container, sometimes the log read may be interrupted
@@ -228,6 +334,7 @@ def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) ->
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
follow=follow,
post_termination_timeout=termination_timeout,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
@@ -251,7 +358,9 @@ def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) ->
# So the looping logic is there to let us resume following the logs.
last_log_time = since_time
while True:
last_log_time = consume_logs(since_time=last_log_time, follow=follow)
last_log_time = consume_logs(
since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
)
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
if not follow:
@@ -327,7 +436,8 @@ def read_pod_logs(
timestamps: bool = False,
since_seconds: int | None = None,
follow=True,
) -> Iterable[bytes]:
post_termination_timeout: int = 120,
) -> PodLogsConsumer:
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
@@ -337,7 +447,7 @@ additional_kwargs["tail_lines"] = tail_lines
additional_kwargs["tail_lines"] = tail_lines

try:
return self._client.read_namespaced_pod_log(
logs = self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container=container_name,
@@ -350,6 +460,14 @@ self.log.exception("There was an error reading the kubernetes API.")
self.log.exception("There was an error reading the kubernetes API.")
raise

return PodLogsConsumer(
response=logs,
pod=pod,
pod_manager=self,
container_name=container_name,
post_termination_timeout=post_termination_timeout,
)
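
A minimal usage sketch under the new return type, assuming pod_manager and pod already exist; the decoding mirrors what consume_logs does above:

consumer = pod_manager.read_pod_logs(
    pod=pod,
    container_name="base",
    follow=True,
    post_termination_timeout=120,
)
for raw_line in consumer:
    # Iteration stops once the container has been terminated for longer than post_termination_timeout.
    print(raw_line.decode("utf-8", errors="backslashreplace"), end="")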

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
"""Reads events from the POD"""