Skip to content

Commit

Permalink
Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
schattian committed May 10, 2022
1 parent 8280167 commit ccdbd54
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
"""Launches PODs"""
import asyncio
import json
import math
import time
Expand Down Expand Up @@ -193,6 +194,35 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
)
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
    """
    Log every line of a container log stream and report the last timestamp seen.

    Each raw line is decoded as UTF-8, split by ``parse_log_line`` into a
    timestamp and a message, and the message is written at INFO level.

    :param logs: iterable of raw log lines
    :return: the timestamp parsed from the final line, or ``None`` when
        ``logs`` yields no lines at all
    """
    # Bind the name up front: with an empty stream the loop body never runs and
    # returning an unbound local would raise UnboundLocalError.
    timestamp = None
    for line in logs:
        timestamp, message = self.parse_log_line(line.decode('utf-8'))
        self.log.info(message)
    return timestamp

def consume_container_logs_stream(
    self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
) -> Optional[DateTime]:
    """
    Log a followed container log stream without outliving the container.

    The stream is drained by ``log_iterable`` on a background thread while the
    calling thread polls ``container_is_running``. The call ends as soon as
    either the stream is exhausted or the container is reported as no longer
    running, so a log read that stalls cannot block the caller indefinitely.

    :param pod: pod that owns the container being followed
    :param container_name: name of the container inside ``pod``
    :param stream: iterable of raw log lines, passed unchanged to ``log_iterable``
    :return: whatever ``log_iterable`` returned when the stream was fully
        consumed; ``None`` (after logging a warning) when the container stopped
        while the read was still pending
    :raises Exception: any exception raised by ``log_iterable`` is re-raised in
        the calling thread so the caller's error handling still applies
    """
    # Imported locally so this method is self-contained with respect to the
    # module's import block.
    import threading

    outcome: dict = {}

    def drain_stream() -> None:
        # Runs on the worker thread; stash the result or the failure for the caller.
        try:
            outcome["timestamp"] = self.log_iterable(stream)
        except Exception as exc:
            outcome["error"] = exc

    # Daemon thread: a read that never returns must not keep the interpreter alive.
    reader = threading.Thread(
        target=drain_stream,
        name=f"pod-log-reader-{container_name}",
        daemon=True,
    )
    reader.start()

    # join() doubles as the one-second polling interval and returns early the
    # moment the reader finishes.
    while reader.is_alive() and self.container_is_running(pod=pod, container_name=container_name):
        reader.join(timeout=1)

    if not reader.is_alive():
        if "error" in outcome:
            raise outcome["error"]
        return outcome.get("timestamp")

    # The container is gone but the read is still blocked; abandon the reader.
    self.log.warning(
        "Pod %s (container %s) log read was interrupted at some point caused by log rotation "
        "see https://github.com/kubernetes/kubernetes/issues/59902 "
        "and https://github.com/apache/airflow/issues/23497 for reference.",
        pod.metadata.name,
        container_name,
    )
    return None

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
) -> PodLoggingStatus:
Expand Down Expand Up @@ -220,9 +250,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
),
follow=follow,
)
for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8'))
self.log.info(message)
if follow:
timestamp = self.consume_container_logs_stream(pod, container_name, logs)
else:
timestamp = self.log_iterable(logs)
except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
Expand Down Expand Up @@ -255,7 +286,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
time.sleep(1)

def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
    """
    Block until the given container is no longer running.

    :param pod: pod that owns the container
    :param container_name: name of the container to wait for
    """
    # Poll once per second and return as soon as the container is reported as
    # not running. The condition must not be negated: waiting "while not
    # running" would return immediately for a live container.
    while self.container_is_running(pod=pod, container_name=container_name):
        time.sleep(1)

def await_pod_completion(self, pod: V1Pod) -> V1Pod:
Expand Down

0 comments on commit ccdbd54

Please sign in to comment.