From ccdbd5412d2c3674483ccb3b1cc9dad67b59210d Mon Sep 17 00:00:00 2001 From: schattian Date: Tue, 10 May 2022 15:01:20 +0200 Subject: [PATCH] Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) --- .../cncf/kubernetes/utils/pod_manager.py | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 993ba12e313fe..fd8fb2299752e 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. """Launches PODs""" +import asyncio import json import math import time @@ -193,6 +194,35 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt ) return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) + def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: + for line in logs: + timestamp, message = self.parse_log_line(line.decode('utf-8')) + self.log.info(message) + return timestamp + + def consume_container_logs_stream(self, pod: V1Pod, container_name: str, stream: Iterable[bytes]) -> Optional[DateTime]: + async def consume_log_stream() -> Optional[DateTime]: + return self.log_iterable(stream) + + async def async_await_container_completion() -> None: + self.await_container_completion(pod=pod, container_name=container_name) + + await_container_completion = asyncio.create_task(async_await_container_completion()) + log_stream = asyncio.create_task(consume_log_stream()) + asyncio.run(asyncio.wait({log_stream, await_container_completion}, return_when=asyncio.FIRST_COMPLETED)) + + if log_stream.done(): + return log_stream.result() + + log_stream.cancel() + self.log.warning( + "Pod %s log read was interrupted at some point caused by log rotation " + "see https://github.com/kubernetes/kubernetes/issues/59902 ", + "and https://github.com/apache/airflow/issues/23497 for reference.", + pod.metadata.name, + container_name, + ) + def fetch_container_logs( self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None ) -> PodLoggingStatus: @@ -220,9 +250,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) ), follow=follow, ) - for line in logs: - timestamp, message = self.parse_log_line(line.decode('utf-8')) - self.log.info(message) + if follow: + timestamp = self.consume_container_logs_stream(pod, container_name, logs) + else: + timestamp = self.log_iterable(logs) except BaseHTTPError as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. " @@ -255,7 +286,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) time.sleep(1) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: - while not self.container_is_running(pod=pod, container_name=container_name): + while self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) def await_pod_completion(self, pod: V1Pod) -> V1Pod: