From 862f05b60817c126bec35d643bdc22bdefef8e4d Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Tue, 25 Jan 2022 08:33:52 +0100 Subject: [PATCH] feat: #161 Add initial support for scaling up/down original service (only for basic APIs) --- bahub.conf.yaml | 7 +- bahub/transports/kubernetes_podexec.py | 11 +- bahub/transports/kubernetes_sidepod.py | 173 +++++++++++++++++----- test/env/kubernetes/nginx/deployment.yaml | 22 +++ 4 files changed, 175 insertions(+), 38 deletions(-) create mode 100644 test/env/kubernetes/nginx/deployment.yaml diff --git a/bahub.conf.yaml b/bahub.conf.yaml index 2e299a4..0c415b8 100644 --- a/bahub.conf.yaml +++ b/bahub.conf.yaml @@ -45,10 +45,11 @@ transports: kubernetes_pod_fs: type: bahub.transports.kubernetes_sidepod spec: - namespace: kube-system - selector: "component=etcd" - image: "ubuntu:22.04" + namespace: default + selector: "app=nginx" + image: "ghcr.io/riotkit-org/backup-maker-env:latest" timeout: 300 + scaleDown: true # docker_postgres: # type: bahub.transports.docker diff --git a/bahub/transports/kubernetes_podexec.py b/bahub/transports/kubernetes_podexec.py index 24ae1c3..7ba8982 100644 --- a/bahub/transports/kubernetes_podexec.py +++ b/bahub/transports/kubernetes_podexec.py @@ -21,6 +21,7 @@ class Transport(TransportInterface): _v1_core_api: client.CoreV1Api + _v1_apps_api: client.AppsV1Api _process: ExecResult _binaries: List[RequiredBinary] @@ -56,13 +57,21 @@ def get_specification_schema() -> dict: } @property - def v1_core_api(self): + def v1_core_api(self) -> client.CoreV1Api: if not hasattr(self, '_v1_core_api'): config.load_kube_config() # todo: Add support for selecting cluster self._v1_core_api = client.CoreV1Api() return self._v1_core_api + @property + def v1_apps_api(self) -> client.AppsV1Api: + if not hasattr(self, '_v1_apps_api'): + config.load_kube_config() # todo: Add support for selecting cluster + self._v1_apps_api = client.AppsV1Api() + + return self._v1_apps_api + def prepare_environment(self, binaries: 
@dataclass
class ReplicaToScale(object):
    """Original replica count of a scaled-down controller, recorded so it can be restored."""
    kind: str       # controller kind, currently always 'Deployment'
    name: str       # controller name
    namespace: str  # namespace the controller lives in
    replicas: int   # replicas count to restore after the backup


class Transport(KubernetesPodExecTransport):
    """
    Backup transport that spawns a temporary POD mounting the volumes of an existing POD,
    optionally scaling the original workload down for the duration of the backup.
    """

    _image: str
    _timeout: int
    _scale_down: bool

    # dynamic state, populated during schedule()
    _temporary_pod_name: str
    _replicas_to_scale: List[ReplicaToScale]

    def __init__(self, spec: dict, io: IO):
        super().__init__(spec, io)
        self._image = spec.get('image')
        # schema declares default 3600 — presumably filled in by the spec validator; TODO confirm
        self._timeout = spec.get('timeout')
        self._replicas_to_scale = []
        self._scale_down = bool(spec.get('scaleDown'))

    # NOTE(review): get_specification_schema() is only partially visible in the source
    # and is therefore not reproduced here; the patch adds a boolean 'scaleDown'
    # property (default: False) next to the existing 'image'/'timeout' options.

    def schedule(self, command: str, definition, is_backup: bool, version: str = "") -> None:
        """
        Run a backup/restore command in a temporary POD.

        Optionally scales down the original POD's controller first, then creates a
        temporary POD that mounts the same volumes and executes the command there.
        On any failure the original workload is scaled back up before re-raising.
        """
        original_pod_name = self.find_pod_name(self._selector, self._namespace)

        try:
            if self._scale_down:
                self._scale_pod_owner(original_pod_name, self._namespace)

            volumes, volume_mounts = self._copy_volumes_specification_from_existing_pod(
                original_pod_name,
                namespace=self._namespace
            )

            # spawn temporary pod
            self._temporary_pod_name = f"{original_pod_name}-backup"
            self._create_pod(
                pod_name=self._temporary_pod_name,
                specification=self._create_backup_pod_definition(
                    original_pod_name,
                    self._temporary_pod_name,
                    self._timeout,
                    volumes,
                    volume_mounts
                )
            )

            self._execute_in_pod_when_pod_will_be_ready(
                self._temporary_pod_name, command, definition, is_backup, version
            )
        except BaseException:
            # was a bare `except:` — made explicit with identical semantics: the
            # workload must be scaled back up even on KeyboardInterrupt/SystemExit,
            # and the exception is always re-raised
            if self._scale_down:
                self._scale_back()

            raise
