Skip to content

Commit

Permalink
Avoid logging empty line KPO (#38247)
Browse files Browse the repository at this point in the history
* Avoid logging empty line KPO

* cleanup

* Apply review suggestions

* Apply review feedback

* Update airflow/providers/cncf/kubernetes/operators/pod.py
  • Loading branch information
pankajastro authored Apr 15, 2024
1 parent ad1e473 commit 43919c2
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
3 changes: 2 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,8 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
self.log.info("Container logs: %s", line)
if line:
self.log.info("Container logs: %s", line)
except HTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
Expand Down
6 changes: 4 additions & 2 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
self._callbacks.progress_callback(
line=line, client=self._client, mode=ExecutionMode.SYNC
)
self.log.info("[%s] %s", container_name, message_to_log)
if message_to_log is not None:
self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
Expand All @@ -481,7 +482,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
self._callbacks.progress_callback(
line=line, client=self._client, mode=ExecutionMode.SYNC
)
self.log.info("[%s] %s", container_name, message_to_log)
if message_to_log is not None:
self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to avoid
Expand Down
15 changes: 15 additions & 0 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,21 @@ def test_read_pod_logs_retries_successfully(self):
]
)

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
def test_fetch_container_logs_do_not_log_none(self, mock_container_is_running, caplog):
MockWrapper.reset()
caplog.set_level(logging.INFO)

def consumer_iter():
"""This will simulate a container that hasn't produced any logs in the last read_timeout window"""
yield from ()

with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
mock_consumer_iter.side_effect = consumer_iter
mock_container_is_running.side_effect = [True, True, False]
self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True)
assert "[container-name] None" not in (record.message for record in caplog.records)

def test_read_pod_logs_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
Expand Down

0 comments on commit 43919c2

Please sign in to comment.