
Tweak logs handling and tests (#708)
serhiy-storchaka authored Jun 8, 2022
1 parent 499797c commit 5c00d19
Showing 3 changed files with 122 additions and 26 deletions.
17 changes: 14 additions & 3 deletions Makefile
@@ -23,13 +23,24 @@ else
 endif
 
 test_unit:
-    pytest -vv --cov=platform_monitoring --cov-report xml:.coverage-unit.xml tests/unit
+    pytest -vv \
+        --cov=platform_monitoring --cov-report xml:.coverage-unit.xml \
+        tests/unit
 
 test_integration:
-    pytest -vv --maxfail=1 --cov=platform_monitoring --cov-report xml:.coverage-integration.xml tests/integration -m "not minikube"
+    pytest -vv \
+        --maxfail=3 \
+        --cov=platform_monitoring --cov-report xml:.coverage-integration.xml \
+        --durations=10 \
+        tests/integration \
+        -m "not minikube"
 
 test_integration_minikube:
-    pytest -vv --log-cli-level=debug tests/integration -m minikube
+    pytest -vv \
+        --log-cli-level=debug \
+        --durations=10 \
+        tests/integration \
+        -m minikube
 
 docker_build:
     rm -rf build dist
24 changes: 13 additions & 11 deletions platform_monitoring/logs.py
Expand Up @@ -13,7 +13,7 @@
import aiohttp
from aiobotocore.client import AioBaseClient
from aiobotocore.response import StreamingBody
from aioelasticsearch import Elasticsearch
from aioelasticsearch import Elasticsearch, RequestError
from aioelasticsearch.helpers import Scan
from async_timeout import timeout
from neuro_logging import trace
@@ -216,7 +216,7 @@ def _combine_search_query(self) -> dict[str, Any]:
 
     async def __aenter__(self) -> AsyncIterator[bytes]:
         query = self._combine_search_query()
-        self._scan = Scan(
+        scan = Scan(
             self._es_client,
             index=self._index,
             doc_type=self._doc_type,
@@ -230,20 +230,25 @@ async def __aenter__(self) -> AsyncIterator[bytes]:
             preserve_order=True,
             size=100,
         )
-        await self._scan.__aenter__()
+        try:
+            await scan.__aenter__()
+            self._scan = scan
+        except RequestError:
+            pass
         self._iterator = self._iterate()
         return self._iterator
 
     async def __aexit__(self, *args: Any) -> None:
         assert self._iterator
         await self._iterator.aclose()  # type: ignore
-        assert self._scan
         scan = self._scan
         self._scan = None
-        await scan.__aexit__(*args)
+        if scan is not None:
+            await scan.__aexit__(*args)
 
     async def _iterate(self) -> AsyncIterator[bytes]:
-        assert self._scan
+        if self._scan is None:
+            return
         async for doc in self._scan:
             try:
                 source = doc["_source"]
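In the hunk above, entering the Elasticsearch Scan context may now fail with RequestError (for example when the log index has not been created yet), and the reader then behaves as if the archive were simply empty. Below is a condensed, standalone sketch of that pattern, not the module's real class: the class name, the query= argument, and the fluentd-style "log" source field are illustrative assumptions.

from collections.abc import AsyncIterator
from typing import Any, Optional

from aioelasticsearch import Elasticsearch, RequestError
from aioelasticsearch.helpers import Scan


class ArchiveLogReaderSketch:
    def __init__(
        self, es_client: Elasticsearch, index: str, query: dict[str, Any]
    ) -> None:
        self._es_client = es_client
        self._index = index
        self._query = query
        self._scan: Optional[Scan] = None

    async def __aenter__(self) -> AsyncIterator[bytes]:
        scan = Scan(self._es_client, index=self._index, query=self._query, size=100)
        try:
            await scan.__aenter__()
            self._scan = scan  # keep the scan only if it actually started
        except RequestError:
            pass  # e.g. index not created yet: treat as "no archived logs"
        return self._iterate()

    async def __aexit__(self, *args: Any) -> None:
        scan, self._scan = self._scan, None
        if scan is not None:  # nothing to close if the scan never started
            await scan.__aexit__(*args)

    async def _iterate(self) -> AsyncIterator[bytes]:
        if self._scan is None:
            return  # unavailable or empty archive yields no chunks
        async for doc in self._scan:
            yield doc["_source"]["log"].encode()  # "log" field name is an assumption

Leaving self._scan unset on failure lets both __aexit__ and _iterate treat "scan never started" and "archive empty" the same way, which is what the asserts were replaced with.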
@@ -727,14 +732,11 @@ async def get_first_log_entry_time(
                 chunk = await stream.readany()
                 if not chunk:
                     break
-                pos = chunk.find(b" ")
+                pos = chunk.find(b" "[0])
                 if pos >= 0:
                     time_str += chunk[:pos]
                     break
-                else:
-                    time_str += chunk
-            else:
-                return None
+                time_str += chunk
     except (asyncio.TimeoutError, JobNotFoundException):
         return None
     try:
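The second hunk simplifies how the first timestamp is read from a Kubernetes log stream requested with timestamps enabled: bytes are accumulated until the first space (the delimiter between timestamp and message), and bytes.find is now given the integer byte value b" "[0], which it accepts just like a one-byte needle. A simplified, self-contained variant of that loop follows; the stream type, the timeout value, and the function name are assumptions for illustration, and timestamp parsing is left out.

import asyncio
from typing import Optional

import aiohttp
from async_timeout import timeout


async def read_first_timestamp_prefix(stream: aiohttp.StreamReader) -> Optional[bytes]:
    # Assumes a Kubernetes log stream opened with timestamps=true, where each
    # line starts with an RFC 3339 timestamp followed by a single space, e.g.
    # b"2022-06-08T12:34:56.789012345Z first line\n".
    time_str = b""
    try:
        async with timeout(2.0):  # illustrative timeout
            while True:
                chunk = await stream.readany()
                if not chunk:
                    break  # stream ended before any payload arrived
                pos = chunk.find(b" "[0])  # b" "[0] == 32; same hit as find(b" ")
                if pos >= 0:
                    time_str += chunk[:pos]
                    break
                time_str += chunk  # no space yet: keep accumulating
    except asyncio.TimeoutError:
        return None
    return time_str or None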
107 changes: 95 additions & 12 deletions tests/integration/test_kube.py
@@ -1,10 +1,11 @@
 import asyncio
+import logging
 import re
 import uuid
 from collections.abc import AsyncIterator
 from contextlib import AbstractAsyncContextManager
 from datetime import datetime, timedelta, timezone
-from typing import Any
+from typing import Any, Union
 from unittest import mock
 from uuid import uuid4
 
@@ -37,6 +38,8 @@
 from .conftest_kube import MyKubeClient, MyPodDescriptor
 from tests.integration.conftest import ApiAddress, create_local_app_server
 
+logger = logging.getLogger(__name__)
+
 
 @pytest.fixture
 def job_pod() -> MyPodDescriptor:
@@ -522,7 +525,8 @@ async def _consume_log_reader(
                     if delay:
                         await asyncio.sleep(delay)
         except asyncio.CancelledError:
-            pass
+            logger.exception("Cancelled logs reading")
+            buffer += b"CANCELLED"
         return bytes(buffer)
 
     def _remove_timestamps(self, data: bytes) -> bytes:
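For context, this is roughly how the helper reads after the change above; the lines folded out of the diff are assumed, and it is shown as a module-level function rather than a method of the test class. The point of the change: a cancelled read no longer looks like a short but valid log, because the partial payload is tagged and the cancellation is logged with its traceback.

import asyncio
import logging
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager

logger = logging.getLogger(__name__)


async def consume_log_reader(
    log_reader: AbstractAsyncContextManager[AsyncIterator[bytes]],
    delay: float = 0,
) -> bytes:
    # Drain the reader into a buffer; on cancellation, mark the partial payload
    # so later byte-for-byte assertions fail loudly instead of silently
    # comparing a truncated result.
    buffer = bytearray()
    try:
        async with log_reader as it:
            async for chunk in it:
                buffer += chunk
                if delay:
                    await asyncio.sleep(delay)
    except asyncio.CancelledError:
        logger.exception("Cancelled logs reading")
        buffer += b"CANCELLED"
    return bytes(buffer)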
@@ -1069,6 +1073,75 @@ async def test_get_job_s3_log_reader(
             kube_config, kube_client, s3_log_service, job_pod
         )
 
+    async def _test_empty_log_reader(
+        self,
+        kube_client: MyKubeClient,
+        job_pod: MyPodDescriptor,
+        factory: LogsService,
+    ) -> None:
+        command = "sleep 5"
+        job_pod.set_command(command)
+        names = []
+        tasks = []
+
+        def run_log_reader(name: str, delay: float = 0) -> None:
+            async def coro() -> Union[bytes, Exception]:
+                await asyncio.sleep(delay)
+                try:
+                    async with timeout(60.0):
+                        log_reader = factory.get_pod_log_reader(
+                            job_pod.name, separator=b"===", archive_delay_s=20.0
+                        )
+                        return await self._consume_log_reader(log_reader)
+                except Exception as e:
+                    logger.exception("Error in logs reading for %r", name)
+                    return e
+
+            names.append(name)
+            task = asyncio.ensure_future(coro())
+            tasks.append(task)
+
+        try:
+            await kube_client.create_pod(job_pod.payload)
+            run_log_reader("created")
+            await kube_client.wait_pod_is_running(pod_name=job_pod.name, timeout_s=60.0)
+            for i in range(4):
+                run_log_reader(f"started [{i}]", delay=i * 2)
+            await kube_client.wait_pod_is_terminated(job_pod.name)
+        finally:
+            await kube_client.delete_pod(job_pod.name)
+        run_log_reader("deleting")
+        await kube_client.wait_pod_is_deleted(job_pod.name)
+        run_log_reader("deleted")
+
+        payloads = await asyncio.gather(*tasks)
+
+        # Output for debugging
+        for i, (name, payload) in enumerate(zip(names, payloads)):
+            print(f"{i}. {name}: {payload!r}")
+
+        # All logs are completely either live or archive, no separator.
+        for name, payload in zip(names, payloads):
+            assert payload == b"", name
+
+    async def test_elasticsearch_empty_log_reader(
+        self,
+        kube_client: MyKubeClient,
+        elasticsearch_log_service: LogsService,
+        job_pod: MyPodDescriptor,
+    ) -> None:
+        await self._test_empty_log_reader(
+            kube_client, job_pod, elasticsearch_log_service
+        )
+
+    async def test_s3_empty_log_reader(
+        self,
+        kube_client: MyKubeClient,
+        s3_log_service: LogsService,
+        job_pod: MyPodDescriptor,
+    ) -> None:
+        await self._test_empty_log_reader(kube_client, job_pod, s3_log_service)
+
     async def _test_merged_log_reader(
         self,
         kube_client: MyKubeClient,
@@ -1081,12 +1154,17 @@ async def _test_merged_log_reader(
         tasks = []
 
         def run_log_reader(name: str, delay: float = 0) -> None:
-            async def coro() -> bytes:
+            async def coro() -> Union[bytes, Exception]:
                 await asyncio.sleep(delay)
-                log_reader = factory.get_pod_log_reader(
-                    job_pod.name, separator=b"===", archive_delay_s=10.0
-                )
-                return await self._consume_log_reader(log_reader)
+                try:
+                    async with timeout(60.0):
+                        log_reader = factory.get_pod_log_reader(
+                            job_pod.name, separator=b"===", archive_delay_s=10.0
+                        )
+                        return await self._consume_log_reader(log_reader)
+                except Exception as e:
+                    logger.exception("Error in logs reading for %r", name)
+                    return e
 
             names.append(name)
             task = asyncio.ensure_future(coro())
@@ -1149,12 +1227,17 @@ async def _test_merged_log_reader_restarted(
         tasks = []
 
        def run_log_reader(name: str, delay: float = 0) -> None:
-            async def coro() -> bytes:
+            async def coro() -> Union[bytes, Exception]:
                 await asyncio.sleep(delay)
-                log_reader = factory.get_pod_log_reader(
-                    job_pod.name, separator=b"===", archive_delay_s=20.0
-                )
-                return await self._consume_log_reader(log_reader)
+                try:
+                    async with timeout(90.0):
+                        log_reader = factory.get_pod_log_reader(
+                            job_pod.name, separator=b"===", archive_delay_s=20.0
+                        )
+                        return await self._consume_log_reader(log_reader)
+                except Exception as e:
+                    logger.exception("Error in logs reading for %r", name)
+                    return e
 
             names.append(name)
             task = asyncio.ensure_future(coro())
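The same wrapper appears in _test_empty_log_reader, _test_merged_log_reader and _test_merged_log_reader_restarted: each reader coroutine returns its exception instead of raising, so asyncio.gather never aborts early and every reader's outcome can be printed before the assertions run. A minimal, self-contained illustration of that pattern, with made-up names and payloads:

import asyncio
from typing import Union


async def read_logs(name: str, fail: bool = False) -> Union[bytes, Exception]:
    # Stand-in for coro() in the tests above: never raise, always return,
    # so a failing reader shows up as an Exception object in the results.
    try:
        if fail:
            raise RuntimeError(f"reader {name!r} failed")
        return b"hello\n"
    except Exception as e:
        return e


async def main() -> None:
    names = ["created", "started", "deleted"]
    tasks = [asyncio.ensure_future(read_logs(n, fail=(n == "started"))) for n in names]
    payloads = await asyncio.gather(*tasks)  # completes even though one reader failed
    for i, (name, payload) in enumerate(zip(names, payloads)):
        print(f"{i}. {name}: {payload!r}")  # inspect every outcome before asserting


asyncio.run(main())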