to scale down + + :param pod_name: + :param namespace: + :return: + """ + + pod: V1Pod = self._v1_core_api.read_namespaced_pod(name=pod_name, namespace=namespace) + metadata: V1ObjectMeta = pod.metadata + owners: List[V1OwnerReference] = metadata.owner_references + + if not owners: + self.io().warn("No POD owner found through owner references") + return + + self._scale_by_owner_references(owners, namespace) + + def _scale_by_owner_references(self, owners: List[V1OwnerReference], namespace: str): + for owner in owners: + if owner.kind == "ReplicaSet": + rs = self.v1_apps_api.read_namespaced_replica_set(name=owner.name, namespace=namespace) + metadata: V1ObjectMeta = rs.metadata + rs_owners: List[V1OwnerReference] = metadata.owner_references + + self._scale_by_owner_references(rs_owners, namespace) + continue + + elif owner.kind != "Deployment": + self.io().warn(f"Unsupported controller type '{owner.kind}', will not attempt to scale it") + continue + + deployment_as_dict = json.loads(self.v1_apps_api.read_namespaced_deployment(name=owner.name, + namespace=namespace, + _preload_content=False).data) + + self._replicas_to_scale.append(ReplicaToScale( + kind='Deployment', + name=owner.name, + namespace=namespace, + replicas=deployment_as_dict['spec'].get('replicas', 0) + )) + self._scale(owner.name, namespace, 0) + + def _scale(self, name: str, namespace: str, replicas: int): + """ + Scale down given Deployment/ReplicationController/StatefulSet + + :param name: + :param namespace: + :param replicas: + :return: + """ + + self.io().info(f"Scaling deployment/{name} in {namespace} namespace to replicas '{replicas}'") + scale = V1Scale( + metadata=V1ObjectMeta(name=name, namespace=namespace), + spec=V1ScaleSpec( + replicas=replicas ) ) + self.v1_apps_api.replace_namespaced_deployment_scale(name, namespace, scale) - self._execute_in_pod_when_pod_will_be_ready(self._temporary_pod_name, command, definition, is_backup, version) + # then wait for it to be applied + for i in 
range(0, 3600): + deployment_as_dict = json.loads(self.v1_apps_api.read_namespaced_deployment(name=name, + namespace=namespace, + _preload_content=False).data) + + current_replicas_num = int(deployment_as_dict['spec']['replicas']) + + if current_replicas_num == int(replicas): + self.io().info(f"POD's controller scaled to {replicas}") + break + + self.io().debug(f"Waiting for cluster to scale POD's controller to {replicas}," + f" currently: {current_replicas_num}") + time.sleep(1) + + def _scale_back(self): + """ + Bring back service pods + + :return: + """ + + for kind_object in self._replicas_to_scale: + self._scale(kind_object.name, kind_object.namespace, kind_object.replicas) def _create_pod(self, pod_name: str, specification: dict): self.io().info(f"Creating temporary POD '{pod_name}'") @@ -162,7 +263,11 @@ def __exit__(self, exc_type, exc_val, exc_t) -> None: :return: """ - self._terminate_pod(self._temporary_pod_name) + try: + if hasattr(self, '_temporary_pod_name'): + self._terminate_pod(self._temporary_pod_name) + finally: + self._scale_back() def _create_backup_pod_definition(self, original_pod_name: str, backup_pod_name: str, timeout: int, volumes: dict, volume_mounts: list) -> dict: diff --git a/test/env/kubernetes/nginx/deployment.yaml b/test/env/kubernetes/nginx/deployment.yaml new file mode 100644 index 0000000..a3c8717 --- /dev/null +++ b/test/env/kubernetes/nginx/deployment.yaml @@ -0,0 +1,22 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nginx + labels: + app: nginx +spec: + replicas: 2 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + containers: + - name: nginx + image: nginx:1.19 + ports: + - containerPort: 80