diff --git a/README.md b/README.md index ee19074..6ca713c 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,30 @@ pip install kubejobs ## Usage +### KubernetesPod + +The `KubernetesPod` class helps you create a Kubernetes Pod, generate its YAML configuration, and run the pod. Kubernetes Pods are the smallest deployable units in Kubernetes and can contain one or more containers. They have no automatic restart policy, making them suitable for short-lived tasks. + +```python +from kubejobs.pods import KubernetesPod + +# Create a Kubernetes Pod with a name, container image, and command +pod = KubernetesPod( + name="my-pod", + image="ubuntu:20.04", + command=["/bin/bash", "-c", "echo 'Hello, World!'"], +) + +# Generate the YAML configuration for the Pod +print(pod.generate_yaml()) + +# Run the Pod on the Kubernetes cluster +pod.run() +``` + ### KubernetesJob -The `KubernetesJob` class helps you create a Kubernetes Job, generate its YAML configuration, and run the job. Kubernetes Jobs are useful for running short-lived, one-off tasks in your cluster. +The `KubernetesJob` class helps you create a Kubernetes Job, generate its YAML configuration, and run the job. Kubernetes Jobs are useful for running short-lived, one-off tasks in your cluster, under-the-hood, they create one or more Pods to execute the task. ```python from kubejobs.jobs import KubernetesJob diff --git a/kubejobs/examples/example_pod.py b/kubejobs/examples/example_pod.py new file mode 100644 index 0000000..e54363b --- /dev/null +++ b/kubejobs/examples/example_pod.py @@ -0,0 +1,34 @@ +# Example usage: +# `python example_pod.py` +# Expected output: +# should print the pod yaml and run the pod on the cluster + +# running `kubectl logs -f pod/pod-test-info-40gb-full-` +# should show the disk usage information + +# make sure to delete the pod after running the example +# `kubectl delete pod pod-test-info-40gb-full-` + +import time + +from kubejobs.pods import KubernetesPod + +# unique id generated using time +unique_id = time.strftime("%Y%m%d") + +pod = KubernetesPod( + name=f"pod-test-info-40gb-full-{unique_id}", + image="nvcr.io/nvidia/cuda:12.0.0-cudnn8-devel-ubuntu22.04", + command=["/bin/bash"], + args=["-c", "df -h"], + gpu_type="nvidia.com/gpu", + gpu_product="NVIDIA-A100-SXM4-40GB", + gpu_limit=1, + cpu_request=1, + ram_request="1Gi", + volume_mounts={"nfs": {"mountPath": "/nfs", "server": "10.24.1.255", "path": "/"}}, +) + +pod_yaml = pod.generate_yaml() +print(pod_yaml) +pod.run() diff --git a/kubejobs/pods.py b/kubejobs/pods.py new file mode 100644 index 0000000..39a6640 --- /dev/null +++ b/kubejobs/pods.py @@ -0,0 +1,328 @@ +import os +import yaml +import subprocess +from typing import List, Optional +from kubernetes import config +import logging + +from kubejobs.jobs import fetch_user_info + +logger = logging.getLogger(__name__) +MAX_CPU = 192 +MAX_RAM = 890 +MAX_GPU = 8 + + +class KubernetesPod: + """ + A class for generating Kubernetes Pod YAML configurations. + + Attributes: + name (str): Name of the pod and associated resources. + image (str): Container image to use for the pod. + command (List[str], optional): Command to execute in the container. Defaults to None. + args (List[str], optional): Arguments for the command. Defaults to None. + cpu_request (str, optional): Amount of CPU to request. For example, "500m" for half a CPU. Defaults to None. Max is 192 CPUs. + ram_request (str, optional): Amount of RAM to request. For example, "1Gi" for 1 gibibyte. Defaults to None. Max is 890 GB. + storage_request (str, optional): Amount of storage to request. For example, "10Gi" for 10 gibibytes. Defaults to None. + gpu_type (str, optional): Type of GPU resource, e.g. "nvidia.com/gpu". Defaults to None. + gpu_product (str, optional): GPU product, e.g. "NVIDIA-A100-SXM4-80GB". Defaults to None. + gpu_limit (int, optional): Number of GPU resources to allocate. Defaults to None. + restart_policy (str, optional): Restart policy for the pod, default is "Never". + shm_size (str, optional): Size of shared memory, e.g. "2Gi". If not set, defaults to None. + secret_env_vars (dict, optional): Dictionary of secret environment variables. Defaults to None. + env_vars (dict, optional): Dictionary of normal (non-secret) environment variables. Defaults to None. + volume_mounts (dict, optional): Dictionary of volume mounts. Defaults to None. + namespace (str, optional): Namespace of the pod. Defaults to None. + image_pull_secret (str, optional): Name of the image pull secret. Defaults to None. + """ + + def __init__( + self, + name: str, + image: str, + command: List[str] = None, + args: Optional[List[str]] = None, + cpu_request: Optional[str] = None, + ram_request: Optional[str] = None, + storage_request: Optional[str] = None, + gpu_type: Optional[str] = None, + gpu_product: Optional[str] = None, + gpu_limit: Optional[int] = None, + restart_policy: str = "Never", + shm_size: Optional[str] = None, + secret_env_vars: Optional[dict] = None, + env_vars: Optional[dict] = None, + volume_mounts: Optional[dict] = None, + privileged_security_context: bool = False, + user_name: Optional[str] = None, + user_email: Optional[str] = None, + labels: Optional[dict] = None, + annotations: Optional[dict] = None, + namespace: Optional[str] = None, + image_pull_secret: Optional[str] = None, + kueue_queue_name: str = "informatics-user-queue", + ): + self.name = name + self.image = image + self.command = command + self.args = args + self.cpu_request = cpu_request + self.ram_request = ram_request + self.storage_request = storage_request + self.gpu_type = gpu_type + self.gpu_product = gpu_product + self.kueue_queue_name = kueue_queue_name + + self.gpu_limit = gpu_limit + self.restart_policy = restart_policy + self.shm_size = ( + shm_size + if shm_size is not None + else ( + ram_request + if ram_request is not None + else f"{MAX_RAM // (MAX_GPU - gpu_limit + 1)}G" + ) + ) + self.secret_env_vars = secret_env_vars + self.env_vars = env_vars + self.volume_mounts = volume_mounts + self.privileged_security_context = privileged_security_context + + self.user_name = user_name or os.environ.get("USER", "unknown") + self.user_email = user_email # This is now a required field. + + self.labels = { + "eidf/user": self.user_name, + "kueue.x-k8s.io/queue-name": self.kueue_queue_name, + } + + if labels is not None: + self.labels.update(labels) + + self.annotations = {"eidf/user": self.user_name} + if user_email is not None: + self.annotations["eidf/email"] = user_email + + if annotations is not None: + self.annotations.update(annotations) + + self.user_info = fetch_user_info() + self.annotations.update(self.user_info) + logger.info(f"labels {self.labels}") + logger.info(f"annotations {self.annotations}") + + self.namespace = namespace + self.image_pull_secret = image_pull_secret + + def _add_shm_size(self, container: dict): + """Adds shared memory volume if shm_size is set.""" + if self.shm_size: + container["volumeMounts"].append({"name": "dshm", "mountPath": "/dev/shm"}) + return container + + def _add_env_vars(self, container: dict): + """Adds secret and normal environment variables to the container.""" + container["env"] = [] + if self.secret_env_vars or self.env_vars: + if self.secret_env_vars: + for key, value in self.secret_env_vars.items(): + container["env"].append( + { + "name": key, + "valueFrom": { + "secretKeyRef": { + "name": value["secret_name"], + "key": value["key"], + } + }, + } + ) + + if self.env_vars: + for key, value in self.env_vars.items(): + container["env"].append({"name": key, "value": value}) + + # Always export the POD_NAME environment variable + container["env"].append( + { + "name": "POD_NAME", + "valueFrom": {"fieldRef": {"fieldPath": "metadata.name"}}, + } + ) + + return container + + def _add_volume_mounts(self, container: dict): + """Adds volume mounts to the container.""" + if self.volume_mounts: + for mount_name, mount_data in self.volume_mounts.items(): + container["volumeMounts"].append( + { + "name": mount_name, + "mountPath": mount_data["mountPath"], + } + ) + + return container + + def _add_privileged_security_context(self, container: dict): + """Adds privileged security context to the container.""" + if self.privileged_security_context: + container["securityContext"] = { + "privileged": True, + } + + return container + + def generate_yaml(self): + container = { + "name": self.name, + "image": self.image, + "imagePullPolicy": "Always", + "volumeMounts": [], + "resources": { + "requests": {}, + "limits": {}, + }, + } + + if self.command is not None: + container["command"] = self.command + + if self.args is not None: + container["args"] = self.args + + if not ( + self.gpu_type is None or self.gpu_limit is None or self.gpu_product is None + ): + container["resources"] = {"limits": {f"{self.gpu_type}": self.gpu_limit}} + + container = self._add_shm_size(container) + container = self._add_env_vars(container) + container = self._add_volume_mounts(container) + container = self._add_privileged_security_context(container) + + if ( + self.cpu_request is not None + or self.ram_request is not None + or self.storage_request is not None + ): + if "resources" not in container: + container["resources"] = {"requests": {}} + + if "requests" not in container["resources"]: + container["resources"]["requests"] = {} + + if self.cpu_request is not None: + container["resources"]["requests"]["cpu"] = self.cpu_request + container["resources"]["limits"]["cpu"] = self.cpu_request + + if self.ram_request is not None: + container["resources"]["requests"]["memory"] = self.ram_request + container["resources"]["limits"]["memory"] = self.ram_request + + if self.storage_request is not None: + container["resources"]["requests"]["storage"] = self.storage_request + + if self.gpu_type is not None and self.gpu_limit is not None: + container["resources"]["limits"][f"{self.gpu_type}"] = self.gpu_limit + + pod = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": self.name, + "labels": self.labels, # Add labels here + "annotations": self.annotations, # Add metadata here + }, + "spec": { + "containers": [container], + "restartPolicy": self.restart_policy, + "volumes": [], + }, + } + + if self.namespace: + pod["metadata"]["namespace"] = self.namespace + + if not ( + self.gpu_type is None or self.gpu_limit is None or self.gpu_product is None + ): + pod["spec"]["nodeSelector"] = {f"{self.gpu_type}.product": self.gpu_product} + + # Add shared memory volume if shm_size is set + if self.shm_size: + pod["spec"]["volumes"].append( + { + "name": "dshm", + "emptyDir": { + "medium": "Memory", + "sizeLimit": self.shm_size, + }, + } + ) + + # Add volumes for the volume mounts + if self.volume_mounts: + for mount_name, mount_data in self.volume_mounts.items(): + volume = {"name": mount_name} + + if "pvc" in mount_data: + volume["persistentVolumeClaim"] = {"claimName": mount_data["pvc"]} + elif "emptyDir" in mount_data: + volume["emptyDir"] = {} + # Add more volume types here if needed + if "server" in mount_data: + volume["nfs"] = { + "server": mount_data["server"], + "path": mount_data["path"], + } + + pod["spec"]["volumes"].append(volume) + + if self.image_pull_secret: + pod["spec"]["imagePullSecrets"] = [{"name": self.image_pull_secret}] + + return yaml.dump(pod) + + def run(self): + config.load_kube_config() + + pod_yaml = self.generate_yaml() + + # Save the generated YAML to a temporary file + with open("temp_pod.yaml", "w") as temp_file: + temp_file.write(pod_yaml) + + # Run the kubectl command with --validate=False + cmd = ["kubectl", "apply", "-f", "temp_pod.yaml"] + + try: + result = subprocess.run( + cmd, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + # Remove the temporary file + os.remove("temp_pod.yaml") + return result.returncode + except subprocess.CalledProcessError as e: + logger.info( + f"Command '{' '.join(cmd)}' failed with return code {e.returncode}." + ) + logger.info(f"Stdout:\n{e.stdout}") + logger.info(f"Stderr:\n{e.stderr}") + # Remove the temporary file + os.remove("temp_pod.yaml") + return e.returncode # return the exit code + except Exception as e: + logger.exception( + f"An unexpected error occurred while running '{' '.join(cmd)}'." + ) # This logs the traceback too + # Remove the temporary file + os.remove("temp_pod.yaml") + return 1 # return the exit code