Add container logs to restart count error messages #383

Merged · 11 commits · Jan 6, 2022
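
This change threads a new `container_logs_in_error_status` flag (default `False`) through the Kubernetes and Opsani Dev connectors. When enabled, `Pod.raise_for_status` and `Deployment.raise_for_failed_pod_adjustments` fetch up to 1 MiB of logs per container (previous, current, or both, selected via a new `ContainerLogOptions` enum) and append them to the `AdjustmentRejectedError` messages raised for crash restarts and unready containers.

A minimal sketch of enabling the flag, assuming a generated Opsani Dev configuration (`generate()` appears in the tests below; treating the pydantic model as mutable after generation is an assumption):

import servo.connectors.opsani_dev

# Start from a generated config; the new flag defaults to False.
config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate()
assert config.container_logs_in_error_status is False

# Opt in to container logs in error messages.
config.container_logs_in_error_status = True

# The flag is forwarded into the generated Kubernetes connector config.
k8s_config = config.generate_kubernetes_config()
assert k8s_config.container_logs_in_error_status is True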
2 changes: 1 addition & 1 deletion servo/connectors/kube_metrics.py
@@ -143,7 +143,7 @@ async def check_metrics_api_permissions(self) -> None:
)
assert (
access_review.status.allowed
), f'Not allowed to "{verb}" resource "{resource}"'
), f'Not allowed to "{verb}" resource "{resource}" in group "{permission.group}"'

@servo.require('Metrics API connectivity')
async def check_metrics_api(self) -> None:
154 changes: 133 additions & 21 deletions servo/connectors/kubernetes.py
@@ -30,7 +30,6 @@
Mapping,
Optional,
Protocol,
Sequence,
Tuple,
Type,
Union,
@@ -46,11 +45,14 @@
import kubernetes_asyncio.client.exceptions
import kubernetes_asyncio.client.models
from kubernetes_asyncio.client.models.v1_container import V1Container
from kubernetes_asyncio.client.models.v1_container_status import V1ContainerStatus
from kubernetes_asyncio.client.models.v1_env_var import V1EnvVar
import kubernetes_asyncio.watch
import pydantic

import servo
from servo.telemetry import ONE_MiB
from servo.types.kubernetes import *

class Condition(servo.logging.Mixin):
"""A Condition is a convenience wrapper around a function and its arguments
@@ -1005,7 +1007,56 @@ async def is_ready(self) -> bool:
self.logger.trace(f"unable to find ready=true, continuing to wait...")
return False

async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
async def _try_get_container_log(self, api_client: kubernetes_asyncio.client.CoreV1Api, container: str, limit_bytes: int = ONE_MiB, previous = False) -> str:
"""Get logs for a container while handling common error cases (eg. Not Found)"""
try:
return await api_client.read_namespaced_pod_log(
name=self.name,
namespace=self.namespace,
container=container,
limit_bytes=limit_bytes,
previous=previous,
)
except kubernetes_asyncio.client.exceptions.ApiException as ae:
if ae.status == 400:
ae.data = ae.body
status: kubernetes_asyncio.client.models.V1Status = api_client.api_client.deserialize(ae, "V1Status")
if (status.message or "").endswith("not found"):
return "Logs not found"

raise

async def get_logs_for_container_statuses(
self,
container_statuses: list[V1ContainerStatus],
limit_bytes: int = ONE_MiB,
logs_selector: ContainerLogOptions = ContainerLogOptions.both,
) -> list[str]:
"""
Get container logs from the current pod for the containers whose statuses are provided in the list.

Args:
container_statuses (list[V1ContainerStatus]): The statuses of the containers whose logs are retrieved.
limit_bytes (int): Maximum bytes to return per log (NOTE: up to 2x this amount per container when "both" is selected)
logs_selector (ContainerLogOptions): "previous", "current", or "both"

Returns:
list[str]: List of logs per container in the same order as the list of container_statuses
"""
api_client: kubernetes_asyncio.client.CoreV1Api
async with self.api_client() as api_client:
read_logs_partial = functools.partial(self._try_get_container_log, api_client=api_client, limit_bytes=limit_bytes)
if logs_selector == ContainerLogOptions.both:
return [
f"previous (crash):\n {await read_logs_partial(container=cs.name, previous=True)} \n\n--- \n\n"
f"current (latest):\n {await read_logs_partial(container=cs.name, previous=False)}"
for cs in container_statuses
]
else:
previous = (logs_selector == ContainerLogOptions.previous)
return [ await read_logs_partial(container=cs.name, previous=previous) for cs in container_statuses ]

async def raise_for_status(self, adjustments: List[servo.Adjustment], include_container_logs = False) -> None:
"""Raise an exception if the Pod status is not not ready."""
# NOTE: operate off of current state, assuming you have checked is_ready()
status = self.obj.status
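
The `_try_get_container_log` helper above wraps `read_namespaced_pod_log` and maps a 400 response whose `V1Status` message ends in "not found" to the placeholder string "Logs not found". A hedged usage sketch (it is a private helper; `pod` is assumed to be a loaded `Pod` wrapper from this module, and actually running it requires cluster access):

import kubernetes_asyncio.client

async def crash_log(pod, container_name: str) -> str:
    # One call per container; previous=True selects the log from the
    # last terminated run, which is what the crash-restart path wants.
    api_client: kubernetes_asyncio.client.CoreV1Api
    async with pod.api_client() as api_client:
        return await pod._try_get_container_log(
            api_client=api_client,
            container=container_name,
            previous=True,
        )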
Expand All @@ -1026,11 +1077,23 @@ async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
if cont_stat.state and cont_stat.state.waiting and cont_stat.state.waiting.reason in ["ImagePullBackOff", "ErrImagePull"]:
raise servo.AdjustmentFailedError("Container image pull failure detected", reason="image-pull-failed")

restarted_container_statuses = list(filter(lambda cont_stat: cont_stat.restart_count > 0, (status.container_statuses or [])))
restarted_container_statuses: List[V1ContainerStatus] = [
cont_stat for cont_stat in status.container_statuses or [] if cont_stat.restart_count > 0
]
if restarted_container_statuses:
container_messages = list(map(lambda cont_stat: f"{cont_stat.name} x{cont_stat.restart_count}", restarted_container_statuses))
container_logs: list[str] = ["DISABLED" for _ in restarted_container_statuses]
if include_container_logs: # TODO: enable logs config on a per-container basis
container_logs = await self.get_logs_for_container_statuses(restarted_container_statuses)
container_messages = [
(
f"{cont_stat.name} x{cont_stat.restart_count}"
f"{'' if not include_container_logs else f' container logs {container_logs[idx]}'}"
)
for idx, cont_stat in enumerate(restarted_container_statuses)
]
raise servo.AdjustmentRejectedError(
f"Tuning optimization {self.name} crash restart detected on container(s): {', '.join(container_messages)}",
# NOTE: can't use a backslash (newline) inside an f-string expression
(f"Tuning optimization {self.name} crash restart detected on container(s): " + ", \n".join(container_messages)),
reason="unstable"
)
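
The rejection message above interleaves per-container restart counts with the fetched logs. A self-contained sketch of the same string assembly, using dummy values in place of V1ContainerStatus objects and real logs:

# Dummy stand-ins for (cont_stat.name, cont_stat.restart_count) and fetched logs.
container_stats = [("main", 3), ("sidecar", 1)]
container_logs = ["<up to 1 MiB of logs>", "<up to 1 MiB of logs>"]
include_container_logs = True

container_messages = [
    f"{name} x{count}"
    f"{'' if not include_container_logs else f' container logs {container_logs[idx]}'}"
    for idx, (name, count) in enumerate(container_stats)
]
print("crash restart detected on container(s): " + ", \n".join(container_messages))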

@@ -1045,7 +1108,15 @@ async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
)

if cond.type == "Ready" and cond.status == "False":
raise servo.AdjustmentRejectedError(f"(reason {cond.reason}) {cond.message}", reason="start-failed")
rejection_message = cond.message
if include_container_logs and cond.reason == "ContainersNotReady":
unready_container_statuses: List[V1ContainerStatus] = [
cont_stat for cont_stat in status.container_statuses or [] if not cont_stat.ready
]
container_logs = await self.get_logs_for_container_statuses(unready_container_statuses)
# NOTE: can't use a backslash (newline) inside an f-string expression
rejection_message = (f"{rejection_message} container logs " + "\n\n--- \n\n".join(container_logs))
raise servo.AdjustmentRejectedError(f"(reason {cond.reason}) {rejection_message}", reason="start-failed")

# we only care about the condition type 'ready'
if cond.type.lower() != "ready":
@@ -2031,7 +2102,7 @@ def _check_conditions(self, conditions: List[kubernetes_asyncio.client.V1Deploym
f"unknown deployment status condition: {condition.status}"
)

async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
async def raise_for_status(self, adjustments: List[servo.Adjustment], include_container_logs = False) -> None:
# NOTE: operate off of current state, assuming you have checked is_ready()
status = self.obj.status
self.logger.trace(f"current deployment status is {status}")
@@ -2043,13 +2114,13 @@ async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:

# Check for failure conditions
self._check_conditions(status.conditions)
await self.raise_for_failed_pod_adjustments(adjustments=adjustments)
await self.raise_for_failed_pod_adjustments(adjustments=adjustments, include_container_logs=include_container_logs)

# Catchall
self.logger.trace(f"unable to map deployment status to exception. Deployment: {self.obj}")
raise RuntimeError(f"Unknown Deployment status for '{self.name}': {status}")

async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustment]):
async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustment], include_container_logs = False):
pods = await self.get_latest_pods()
self.logger.trace(f"latest pod(s) status {list(map(lambda p: p.obj.status, pods))}")
unschedulable_pods = [
@@ -2087,14 +2158,33 @@ async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustm
reason="image-pull-failed"
)

restarted_pods_container_statuses = [
restarted_pods_container_statuses: list[tuple[Pod, V1ContainerStatus]] = [
(pod, cont_stat) for pod in pods for cont_stat in (pod.obj.status.container_statuses or [])
if cont_stat.restart_count > 0
]
if restarted_pods_container_statuses:
container_logs: list[str] = ["DISABLED" for _ in range(len(restarted_pods_container_statuses))]
if include_container_logs: # TODO: enable logs config on a per-container basis
# Reduce API requests to one per pod, then fan back out into a per-container-status list
curpod = restarted_pods_container_statuses[0][0]
curstats = []
for pod, container_status in restarted_pods_container_statuses:
if pod == curpod:
curstats.append(container_status)
else:
# Set up for next pod in list
container_logs.extend(await curpod.get_logs_for_container_statuses(curstats))
curpod = pod
curstats = [container_status]
# Get logs for the last (or only) pod in the list
container_logs.extend(await curpod.get_logs_for_container_statuses(curstats))

pod_to_counts = collections.defaultdict(list)
for pod_cont_stat in restarted_pods_container_statuses:
pod_to_counts[pod_cont_stat[0].obj.metadata.name].append(f"{pod_cont_stat[1].name} x{pod_cont_stat[1].restart_count}")
for idx, (pod, cont_stat) in enumerate(restarted_pods_container_statuses):
pod_to_counts[pod.obj.metadata.name].append(
f"{cont_stat.name} x{cont_stat.restart_count} "
f"{'' if not include_container_logs else f' container logs {container_logs[idx]}'}"
)

pod_message = ", ".join(map(
lambda kv_tup: f"{kv_tup[0]} - {'; '.join(kv_tup[1])}",
@@ -2111,12 +2201,23 @@ async def raise_for_failed_pod_adjustm
if cond.type == "Ready" and cond.status == "False"
]
if unready_pod_conds:
pod_message = ", ".join(map(
lambda pod_cond: f"{pod_cond[0].obj.metadata.name} - (reason {pod_cond[1].reason}) {pod_cond[1].message}",
unready_pod_conds
))
pod_messages = []
for pod, cond in unready_pod_conds:
pod_message = f"{pod.obj.metadata.name} - (reason {cond.reason}) {cond.message}"

# TODO expand criteria for safely getting container logs and/or implement graceful fallback
if include_container_logs and cond.reason == "ContainersNotReady":
unready_container_statuses: List[V1ContainerStatus] = [
cont_stat for cont_stat in pod.obj.status.container_statuses or [] if not cont_stat.ready
]
container_logs = await pod.get_logs_for_container_statuses(unready_container_statuses)
# NOTE: can't use a backslash (newline) inside an f-string expression
pod_message = (f"{pod_message} container logs " + "\n\n--- \n\n".join(container_logs))

pod_messages.append(pod_message)

raise servo.AdjustmentRejectedError(
f"Found {len(unready_pod_conds)} unready pod(s) for deployment {self.name}: {pod_message}",
f"Found {len(unready_pod_conds)} unready pod(s) for deployment {self.name}: {', '.join(pod_messages)}",
reason="start-failed"
)

@@ -3373,7 +3474,10 @@ async def is_ready(self) -> bool:

async def raise_for_status(self) -> None:
"""Raise an exception if in an unhealthy state."""
await self.deployment.raise_for_status(adjustments=self.adjustments)
await self.deployment.raise_for_status(
adjustments=self.adjustments,
include_container_logs=self.deployment_config.container_logs_in_error_status
)


# TODO: Break down into CanaryDeploymentOptimization and CanaryContainerOptimization
@@ -3755,7 +3859,7 @@ async def create_tuning_pod(self) -> Pod:
await t
servo.logger.debug(f"Cancelled Task: {t}, progress: {progress}")

await tuning_pod.raise_for_status(adjustments=self.adjustments)
await self.raise_for_status(tuning_pod=tuning_pod)

# Load the in memory model for various convenience accessors
await tuning_pod.refresh()
@@ -4020,9 +4124,14 @@ async def is_ready(self) -> bool:
)
return is_ready and restart_count == 0

async def raise_for_status(self) -> None:
async def raise_for_status(self, tuning_pod = None) -> None:
"""Raise an exception if in an unhealthy state."""
await self.tuning_pod.raise_for_status(adjustments=self.adjustments)
if tuning_pod is None:
tuning_pod = self.tuning_pod
await tuning_pod.raise_for_status(
adjustments=self.adjustments,
include_container_logs=self.target_controller_config.container_logs_in_error_status
)


class Config:
@@ -4437,6 +4546,9 @@ class BaseKubernetesConfiguration(servo.BaseConfiguration):
timeout: Optional[servo.Duration] = pydantic.Field(
description="Time interval to wait before considering Kubernetes operations to have failed."
)
container_logs_in_error_status: bool = pydantic.Field(
False, description="Enable to include container logs in error message"
)

@pydantic.validator("on_failure")
def validate_failure_mode(cls, v):
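The per-pod batching in raise_for_failed_pod_adjustments above (the loop under "Reduce API requests to one per pod") walks consecutive (pod, container status) pairs and calls get_logs_for_container_statuses once per pod. A self-contained sketch of the same grouping, using itertools.groupby on dummy data in place of Pod objects and real API calls:

import itertools

# Dummy (pod_name, container_name) pairs, ordered by pod as in the real list.
pairs = [("pod-a", "main"), ("pod-a", "sidecar"), ("pod-b", "main")]

container_logs: list[str] = []
for pod_name, group in itertools.groupby(pairs, key=lambda pc: pc[0]):
    containers = [container for _, container in group]
    # One (simulated) API round-trip per pod covers all of its containers.
    container_logs.extend(f"{pod_name}/{c}: <log>" for c in containers)

print(container_logs)  # ['pod-a/main: <log>', 'pod-a/sidecar: <log>', 'pod-b/main: <log>']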
4 changes: 4 additions & 0 deletions servo/connectors/opsani_dev.py
@@ -66,6 +66,9 @@ class OpsaniDevConfiguration(servo.BaseConfiguration):
settlement: Optional[servo.Duration] = pydantic.Field(
description="Duration to observe the application after an adjust to ensure the deployment is stable. May be overridden by optimizer supplied `control.adjust.settlement` value."
)
container_logs_in_error_status: bool = pydantic.Field(
False, description="Enable to include container logs in error message"
)

@pydantic.root_validator
def check_deployment_and_rollout(cls, values):
@@ -129,6 +132,7 @@ def generate_kubernetes_config(
description="Update the namespace, deployment, etc. to match your Kubernetes cluster",
timeout=self.timeout,
settlement=self.settlement,
container_logs_in_error_status=self.container_logs_in_error_status,
**main_arg,
**kwargs,
)
1 change: 1 addition & 0 deletions servo/types/__init__.py
@@ -1,5 +1,6 @@
# Promote all symbols from submodules to the top-level package (TODO is this bad practice?)
from .api import *
from .core import *
from .kubernetes import *
from .settings import *
from .slo import *
6 changes: 6 additions & 0 deletions servo/types/kubernetes.py
@@ -0,0 +1,6 @@
import enum

class ContainerLogOptions(str, enum.Enum):
previous = "previous"
current = "current"
both = "both"
1 change: 1 addition & 0 deletions setup.cfg
@@ -7,6 +7,7 @@ filterwarnings =
ignore: .*Using or importing the ABCs from 'collections':DeprecationWarning
ignore: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead:DeprecationWarning
ignore: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10:DeprecationWarning
ignore: distutils Version classes are deprecated. Use packaging.version instead.:DeprecationWarning
ignore: unclosed file <_io.TextIOWrapper name=.*:ResourceWarning
ignore: unclosed file <_io.FileIO name=.*:ResourceWarning
ignore: unclosed <ssl.SSLSocket .*:ResourceWarning
2 changes: 1 addition & 1 deletion tests/connectors/opsani_dev_test.py
@@ -73,7 +73,7 @@ def test_generate(self) -> None:
config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate()
assert list(config.dict().keys()) == [
'description', 'namespace', 'deployment', 'rollout', 'container', 'service','port', 'cpu', 'memory', 'env',
'static_environment_variables', 'prometheus_base_url', 'envoy_sidecar_image', 'timeout', 'settlement'
'static_environment_variables', 'prometheus_base_url', 'envoy_sidecar_image', 'timeout', 'settlement', 'container_logs_in_error_status'
]

def test_generate_yaml(self) -> None:
5 changes: 3 additions & 2 deletions tests/connectors/prometheus_test.py
@@ -6,6 +6,7 @@

import freezegun
import httpx
import kubetest.client
import pydantic
import pytest
import pytz
@@ -373,8 +374,8 @@ def optimizer(self) -> servo.Optimizer:
)

@pytest.fixture(autouse=True)
def _wait_for_cluster(self, kube) -> None:
kube.wait_for_registered()
def _wait_for_cluster(self, kube: kubetest.client.TestClient) -> None:
kube.wait_for_registered(timeout=1800)

async def test_targets(
self,