Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) #23618

Merged
merged 6 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
"""Launches PODs"""
import asyncio
import concurrent
import json
import math
import time
Expand Down Expand Up @@ -193,6 +195,40 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
)
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
    """
    Write every raw log line to the task log at INFO level.

    :param logs: iterable of raw (bytes) log lines as returned by the kube API
    :return: the timestamp parsed from the last line consumed, or ``None``
        when the iterable was empty
    """
    last_timestamp = None
    for raw_line in logs:
        decoded_line = raw_line.decode('utf-8', errors="backslashreplace")
        last_timestamp, message = self.parse_log_line(decoded_line)
        self.log.info(message)
    return last_timestamp

def consume_container_logs_stream(
    self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
) -> Optional[DateTime]:
    """
    Consume ``stream`` while racing it against the container's lifecycle.

    The kube client log stream is synchronous (urllib3), so it is consumed in
    an executor thread while a watcher coroutine polls the container state.
    If the container terminates while the stream is still open (e.g. the read
    went silent after log rotation, see
    https://github.com/apache/airflow/issues/23497), the read is cancelled so
    the caller cannot hang indefinitely.

    :param pod: pod whose container logs are being followed
    :param container_name: name of the container emitting the logs
    :return: timestamp of the last consumed log line, or ``None`` when the
        read was interrupted by container termination
    """

    async def async_await_container_completion() -> None:
        # Initial sleep gives the log stream a head start before polling.
        await asyncio.sleep(1)
        while self.container_is_running(pod=pod, container_name=container_name):
            await asyncio.sleep(1)

    loop = asyncio.get_event_loop()
    await_container_completion = loop.create_task(async_await_container_completion())
    log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream))
    tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream}
    loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
    if log_stream.done():
        # Stream drained normally: retire the still-pending watcher task so it
        # is not destroyed while pending when the loop is garbage-collected.
        if not await_container_completion.done():
            await_container_completion.cancel()
            try:
                loop.run_until_complete(await_container_completion)
            except asyncio.CancelledError:
                pass
        return log_stream.result()

    log_stream.cancel()
    try:
        loop.run_until_complete(log_stream)
    # asyncio.CancelledError is the correct name on all supported Python
    # versions (concurrent.futures.CancelledError and asyncio.CancelledError
    # are aliases of one another on 3.7/3.8+) and does not depend on the bare
    # ``import concurrent`` having loaded the ``futures`` submodule.
    except asyncio.CancelledError:
        self.log.warning(
            "Container %s log read was interrupted at some point caused by log rotation "
            "see https://github.com/apache/airflow/issues/23497 for reference.",
            container_name,
        )
    return None

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
) -> PodLoggingStatus:
Expand Down Expand Up @@ -220,10 +256,11 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
),
follow=follow,
)
for raw_line in logs:
line = raw_line.decode('utf-8', errors="backslashreplace")
timestamp, message = self.parse_log_line(line)
self.log.info(message)
Comment on lines -223 to -226
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you remove it?
This was just added in https://github.com/apache/airflow/pull/23301/files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't. It is happening in log_iterable. I just moved the new kwarg (, errors="backslashreplace") which was added by the PR you linked.

if follow:
timestamp = self.consume_container_logs_stream(pod, container_name, logs)
else:
timestamp = self.log_iterable(logs)

except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
Expand Down Expand Up @@ -256,7 +293,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
time.sleep(1)

def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
while not self.container_is_running(pod=pod, container_name=container_name):
while self.container_is_running(pod=pod, container_name=container_name):
Copy link
Contributor Author

@schattian schattian May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the change unrelated to the issue.
I think the logic was reversed, blocking until the container is running (instead of blocking until it terminated).

Tests are all mocking the function, so this is not detected by them.

Probably that's not a big deal, as the default parameter for the operator is get_logs=True, so this was unused (until now, as I'm using it in fetch_container_logs).

I don't know if this is out of the scope of this PR and should be addressed separately — which guideline do you follow for these cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does look suspiciously wrong indeed. Since this is used in your fix, I see no problem with having it as part of the PR.

time.sleep(1)

def await_pod_completion(self, pod: V1Pod) -> V1Pod:
Expand Down
28 changes: 26 additions & 2 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
import logging
import time
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock

Expand Down Expand Up @@ -312,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
assert kwargs['since_seconds'] == 5

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)])
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow(
self, container_running_mock, follow, is_running_calls, exp_running
Expand All @@ -322,13 +324,35 @@ def test_fetch_container_running_follow(
When called with follow=False, should return immediately even though still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, True, False] # only will be called once
container_running_mock.side_effect = [True, False, False, False] # called once when follow=False
self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi']
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC'))
assert ret.running is exp_running

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow_when_kube_api_hangs(
    self, container_running_mock, follow, is_running_calls, exp_running
):
    """
    When called with follow, should keep looping even after disconnections, if pod still running.

    Regression test for https://github.com/apache/airflow/issues/23497: a log
    stream that blocks forever must not hang ``fetch_container_logs`` once the
    container reports terminated.
    """
    mock_pod = MagicMock()
    # First poll sees the container running, subsequent polls see it stopped,
    # which should trigger cancellation of the blocked log read.
    container_running_mock.side_effect = [True, False, False]

    def stream_logs() -> Generator:
        # Simulates a kube API log stream that never ends and blocks between
        # lines, like a hung urllib3 connection after log rotation.
        while True:
            time.sleep(1)  # this is intentional: urllib3.response.stream() is not async
            yield b'2021-01-01 hi'

    self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs()
    ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
    # The container-state poll count proves the watcher loop ran to completion.
    assert len(container_running_mock.call_args_list) == is_running_calls
    assert ret.running is exp_running
    # Interrupted reads report no last log time.
    assert ret.last_log_time is None


def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment of bad objects
Expand Down