diff --git a/servo/connectors/kube_metrics.py b/servo/connectors/kube_metrics.py
index ac2309809..f5e8a7ea3 100644
--- a/servo/connectors/kube_metrics.py
+++ b/servo/connectors/kube_metrics.py
@@ -143,7 +143,7 @@ async def check_metrics_api_permissions(self) -> None:
             )
             assert (
                 access_review.status.allowed
-            ), f'Not allowed to "{verb}" resource "{resource}"'
+            ), f'Not allowed to "{verb}" resource "{resource}" in group "{permission.group}"'

     @servo.require('Metrics API connectivity')
     async def check_metrics_api(self) -> None:
diff --git a/servo/connectors/kubernetes.py b/servo/connectors/kubernetes.py
index 748cce6e6..4b6f7c54e 100644
--- a/servo/connectors/kubernetes.py
+++ b/servo/connectors/kubernetes.py
@@ -30,7 +30,6 @@
     Mapping,
     Optional,
     Protocol,
-    Sequence,
     Tuple,
     Type,
     Union,
@@ -46,11 +45,14 @@
 import kubernetes_asyncio.client.exceptions
 import kubernetes_asyncio.client.models
 from kubernetes_asyncio.client.models.v1_container import V1Container
+from kubernetes_asyncio.client.models.v1_container_status import V1ContainerStatus
 from kubernetes_asyncio.client.models.v1_env_var import V1EnvVar
 import kubernetes_asyncio.watch
 import pydantic

 import servo
+from servo.telemetry import ONE_MiB
+from servo.types.kubernetes import *

 class Condition(servo.logging.Mixin):
     """A Condition is a convenience wrapper around a function and its arguments
@@ -1005,7 +1007,56 @@ async def is_ready(self) -> bool:
         self.logger.trace(f"unable to find ready=true, continuing to wait...")
         return False

-    async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
+    async def _try_get_container_log(self, api_client: kubernetes_asyncio.client.CoreV1Api, container: str, limit_bytes: int = ONE_MiB, previous: bool = False) -> str:
+        """Get logs for a container while handling common error cases (e.g. Not Found)."""
+        try:
+            return await api_client.read_namespaced_pod_log(
+                name=self.name,
+                namespace=self.namespace,
+                container=container,
+                limit_bytes=limit_bytes,
+                previous=previous,
+            )
+        except kubernetes_asyncio.client.exceptions.ApiException as ae:
+            if ae.status == 400:
+                ae.data = ae.body
+                status: kubernetes_asyncio.client.models.V1Status = api_client.api_client.deserialize(ae, "V1Status")
+                if (status.message or "").endswith("not found"):
+                    return "Logs not found"
+
+            raise
+
+    async def get_logs_for_container_statuses(
+        self,
+        container_statuses: list[V1ContainerStatus],
+        limit_bytes: int = ONE_MiB,
+        logs_selector: ContainerLogOptions = ContainerLogOptions.both,
+    ) -> list[str]:
+        """
+        Get container logs from the current pod for the containers whose statuses are provided in the list.
+
+        Args:
+            container_statuses (list[V1ContainerStatus]): The statuses of the containers whose logs are retrieved.
+            limit_bytes (int): Maximum bytes to retrieve per log (NOTE: with ContainerLogOptions.both this applies per log, i.e. up to 2x per container).
+            logs_selector (ContainerLogOptions): "previous", "current", or "both".
+
+        Returns:
+            list[str]: A list of logs per container, in the same order as container_statuses.
+        """
+        api_client: kubernetes_asyncio.client.CoreV1Api
+        async with self.api_client() as api_client:
+            read_logs_partial = functools.partial(self._try_get_container_log, api_client=api_client, limit_bytes=limit_bytes)
+            if logs_selector == ContainerLogOptions.both:
+                return [
+                    f"previous (crash):\n {await read_logs_partial(container=cs.name, previous=True)} \n\n--- \n\n"
+                    f"current (latest):\n {await read_logs_partial(container=cs.name, previous=False)}"
+                    for cs in container_statuses
+                ]
+            else:
+                previous = (logs_selector == ContainerLogOptions.previous)
+                return [await read_logs_partial(container=cs.name, previous=previous) for cs in container_statuses]
+
+    async def raise_for_status(self, adjustments: List[servo.Adjustment], include_container_logs: bool = False) -> None:
         """Raise an exception if the Pod status is not not ready."""
         # NOTE: operate off of current state, assuming you have checked is_ready()
         status = self.obj.status
@@ -1026,11 +1077,23 @@ async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
             if cont_stat.state and cont_stat.state.waiting and cont_stat.state.waiting.reason in ["ImagePullBackOff", "ErrImagePull"]:
                 raise servo.AdjustmentFailedError("Container image pull failure detected", reason="image-pull-failed")

-        restarted_container_statuses = list(filter(lambda cont_stat: cont_stat.restart_count > 0, (status.container_statuses or [])))
+        restarted_container_statuses: List[V1ContainerStatus] = [
+            cont_stat for cont_stat in status.container_statuses or [] if cont_stat.restart_count > 0
+        ]
         if restarted_container_statuses:
-            container_messages = list(map(lambda cont_stat: f"{cont_stat.name} x{cont_stat.restart_count}", restarted_container_statuses))
+            container_logs: list[str] = ["DISABLED" for _ in restarted_container_statuses]
+            if include_container_logs:  # TODO: enable logs config on a per-container basis
+                container_logs = await self.get_logs_for_container_statuses(restarted_container_statuses)
+            container_messages = [
+                (
+                    f"{cont_stat.name} x{cont_stat.restart_count}"
+                    f"{'' if not include_container_logs else f' container logs {container_logs[idx]}'}"
+                )
+                for idx, cont_stat in enumerate(restarted_container_statuses)
+            ]
             raise servo.AdjustmentRejectedError(
-                f"Tuning optimization {self.name} crash restart detected on container(s): {', '.join(container_messages)}",
+                # NOTE: can't use a backslash (newline) inside an f-string expression
+                (f"Tuning optimization {self.name} crash restart detected on container(s): " + ", \n".join(container_messages)),
                 reason="unstable"
             )
@@ -1045,7 +1108,15 @@ async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None:
                 )

             if cond.type == "Ready" and cond.status == "False":
-                raise servo.AdjustmentRejectedError(f"(reason {cond.reason}) {cond.message}", reason="start-failed")
+                rejection_message = cond.message
+                if include_container_logs and cond.reason == "ContainersNotReady":
+                    unready_container_statuses: List[V1ContainerStatus] = [
+                        cont_stat for cont_stat in status.container_statuses or [] if not cont_stat.ready
+                    ]
+                    container_logs = await self.get_logs_for_container_statuses(unready_container_statuses)
+                    # NOTE: can't use a backslash (newline) inside an f-string expression
+                    rejection_message = (f"{rejection_message} container logs " +
"\n\n--- \n\n".join(container_logs)) + raise servo.AdjustmentRejectedError(f"(reason {cond.reason}) {rejection_message}", reason="start-failed") # we only care about the condition type 'ready' if cond.type.lower() != "ready": @@ -2031,7 +2102,7 @@ def _check_conditions(self, conditions: List[kubernetes_asyncio.client.V1Deploym f"unknown deployment status condition: {condition.status}" ) - async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None: + async def raise_for_status(self, adjustments: List[servo.Adjustment], include_container_logs = False) -> None: # NOTE: operate off of current state, assuming you have checked is_ready() status = self.obj.status self.logger.trace(f"current deployment status is {status}") @@ -2043,13 +2114,13 @@ async def raise_for_status(self, adjustments: List[servo.Adjustment]) -> None: # Check for failure conditions self._check_conditions(status.conditions) - await self.raise_for_failed_pod_adjustments(adjustments=adjustments) + await self.raise_for_failed_pod_adjustments(adjustments=adjustments, include_container_logs=include_container_logs) # Catchall self.logger.trace(f"unable to map deployment status to exception. Deployment: {self.obj}") raise RuntimeError(f"Unknown Deployment status for '{self.name}': {status}") - async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustment]): + async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustment], include_container_logs = False): pods = await self.get_latest_pods() self.logger.trace(f"latest pod(s) status {list(map(lambda p: p.obj.status, pods))}") unschedulable_pods = [ @@ -2087,14 +2158,33 @@ async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustm reason="image-pull-failed" ) - restarted_pods_container_statuses = [ + restarted_pods_container_statuses: list[tuple[Pod, V1ContainerStatus]] = [ (pod, cont_stat) for pod in pods for cont_stat in (pod.obj.status.container_statuses or []) if cont_stat.restart_count > 0 ] if restarted_pods_container_statuses: + container_logs: list[str] = ["DISABLED" for _ in range(len(restarted_pods_container_statuses))] + if include_container_logs: # TODO enable logs config on per container basis + # Reduce api requests to 1 per pod then fan back out into per container status list + curpod = restarted_pods_container_statuses[0][0] + curstats = [] + for pod, container_status in restarted_pods_container_statuses: + if pod == curpod: + curstats.append(container_status) + else: + # Set up for next pod in list + container_logs.extend(await curpod.get_logs_for_container_statuses(curstats)) + curpod = pod + curstats = [container_status] + # Get statuses for the last (or only) pod in the list + container_logs.extend(await curpod.get_logs_for_container_statuses(curstats)) + pod_to_counts = collections.defaultdict(list) - for pod_cont_stat in restarted_pods_container_statuses: - pod_to_counts[pod_cont_stat[0].obj.metadata.name].append(f"{pod_cont_stat[1].name} x{pod_cont_stat[1].restart_count}") + for idx, (pod, cont_stat) in enumerate(restarted_pods_container_statuses): + pod_to_counts[pod.obj.metadata.name].append( + f"{cont_stat.name} x{cont_stat.restart_count} " + f"{'' if not include_container_logs else f' container logs {container_logs[idx]}'}" + ) pod_message = ", ".join(map( lambda kv_tup: f"{kv_tup[0]} - {'; '.join(kv_tup[1])}", @@ -2111,12 +2201,23 @@ async def raise_for_failed_pod_adjustments(self, adjustments: List[servo.Adjustm if cond.type == "Ready" and cond.status == "False" ] if 
         if unready_pod_conds:
-            pod_message = ", ".join(map(
-                lambda pod_cond: f"{pod_cond[0].obj.metadata.name} - (reason {pod_cond[1].reason}) {pod_cond[1].message}",
-                unready_pod_conds
-            ))
+            pod_messages = []
+            for pod, cond in unready_pod_conds:
+                pod_message = f"{pod.obj.metadata.name} - (reason {cond.reason}) {cond.message}"
+
+                # TODO: expand the criteria for safely getting container logs and/or implement a graceful fallback
+                if include_container_logs and cond.reason == "ContainersNotReady":
+                    unready_container_statuses: List[V1ContainerStatus] = [
+                        cont_stat for cont_stat in pod.obj.status.container_statuses or [] if not cont_stat.ready
+                    ]
+                    container_logs = await pod.get_logs_for_container_statuses(unready_container_statuses)
+                    # NOTE: can't use a backslash (newline) inside an f-string expression
+                    pod_message = (f"{pod_message} container logs " + "\n\n--- \n\n".join(container_logs))
+
+                pod_messages.append(pod_message)
+
             raise servo.AdjustmentRejectedError(
-                f"Found {len(unready_pod_conds)} unready pod(s) for deployment {self.name}: {pod_message}",
+                f"Found {len(unready_pod_conds)} unready pod(s) for deployment {self.name}: {', '.join(pod_messages)}",
                 reason="start-failed"
             )
@@ -3373,7 +3474,10 @@ async def is_ready(self) -> bool:

     async def raise_for_status(self) -> None:
         """Raise an exception if in an unhealthy state."""
-        await self.deployment.raise_for_status(adjustments=self.adjustments)
+        await self.deployment.raise_for_status(
+            adjustments=self.adjustments,
+            include_container_logs=self.deployment_config.container_logs_in_error_status
+        )

 # TODO: Break down into CanaryDeploymentOptimization and CanaryContainerOptimization
@@ -3755,7 +3859,7 @@ async def create_tuning_pod(self) -> Pod:
                 await t
                 servo.logger.debug(f"Cancelled Task: {t}, progress: {progress}")

-        await tuning_pod.raise_for_status(adjustments=self.adjustments)
+        await self.raise_for_status(tuning_pod=tuning_pod)

         # Load the in memory model for various convenience accessors
         await tuning_pod.refresh()
@@ -4020,9 +4124,14 @@ async def is_ready(self) -> bool:
         )
         return is_ready and restart_count == 0

-    async def raise_for_status(self) -> None:
+    async def raise_for_status(self, tuning_pod: Optional[Pod] = None) -> None:
         """Raise an exception if in an unhealthy state."""
-        await self.tuning_pod.raise_for_status(adjustments=self.adjustments)
+        if tuning_pod is None:
+            tuning_pod = self.tuning_pod
+        await tuning_pod.raise_for_status(
+            adjustments=self.adjustments,
+            include_container_logs=self.target_controller_config.container_logs_in_error_status
+        )

     class Config:
@@ -4437,6 +4546,9 @@ class BaseKubernetesConfiguration(servo.BaseConfiguration):
     timeout: Optional[servo.Duration] = pydantic.Field(
         description="Time interval to wait before considering Kubernetes operations to have failed."
     )
+    container_logs_in_error_status: bool = pydantic.Field(
+        False, description="Enable to include container logs in adjustment error messages"
+    )

     @pydantic.validator("on_failure")
     def validate_failure_mode(cls, v):
diff --git a/servo/connectors/opsani_dev.py b/servo/connectors/opsani_dev.py
index 57e7298d9..e523a6b3f 100644
--- a/servo/connectors/opsani_dev.py
+++ b/servo/connectors/opsani_dev.py
@@ -66,6 +66,9 @@ class OpsaniDevConfiguration(servo.BaseConfiguration):
     settlement: Optional[servo.Duration] = pydantic.Field(
         description="Duration to observe the application after an adjust to ensure the deployment is stable. May be overridden by optimizer supplied `control.adjust.settlement` value."
     )
+    container_logs_in_error_status: bool = pydantic.Field(
+        False, description="Enable to include container logs in adjustment error messages"
+    )

     @pydantic.root_validator
     def check_deployment_and_rollout(cls, values):
@@ -129,6 +132,7 @@ def generate_kubernetes_config(
             description="Update the namespace, deployment, etc. to match your Kubernetes cluster",
             timeout=self.timeout,
             settlement=self.settlement,
+            container_logs_in_error_status=self.container_logs_in_error_status,
             **main_arg,
             **kwargs,
         )
diff --git a/servo/types/__init__.py b/servo/types/__init__.py
index 871f526ca..b3300c1c9 100644
--- a/servo/types/__init__.py
+++ b/servo/types/__init__.py
@@ -1,5 +1,6 @@
 # Promote all symbols from submodules to the top-level package (TODO is this bad practice?)
 from .api import *
 from .core import *
+from .kubernetes import *
 from .settings import *
 from .slo import *
diff --git a/servo/types/kubernetes.py b/servo/types/kubernetes.py
new file mode 100644
index 000000000..b5ba27e15
--- /dev/null
+++ b/servo/types/kubernetes.py
@@ -0,0 +1,6 @@
+import enum
+
+class ContainerLogOptions(str, enum.Enum):
+    previous = "previous"
+    current = "current"
+    both = "both"
diff --git a/setup.cfg b/setup.cfg
index e2b26dbe1..8d6da3141 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -7,6 +7,7 @@ filterwarnings =
     ignore: .*Using or importing the ABCs from 'collections':DeprecationWarning
     ignore: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead:DeprecationWarning
     ignore: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10:DeprecationWarning
+    ignore: distutils Version classes are deprecated. Use packaging.version instead.:DeprecationWarning
     ignore: unclosed file <_io.TextIOWrapper name=.*:ResourceWarning
     ignore: unclosed file <_io.FileIO name=.*:ResourceWarning
     ignore: unclosed <…
diff --git a/tests/connectors/opsani_dev_test.py b/tests/connectors/opsani_dev_test.py
--- a/tests/connectors/opsani_dev_test.py
+++ b/tests/connectors/opsani_dev_test.py
@@ … @@
     def test_…(self) -> None:
         config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate()
         assert list(config.dict().keys()) == [
             'description', 'namespace', 'deployment', 'rollout', 'container', 'service','port', 'cpu', 'memory', 'env',
-            'static_environment_variables', 'prometheus_base_url', 'envoy_sidecar_image', 'timeout', 'settlement'
+            'static_environment_variables', 'prometheus_base_url', 'envoy_sidecar_image', 'timeout', 'settlement', 'container_logs_in_error_status'
         ]

     def test_generate_yaml(self) -> None:
diff --git a/tests/connectors/prometheus_test.py b/tests/connectors/prometheus_test.py
index 27840cc5f..b360ba19c 100644
--- a/tests/connectors/prometheus_test.py
+++ b/tests/connectors/prometheus_test.py
@@ -6,6 +6,7 @@
 import freezegun
 import httpx
+import kubetest.client
 import pydantic
 import pytest
 import pytz
@@ -373,8 +374,8 @@ def optimizer(self) -> servo.Optimizer:
         )

     @pytest.fixture(autouse=True)
-    def _wait_for_cluster(self, kube) -> None:
-        kube.wait_for_registered()
+    def _wait_for_cluster(self, kube: kubetest.client.TestClient) -> None:
+        kube.wait_for_registered(timeout=1800)

     async def test_targets(
         self,
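
A brief usage sketch (not part of the diff) of how the new `container_logs_in_error_status` flag flows from the Opsani Dev configuration into the generated Kubernetes connector configuration, where `Pod.raise_for_status()` and `Deployment.raise_for_status()` consume it. `OpsaniDevConfiguration.generate()` is the same helper exercised by the opsani_dev test above; the attribute assignment is illustrative:

    import servo.connectors.opsani_dev

    # Generate a default configuration, then opt in to container log capture
    config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate()
    config.container_logs_in_error_status = True

    # The flag is forwarded verbatim by generate_kubernetes_config() (see the
    # @@ -129,6 +132,7 @@ hunk above), so AdjustmentRejectedError messages raised
    # by the Kubernetes connector will include the captured container logs.
    k8s_config = config.generate_kubernetes_config()
    assert k8s_config.container_logs_in_error_status is True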