This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

feat: #161 Add initial support for scaling up/down original service (only for basic APIs)

blackandred committed Jan 25, 2022
1 parent 74481b2 commit 862f05b
Showing 4 changed files with 175 additions and 38 deletions.
7 changes: 4 additions & 3 deletions bahub.conf.yaml

```diff
@@ -45,10 +45,11 @@ transports:
     kubernetes_pod_fs:
         type: bahub.transports.kubernetes_sidepod
         spec:
-            namespace: kube-system
-            selector: "component=etcd"
-            image: "ubuntu:22.04"
+            namespace: default
+            selector: "app=nginx"
+            image: "ghcr.io/riotkit-org/backup-maker-env:latest"
             timeout: 300
+            scaleDown: true
 
 #    docker_postgres:
 #        type: bahub.transports.docker
```
11 changes: 10 additions & 1 deletion bahub/transports/kubernetes_podexec.py

```diff
@@ -21,6 +21,7 @@
 
 class Transport(TransportInterface):
     _v1_core_api: client.CoreV1Api
+    _v1_apps_api: client.AppsV1Api
     _process: ExecResult
     _binaries: List[RequiredBinary]
 
@@ -56,13 +57,21 @@ def get_specification_schema() -> dict:
         }
 
     @property
-    def v1_core_api(self):
+    def v1_core_api(self) -> client.CoreV1Api:
         if not hasattr(self, '_v1_core_api'):
             config.load_kube_config()  # todo: Add support for selecting cluster
             self._v1_core_api = client.CoreV1Api()
 
         return self._v1_core_api
 
+    @property
+    def v1_apps_api(self) -> client.AppsV1Api:
+        if not hasattr(self, '_v1_apps_api'):
+            config.load_kube_config()  # todo: Add support for selecting cluster
+            self._v1_apps_api = client.AppsV1Api()
+
+        return self._v1_apps_api
+
     def prepare_environment(self, binaries: List[RequiredBinary]) -> None:
         self._binaries = binaries
```
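For reference, the new lazily-initialized `v1_apps_api` property can be exercised directly with the official Python client. A minimal sketch, not part of the commit; it assumes a reachable kubeconfig and the `nginx` test Deployment added at the bottom of this commit:

```python
# Sketch only: same kubernetes-client calls as the new v1_apps_api property.
# Assumes a reachable cluster with the nginx test Deployment applied.
from kubernetes import client, config

config.load_kube_config()  # same entry point the transport relies on
apps = client.AppsV1Api()

# V1Scale.spec.replicas is the field the scale-down feature rewrites
scale = apps.read_namespaced_deployment_scale(name="nginx", namespace="default")
print(f"deployment/nginx currently requests {scale.spec.replicas} replica(s)")
```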
173 changes: 139 additions & 34 deletions bahub/transports/kubernetes_sidepod.py

```diff
@@ -6,24 +6,38 @@
 Temporary POD is attempted to be scheduled closest to the original POD to mitigate the latency
 """
 import json
-from typing import Tuple
+import time
+from dataclasses import dataclass
+from typing import Tuple, List
 
-from kubernetes.client import ApiException
+from kubernetes.client import ApiException, V1Pod, V1ObjectMeta, V1OwnerReference, V1Scale, V1ScaleSpec
 from rkd.api.inputoutput import IO
 from .kubernetes_podexec import Transport as KubernetesPodExecTransport
 
 
+@dataclass
+class ReplicaToScale(object):
+    kind: str
+    name: str
+    namespace: str
+    replicas: int
+
+
 class Transport(KubernetesPodExecTransport):
     _image: str
     _timeout: int
+    _scale_down: bool
 
     # dynamic
     _temporary_pod_name: str
+    _replicas_to_scale: List[ReplicaToScale]
 
     def __init__(self, spec: dict, io: IO):
         super().__init__(spec, io)
         self._image = spec.get('image')
         self._timeout = spec.get('timeout')
+        self._replicas_to_scale = []
+        self._scale_down = bool(spec.get('scaleDown'))
 
     @staticmethod
     def get_specification_schema() -> dict:
@@ -53,49 +67,136 @@ def get_specification_schema() -> dict:
                     "type": "integer",
                     "default": 3600,
                     "example": 3600
+                },
+                "scaleDown": {
+                    "type": "boolean",
+                    "default": False,
+                    "example": False
                 }
-
-                # todo: Optional scale-down of application for backup time
-                # "scaleResource": {
-                #     "type": "object",
-                #     "properties": {
-                #         "kind": {
-                #             "type": "string",
-                #             "example": "deployment",
-                #             "default": "deployment"
-                #         },
-                #         "selector": {
-                #             "type": "string",
-                #             "example": "my-label=myvalue",
-                #             "default": ""
-                #         }
-                #     },
-                # }
             }
         }
 
     def schedule(self, command: str, definition, is_backup: bool, version: str = "") -> None:
         original_pod_name = self.find_pod_name(self._selector, self._namespace)
 
-        volumes, volume_mounts = self._copy_volumes_specification_from_existing_pod(
-            original_pod_name,
-            namespace=self._namespace
-        )
+        try:
+            if self._scale_down:
+                self._scale_pod_owner(original_pod_name, self._namespace)
 
-        # spawn temporary pod
-        self._temporary_pod_name = f"{original_pod_name}-backup"
-        self._create_pod(
-            pod_name=self._temporary_pod_name,
-            specification=self._create_backup_pod_definition(
+            volumes, volume_mounts = self._copy_volumes_specification_from_existing_pod(
                 original_pod_name,
-                self._temporary_pod_name,
-                self._timeout,
-                volumes,
-                volume_mounts
+                namespace=self._namespace
             )
-        )
 
-        self._execute_in_pod_when_pod_will_be_ready(self._temporary_pod_name, command, definition, is_backup, version)
+            # spawn temporary pod
+            self._temporary_pod_name = f"{original_pod_name}-backup"
+            self._create_pod(
+                pod_name=self._temporary_pod_name,
+                specification=self._create_backup_pod_definition(
+                    original_pod_name,
+                    self._temporary_pod_name,
+                    self._timeout,
+                    volumes,
+                    volume_mounts
+                )
+            )
+
+            self._execute_in_pod_when_pod_will_be_ready(self._temporary_pod_name, command, definition, is_backup, version)
+        except:
+            if self._scale_down:
+                self._scale_back()
+
+            raise
+
+    def _scale_pod_owner(self, pod_name: str, namespace: str):
+        """
+        Tell Deployment/ReplicationController/StatefulSet etc. to scale down
+
+        :param pod_name:
+        :param namespace:
+        :return:
+        """
+
+        pod: V1Pod = self.v1_core_api.read_namespaced_pod(name=pod_name, namespace=namespace)
+        metadata: V1ObjectMeta = pod.metadata
+        owners: List[V1OwnerReference] = metadata.owner_references
+
+        if not owners:
+            self.io().warn("No POD owner found through owner references")
+            return
+
+        self._scale_by_owner_references(owners, namespace)
+
+    def _scale_by_owner_references(self, owners: List[V1OwnerReference], namespace: str):
+        for owner in owners:
+            # PODs owned by a ReplicaSet are in fact managed by its parent Deployment
+            if owner.kind == "ReplicaSet":
+                rs = self.v1_apps_api.read_namespaced_replica_set(name=owner.name, namespace=namespace)
+                metadata: V1ObjectMeta = rs.metadata
+                rs_owners: List[V1OwnerReference] = metadata.owner_references
+
+                self._scale_by_owner_references(rs_owners, namespace)
+                continue
+
+            elif owner.kind != "Deployment":
+                self.io().warn(f"Unsupported controller type '{owner.kind}', will not attempt to scale it")
+                continue
+
+            deployment_as_dict = json.loads(self.v1_apps_api.read_namespaced_deployment(name=owner.name,
+                                                                                        namespace=namespace,
+                                                                                        _preload_content=False).data)
+
+            # remember the original replicas count, so _scale_back() can restore it
+            self._replicas_to_scale.append(ReplicaToScale(
+                kind='Deployment',
+                name=owner.name,
+                namespace=namespace,
+                replicas=deployment_as_dict['spec'].get('replicas', 0)
+            ))
+            self._scale(owner.name, namespace, 0)
+
+    def _scale(self, name: str, namespace: str, replicas: int):
+        """
+        Scale given Deployment to the desired replicas count
+
+        :param name:
+        :param namespace:
+        :param replicas:
+        :return:
+        """
+
+        self.io().info(f"Scaling deployment/{name} in {namespace} namespace to replicas '{replicas}'")
+        scale = V1Scale(
+            metadata=V1ObjectMeta(name=name, namespace=namespace),
+            spec=V1ScaleSpec(
+                replicas=replicas
+            )
+        )
+        self.v1_apps_api.replace_namespaced_deployment_scale(name, namespace, scale)
+
+        # then wait for it to be applied
+        for i in range(0, 3600):
+            deployment_as_dict = json.loads(self.v1_apps_api.read_namespaced_deployment(name=name,
+                                                                                        namespace=namespace,
+                                                                                        _preload_content=False).data)
+
+            current_replicas_num = int(deployment_as_dict['spec']['replicas'])
+
+            if current_replicas_num == int(replicas):
+                self.io().info(f"POD's controller scaled to {replicas}")
+                break
+
+            self.io().debug(f"Waiting for cluster to scale POD's controller to {replicas},"
+                            f" currently: {current_replicas_num}")
+            time.sleep(1)
+
+    def _scale_back(self):
+        """
+        Bring back service PODs at their original scale
+
+        :return:
+        """
+
+        for kind_object in self._replicas_to_scale:
+            self._scale(kind_object.name, kind_object.namespace, kind_object.replicas)
 
     def _create_pod(self, pod_name: str, specification: dict):
         self.io().info(f"Creating temporary POD '{pod_name}'")
@@ -162,7 +263,11 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
         :return:
         """
 
-        self._terminate_pod(self._temporary_pod_name)
+        try:
+            if hasattr(self, '_temporary_pod_name'):
+                self._terminate_pod(self._temporary_pod_name)
+        finally:
+            self._scale_back()
 
     def _create_backup_pod_definition(self, original_pod_name: str, backup_pod_name: str, timeout: int,
                                       volumes: dict, volume_mounts: list) -> dict:
```
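The scale-down path leans on Kubernetes owner references: a Pod created by a Deployment is owned by a ReplicaSet, which is in turn owned by the Deployment, so `_scale_by_owner_references()` recurses one level before scaling. A standalone sketch of that walk, not from the commit; it assumes a reachable cluster running the `app=nginx` fixture added below:

```python
# Standalone sketch: Pod -> ReplicaSet -> Deployment via ownerReferences,
# the same chain _scale_by_owner_references() recurses through.
from kubernetes import client, config

config.load_kube_config()
core, apps = client.CoreV1Api(), client.AppsV1Api()

pod = core.list_namespaced_pod("default", label_selector="app=nginx").items[0]
owner = pod.metadata.owner_references[0]     # typically kind=ReplicaSet
if owner.kind == "ReplicaSet":
    rs = apps.read_namespaced_replica_set(owner.name, "default")
    owner = rs.metadata.owner_references[0]  # typically kind=Deployment

print(f"{pod.metadata.name} is ultimately managed by {owner.kind}/{owner.name}")
```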
22 changes: 22 additions & 0 deletions test/env/kubernetes/nginx/deployment.yaml

```yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:1.19
          ports:
            - containerPort: 80
```
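With the fixture applied (`kubectl apply -f test/env/kubernetes/nginx/deployment.yaml`), the effect of `scaleDown: true` can be reproduced by hand. A hedged sketch of the same `V1Scale` round-trip that `Transport._scale()` performs, scaling nginx to zero and back to its recorded replica count; it assumes a disposable test cluster and is not part of the commit:

```python
# Sketch: the V1Scale round-trip used by Transport._scale(), done manually.
# Assumes the nginx test Deployment above is applied to a reachable cluster.
from kubernetes import client, config
from kubernetes.client import V1ObjectMeta, V1Scale, V1ScaleSpec

config.load_kube_config()
apps = client.AppsV1Api()

# remember the original replicas count, as ReplicaToScale does
original = apps.read_namespaced_deployment_scale("nginx", "default").spec.replicas

for replicas in (0, original):  # down for the backup window, then back up
    body = V1Scale(metadata=V1ObjectMeta(name="nginx", namespace="default"),
                   spec=V1ScaleSpec(replicas=replicas))
    apps.replace_namespaced_deployment_scale("nginx", "default", body)
```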
