From c94437e8770c0657c74e6fc47acab71725d3510b Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Sun, 30 Jan 2022 13:58:01 +0100 Subject: [PATCH] refactor: Extract code to `kubernetes.py`, use `KubernetesError` exception instead of `Exception` --- bahub/exception.py | 27 +++-- bahub/transports/kubernetes.py | 112 ++++++++++++++++- bahub/transports/kubernetes_podexec.py | 74 ++---------- bahub/transports/kubernetes_sidepod.py | 78 +++--------- test/test_transports_kubernetes_transports.py | 113 ++++++++++++++++++ 5 files changed, 268 insertions(+), 136 deletions(-) diff --git a/bahub/exception.py b/bahub/exception.py index ff0874e..83f83fe 100644 --- a/bahub/exception.py +++ b/bahub/exception.py @@ -17,6 +17,22 @@ def from_container_not_running(container_id: str, status: str) -> 'DockerContain return DockerContainerError('Container "{}" is not running but actually {}'.format(container_id, status)) +class KubernetesError(TransportException): + @classmethod + def from_timed_out_waiting_for_pod(cls, pod_name: str, namespace: str) -> 'KubernetesError': + return cls(f"Timed out while waiting for pod '{pod_name}' in namespace '{namespace}'") + + @classmethod + def cannot_scale_resource(cls, name: str, namespace: str, replicas: int): + return cls(f"Cannot achieve desired state of '{replicas}' replicas for '{name}' in '{namespace}' namespace") + + @classmethod + def from_pod_creation_conflict(cls, pod_name: str): + return cls(f"POD '{pod_name}' already exists or is terminating, " + f"please wait a moment - cannot start process in parallel, " + f"it may break something") + + class ConfigurationFactoryException(ApplicationException): pass @@ -79,17 +95,6 @@ def from_early_buffer_exit(stream_description: str) -> 'BufferingError': return BufferingError('Buffering of stream "{}" ended earlier with error'.format(stream_description)) -class CryptographyConfigurationError(ApplicationException): - pass - - -class CryptographyKeysAlreadyCreated(CryptographyConfigurationError): - @staticmethod - def from_keys_already_created(user_id: str) -> 'CryptographyConfigurationError': - return CryptographyKeysAlreadyCreated('Cryptography keys for "{uid}" already created, skipping creation' - .format(uid=user_id)) - - class BackupProcessError(ApplicationException): """Errors related to backup/restore streaming""" diff --git a/bahub/transports/kubernetes.py b/bahub/transports/kubernetes.py index 53735b5..c99fb04 100644 --- a/bahub/transports/kubernetes.py +++ b/bahub/transports/kubernetes.py @@ -1,16 +1,20 @@ """ Generic Kubernetes methods for building Kubernetes transports """ - - +import json import os +import time from typing import List, Callable import yaml from tempfile import TemporaryDirectory + +from kubernetes.client import CoreV1Api, V1PodList, V1Pod, V1ObjectMeta, V1Scale, V1ScaleSpec, AppsV1Api, ApiException from kubernetes.stream.ws_client import WSClient, ERROR_CHANNEL from rkd.api.inputoutput import IO from kubernetes import client from kubernetes.stream import stream + +from ..exception import KubernetesError from ..fs import FilesystemInterface @@ -93,6 +97,110 @@ def pod_exec(pod_name: str, namespace: str, cmd: List[str], io: IO) -> ExecResul ) +def find_pod_name(api: CoreV1Api, selector: str, namespace: str, io: IO) -> str: + """ + Returns a POD name + + :raises: When no matching POD found + """ + + pods: V1PodList = api.list_namespaced_pod(namespace, label_selector=selector, limit=1) + + if len(pods.items) == 0: + raise Exception(f'No pods found matching selector {selector} in {namespace} namespace') + + pod: V1Pod = pods.items[0] + pod_metadata: V1ObjectMeta = pod.metadata + + io.debug(f"Found POD name: '{pod_metadata.name}' in namespace '{namespace}'") + + return pod_metadata.name + + +def wait_for_pod_to_be_ready(api: CoreV1Api, pod_name: str, namespace: str, io: IO, timeout: int = 120): + """ + Waits for POD to reach a valid state + + :raises: When timeout hits + """ + + io.debug("Waiting for POD to be ready...") + + for i in range(0, timeout): + pod: V1Pod = api.read_namespaced_pod(name=pod_name, namespace=namespace) + + if pod.status.phase in ["Ready", "Healthy", "True", "Running"]: + _wait_for_pod_containers_to_be_ready(api, pod_name, namespace, timeout, io) + io.info(f"POD entered '{pod.status.phase}' state") + time.sleep(1) + + return True + + io.debug(f"Pod not ready. Status: {pod.status.phase}") + time.sleep(1) + + raise KubernetesError.from_timed_out_waiting_for_pod(pod_name, namespace) + + +def _wait_for_pod_containers_to_be_ready(api: CoreV1Api, pod_name: str, namespace: str, timeout: int, io: IO): + """ + POD can be running, but containers could be still initializing - this method waits for containers + """ + + for i in range(0, timeout): + pod: V1Pod = api.read_namespaced_pod(name=pod_name, namespace=namespace) + + if all([(c.state.running and not c.state.waiting and not c.state.terminated) + for c in pod.status.container_statuses]): + io.info("All containers in a POD have started") + return + + +def scale_resource(api: AppsV1Api, name: str, namespace: str, replicas: int, io: IO): + """ + Scale down given Deployment/ReplicationController/StatefulSet + """ + + io.info(f"Scaling deployment/{name} in {namespace} namespace to replicas '{replicas}'") + scale_spec = V1Scale( + metadata=V1ObjectMeta(name=name, namespace=namespace), + spec=V1ScaleSpec( + replicas=replicas + ) + ) + api.replace_namespaced_deployment_scale(name, namespace, scale_spec) + + # then wait for it to be applied + for i in range(0, 3600): + deployment_as_dict = json.loads(api.read_namespaced_deployment(name=name, namespace=namespace, + _preload_content=False).data) + + current_replicas_num = int(deployment_as_dict['spec']['replicas']) + + if current_replicas_num == int(replicas): + io.info(f"POD's controller scaled to {replicas}") + return + + io.debug(f"Waiting for cluster to scale POD's controller to {replicas}, currently: {current_replicas_num}") + time.sleep(1) + + raise KubernetesError.cannot_scale_resource(name, namespace, replicas) + + +def create_pod(api: CoreV1Api, pod_name: str, namespace, specification: dict, io: IO): + io.info(f"Creating temporary POD '{pod_name}'") + specification['metadata']['name'] = pod_name + + try: + api.create_namespaced_pod(namespace=namespace, body=specification) + + except ApiException as e: + if e.reason == "Conflict" and "AlreadyExists" in str(e.body): + raise KubernetesError.from_pod_creation_conflict(pod_name) from e + + raise e + + class KubernetesPodFilesystem(FilesystemInterface): io: IO pod_name: str diff --git a/bahub/transports/kubernetes_podexec.py b/bahub/transports/kubernetes_podexec.py index 6533948..bc3f503 100644 --- a/bahub/transports/kubernetes_podexec.py +++ b/bahub/transports/kubernetes_podexec.py @@ -5,10 +5,8 @@ Performs `exec` operation into EXISTING, RUNNING POD to run a backup operation in-place. """ -import time from typing import List from kubernetes import config, client -from kubernetes.client import V1PodList, V1Pod, V1ObjectMeta from rkd.api.inputoutput import IO from bahub.bin import RequiredBinary, copy_required_tools_from_controller_cache_to_target_env, \ @@ -16,7 +14,8 @@ from bahub.exception import ConfigurationError from bahub.settings import BIN_VERSION_CACHE_PATH, TARGET_ENV_BIN_PATH, TARGET_ENV_VERSIONS_PATH from bahub.transports.base import TransportInterface, create_backup_maker_command -from bahub.transports.kubernetes import KubernetesPodFilesystem, pod_exec, ExecResult +from bahub.transports.kubernetes import KubernetesPodFilesystem, pod_exec, ExecResult, find_pod_name, \ + wait_for_pod_to_be_ready from bahub.transports.sh import LocalFilesystem @@ -34,6 +33,7 @@ def __init__(self, spec: dict, io: IO): super().__init__(spec, io) self._namespace = spec.get('namespace', 'default') self._selector = spec.get('selector', '') + self._timeout = int(spec.get('timeout', 120)) if not self._selector: raise ConfigurationError("'selector' for Kubernetes type transport cannot be empty") @@ -56,6 +56,11 @@ def get_specification_schema() -> dict: "type": "string", "example": "prod", "default": "default" + }, + "timeout": { + "type": "string", + "example": 120, + "default": 120 } } } @@ -84,7 +89,7 @@ def schedule(self, command: str, definition, is_backup: bool, version: str = "") Runs a `kubectl exec` on already existing POD """ - pod_name = self.find_pod_name(self._selector, self._namespace) + pod_name = self._find_pod_name(self._selector, self._namespace) self._execute_in_pod_when_pod_will_be_ready(pod_name, command, definition, is_backup, version) def __exit__(self, exc_type, exc_val, exc_t) -> None: @@ -113,7 +118,7 @@ def _execute_in_pod_when_pod_will_be_ready(self, pod_name: str, command: str, de :return: """ - self.wait_for_pod_to_be_ready(pod_name, self._namespace) + wait_for_pod_to_be_ready(self._v1_core_api, pod_name, self._namespace, io=self.io(), timeout=self._timeout) self._prepare_environment_inside_pod(definition, pod_name) complete_cmd = create_backup_maker_command(command, definition, is_backup, version, @@ -162,63 +167,8 @@ def watch(self) -> bool: self._process.watch(self.io().debug) return self._process.has_exited_with_success() - def wait_for_pod_to_be_ready(self, pod_name: str, namespace: str, timeout: int = 120): - """ - Waits for POD to reach a valid state - - :raises: When timeout hits - """ - - self.io().debug("Waiting for POD to be ready...") - - for i in range(0, timeout): - pod: V1Pod = self._v1_core_api.read_namespaced_pod(name=pod_name, namespace=namespace) - - if pod.status.phase in ["Ready", "Healthy", "True", "Running"]: - self._wait_for_pod_containers_to_be_ready(pod_name, namespace, timeout) - self.io().info(f"POD entered '{pod.status.phase}' state") - time.sleep(1) - - return True - - self.io().debug(f"Pod not ready. Status: {pod.status.phase}") - time.sleep(1) - - raise Exception(f"Timed out while waiting for pod '{pod_name}' in namespace '{namespace}'") - - def find_pod_name(self, selector: str, namespace: str): - """ - Returns a POD name - - :raises: When no matching POD found - """ - - pods: V1PodList = self.v1_core_api.list_namespaced_pod(namespace, - label_selector=selector, - limit=1) - - if len(pods.items) == 0: - raise Exception(f'No pods found matching selector {selector} in {namespace} namespace') - - pod: V1Pod = pods.items[0] - pod_metadata: V1ObjectMeta = pod.metadata - - self.io().debug(f"Found POD name: '{pod_metadata.name}' in namespace '{namespace}'") - - return pod_metadata.name - def get_required_binaries(self): return [] - def _wait_for_pod_containers_to_be_ready(self, pod_name: str, namespace: str, timeout: int): - """ - POD can be running, but containers could be still initializing - """ - - for i in range(0, timeout): - pod: V1Pod = self._v1_core_api.read_namespaced_pod(name=pod_name, namespace=namespace) - - if all([(c.state.running and not c.state.waiting and not c.state.terminated) - for c in pod.status.container_statuses]): - self.io().info("All containers in a POD have started") - return + def _find_pod_name(self, selector: str, namespace: str) -> str: + return find_pod_name(self.v1_core_api, selector, namespace, self.io()) diff --git a/bahub/transports/kubernetes_sidepod.py b/bahub/transports/kubernetes_sidepod.py index 3a8cb05..484d9bc 100644 --- a/bahub/transports/kubernetes_sidepod.py +++ b/bahub/transports/kubernetes_sidepod.py @@ -6,12 +6,13 @@ Temporary POD is attempted to be scheduled closest to the original POD to mitigate the latency """ import json -import time from dataclasses import dataclass -from typing import Tuple, List +from typing import Tuple, List, Optional -from kubernetes.client import ApiException, V1Pod, V1ObjectMeta, V1OwnerReference, V1Scale, V1ScaleSpec +from kubernetes.client import V1Pod, V1ObjectMeta, V1OwnerReference from rkd.api.inputoutput import IO + +from .kubernetes import wait_for_pod_to_be_ready, scale_resource, create_pod from .kubernetes_podexec import Transport as KubernetesPodExecTransport @@ -36,7 +37,6 @@ class Transport(KubernetesPodExecTransport): def __init__(self, spec: dict, io: IO): super().__init__(spec, io) self._image = spec.get('image', 'ghcr.io/riotkit-org/backup-maker-env:latest') - self._timeout = spec.get('timeout', 3600) self._replicas_to_scale = [] self._scale_down = bool(spec.get('scaleDown', False)) self._pod_suffix = spec.get('podSuffix', '-backup') @@ -86,12 +86,12 @@ def get_specification_schema() -> dict: "default": "-backup", "example": "-backup", "description": "Suffix for name of a backup POD (original pod name + suffix)" - } + }, } } def schedule(self, command: str, definition, is_backup: bool, version: str = "") -> None: - original_pod_name = self.find_pod_name(self._selector, self._namespace) + original_pod_name = self._find_pod_name(self._selector, self._namespace) try: if self._scale_down: @@ -104,7 +104,9 @@ def schedule(self, command: str, definition, is_backup: bool, version: str = "") # spawn temporary pod self._temporary_pod_name = f"{original_pod_name}{self._pod_suffix}" - self._create_pod( + create_pod( + self._v1_core_api, + namespace=self._namespace, pod_name=self._temporary_pod_name, specification=self._create_backup_pod_definition( original_pod_name, @@ -112,7 +114,8 @@ def schedule(self, command: str, definition, is_backup: bool, version: str = "") self._timeout, volumes, volume_mounts - ) + ), + io=self.io() ) self._execute_in_pod_when_pod_will_be_ready(self._temporary_pod_name, command, definition, is_backup, @@ -168,42 +171,7 @@ def _scale_by_owner_references(self, owners: List[V1OwnerReference], namespace: namespace=namespace, replicas=deployment_as_dict['spec'].get('replicas', 0) )) - self._scale(owner.name, namespace, 0) - - def _scale(self, name: str, namespace: str, replicas: int): - """ - Scale down given Deployment/ReplicationController/StatefulSet - - :param name: - :param namespace: - :param replicas: - :return: - """ - - self.io().info(f"Scaling deployment/{name} in {namespace} namespace to replicas '{replicas}'") - scale = V1Scale( - metadata=V1ObjectMeta(name=name, namespace=namespace), - spec=V1ScaleSpec( - replicas=replicas - ) - ) - self.v1_apps_api.replace_namespaced_deployment_scale(name, namespace, scale) - - # then wait for it to be applied - for i in range(0, 3600): - deployment_as_dict = json.loads(self.v1_apps_api.read_namespaced_deployment(name=name, - namespace=namespace, - _preload_content=False).data) - - current_replicas_num = int(deployment_as_dict['spec']['replicas']) - - if current_replicas_num == int(replicas): - self.io().info(f"POD's controller scaled to {replicas}") - break - - self.io().debug(f"Waiting for cluster to scale POD's controller to {replicas}," - f" currently: {current_replicas_num}") - time.sleep(1) + scale_resource(self.v1_apps_api, owner.name, namespace, 0, io=self.io()) def _scale_back(self): """ @@ -213,21 +181,8 @@ def _scale_back(self): """ for kind_object in self._replicas_to_scale: - self._scale(kind_object.name, kind_object.namespace, kind_object.replicas) - - def _create_pod(self, pod_name: str, specification: dict): - self.io().info(f"Creating temporary POD '{pod_name}'") - - try: - self._v1_core_api.create_namespaced_pod(namespace=self._namespace, body=specification) - - except ApiException as e: - if e.reason == "Conflict" and "AlreadyExists" in str(e.body): - raise Exception(f"POD '{pod_name}' already exists or is terminating, " - f"please wait a moment - cannot start process in parallel, " - f"it may break something") from e - - raise e + scale_resource(self.v1_apps_api, kind_object.name, kind_object.namespace, + kind_object.replicas, io=self.io()) def _terminate_pod(self, pod_name: str): self.io().info("Clean up - deleting temporary POD") @@ -239,7 +194,7 @@ def _terminate_pod(self, pod_name: str): def _copy_volumes_specification_from_existing_pod(self, pod_name: str, namespace: str) -> Tuple[dict, list]: self.io().debug(f"Copying volumes specification from source pod={pod_name}") - self.wait_for_pod_to_be_ready(pod_name, namespace) + wait_for_pod_to_be_ready(self._v1_core_api, pod_name, namespace, io=self.io(), timeout=self._timeout) pod = json.loads(self._v1_core_api.read_namespaced_pod(pod_name, namespace, _preload_content=False).data) try: @@ -291,7 +246,7 @@ def __exit__(self, exc_type, exc_val, exc_t) -> None: self._scale_back() def _create_backup_pod_definition(self, original_pod_name: str, backup_pod_name: str, timeout: int, - volumes: dict, volume_mounts: list) -> dict: + volumes: Optional[dict], volume_mounts: Optional[list]) -> dict: return { 'apiVersion': 'v1', 'kind': 'Pod', @@ -303,6 +258,7 @@ def _create_backup_pod_definition(self, original_pod_name: str, backup_pod_name: } }, 'spec': { + "restartPolicy": "Never", 'containers': [ { 'image': self._image, diff --git a/test/test_transports_kubernetes_transports.py b/test/test_transports_kubernetes_transports.py index 1017a3d..713fd7a 100644 --- a/test/test_transports_kubernetes_transports.py +++ b/test/test_transports_kubernetes_transports.py @@ -3,8 +3,13 @@ import subprocess import time from unittest.mock import patch + +from kubernetes import config +from kubernetes.client import CoreV1Api from rkd.api.inputoutput import IO, BufferedSystemIO from rkd.api.testing import BasicTestingCase + +from bahub.transports.kubernetes import find_pod_name, create_pod, wait_for_pod_to_be_ready from bahub.transports.kubernetes_podexec import Transport as PodExecTransport from bahub.transports.kubernetes_sidepod import Transport as SidePodTransport from bahub.testing import create_example_fs_definition, run_transport @@ -73,6 +78,114 @@ def test_exec_inside_application_pod_finds_mounted_file(self, create_backup_make self.assertTrue(run_transport(definition, transport)) self.assertIn("I have never read Marx Capital, but I have the marks of capital all over me.", io.get_value()) + @patch('bahub.transports.kubernetes_podexec.create_backup_maker_command') + def test_inside_reports_failure_when_command_returns_error_exit_code(self, create_backup_maker_command): + """ + Test SidePod and PodExec transports against invalid command - watch() should return False + """ + + io = BufferedSystemIO() + io.set_log_level('debug') + + transports = [ + PodExecTransport( + spec={ + 'selector': "app=nginx", # see: test/env/kubernetes/nginx + 'namespace': 'default', + }, + io=io + ), + SidePodTransport( + spec={ + 'selector': "app=nginx", # see: test/env/kubernetes/nginx + 'namespace': 'default', + 'shell': '/bin/sh', + 'image': 'ghcr.io/mirrorshub/docker/alpine:3.14', + 'podSuffix': f"-backup-{hashlib.sha224(self._testMethodName.encode('utf-8')).hexdigest()[0:9]}" + }, + io=io + ) + ] + + for transport in transports: + definition = create_example_fs_definition(transport) + + create_backup_maker_command.return_value = ["/bin/false"] + self.assertFalse(run_transport(definition, transport), + msg=f"{transport} failed assertion, as command is expected to fail as /bin/false was used" + f" which returns a exit code '1'") + + def test_find_pod_name_raises_exception_when_pod_does_not_exist(self): + config.load_kube_config() + + with self.assertRaises(Exception) as raised: + find_pod_name(api=CoreV1Api(), + selector="my-non-existing-selector=value", + namespace="kube-system", + io=BufferedSystemIO()) + + self.assertIn("No pods found matching selector", str(raised.exception)) + self.assertIn("my-non-existing-selector=value", str(raised.exception)) + self.assertIn("kube-system", str(raised.exception)) + + def test_find_pod_name_finds_pod(self): + subprocess.check_call(["kubectl", "apply", "-f", os.getcwd() + "/test/env/kubernetes/nginx/"]) + time.sleep(5) + + config.load_kube_config() + name = find_pod_name(api=CoreV1Api(), + selector="app=nginx", # labels taken from deployment.yaml that was just applied + namespace="default", + io=BufferedSystemIO()) + + self.assertIn("nginx", name) + + def test_created_pod_will_timeout_and_will_be_deleted(self): + """ + Covers: _create_backup_pod_definition(), create_pod() and wait_for_pod_to_be_ready() + :return: + """ + + # at first: clean up + subprocess.call(["kubectl", "delete", "pod", "test-creation-and-scaling"]) + + config.load_kube_config() + + transport = SidePodTransport({'selector': '...'}, io=BufferedSystemIO()) + transport._image = "ghcr.io/mirrorshub/docker/alpine:3.14" + + create_pod( + api=CoreV1Api(), + pod_name="test-creation-and-scaling", + namespace="default", + specification=transport._create_backup_pod_definition( + original_pod_name="something", + backup_pod_name="something-backup", + timeout=4, + volumes=None, + volume_mounts=None + ), + io=BufferedSystemIO() + ) + wait_for_pod_to_be_ready(api=CoreV1Api(), + pod_name="test-creation-and-scaling", + namespace="default", + io=BufferedSystemIO(), + timeout=120) + + self.assertIn( + "Running", + subprocess.check_output("kubectl get pods -n default | grep test-creation-and-scaling", shell=True) + .decode('utf-8') + ) + + time.sleep(5) + self.assertIn( + "Completed", + subprocess.check_output("kubectl get pods -n default | grep test-creation-and-scaling", shell=True) + .decode('utf-8') + ) + @classmethod def setUpClass(cls) -> None: super(TestKubernetesTransport, cls).setUpClass()