From d6af0dcaf81015f30b327dbbfb308bbdec7815ec Mon Sep 17 00:00:00 2001
From: Gautier Dagan
Date: Tue, 26 Mar 2024 12:51:07 +0000
Subject: [PATCH 1/2] export POD_NAME as env variable

---
 kubejobs/jobs.py | 66 +++++++++++++++++++-----------------------------
 1 file changed, 26 insertions(+), 40 deletions(-)

diff --git a/kubejobs/jobs.py b/kubejobs/jobs.py
index 6b3ef6e..750e0ba 100644
--- a/kubejobs/jobs.py
+++ b/kubejobs/jobs.py
@@ -48,9 +48,7 @@ def fetch_user_info():
         group_ids = os.getgrouplist(os.getlogin(), pw_entry.pw_gid)

         # Get group names from group IDs
-        user_info["groups"] = " ".join(
-            [grp.getgrgid(gid).gr_name for gid in group_ids]
-        )
+        user_info["groups"] = " ".join([grp.getgrgid(gid).gr_name for gid in group_ids])

     return user_info

@@ -62,9 +60,10 @@ class GPU_PRODUCT:
     NVIDIA_A100_SXM4_40GB_MIG_1G_5GB = "NVIDIA-A100-SXM4-40GB-MIG-1g.5gb"
     NVIDIA_H100_80GB = "NVIDIA-H100-80GB-HBM3"

+
 class KueueQueue:
     INFORMATICS = "informatics-user-queue"
-    
+

 class KubernetesJob:
     """
@@ -130,16 +129,8 @@ def __init__(
         self.image = image
         self.command = command
         self.args = args
-        self.cpu_request = (
-            cpu_request
-            if cpu_request
-            else 12 * gpu_limit
-        )
-        self.ram_request = (
-            ram_request
-            if ram_request
-            else f"{80 * gpu_limit}G"
-        )
+        self.cpu_request = cpu_request if cpu_request else 12 * gpu_limit
+        self.ram_request = ram_request if ram_request else f"{80 * gpu_limit}G"
         self.storage_request = storage_request
         self.gpu_type = gpu_type
         self.gpu_product = gpu_product
@@ -170,7 +161,10 @@ def __init__(

         self.user_email = user_email  # This is now a required field.
         self.kueue_queue_name = kueue_queue_name
-        self.labels = {"eidf/user": self.user_name, "kueue.x-k8s.io/queue-name": self.kueue_queue_name}
+        self.labels = {
+            "eidf/user": self.user_name,
+            "kueue.x-k8s.io/queue-name": self.kueue_queue_name,
+        }

         if labels is not None:
             self.labels.update(labels)
@@ -192,16 +186,14 @@ def __init__(
     def _add_shm_size(self, container: dict):
         """Adds shared memory volume if shm_size is set."""
         if self.shm_size:
-            container["volumeMounts"].append(
-                {"name": "dshm", "mountPath": "/dev/shm"}
-            )
+            container["volumeMounts"].append({"name": "dshm", "mountPath": "/dev/shm"})
         return container

     def _add_env_vars(self, container: dict):
         """Adds secret and normal environment variables to the container."""
+        container["env"] = []
         if self.secret_env_vars or self.env_vars:
-            container["env"] = []
-
             if self.secret_env_vars:
                 for key, value in self.secret_env_vars.items():
                     container["env"].append(
@@ -220,6 +212,14 @@ def _add_env_vars(self, container: dict):
                 for key, value in self.env_vars.items():
                     container["env"].append({"name": key, "value": value})

+        # Always export the POD_NAME environment variable
+        container["env"].append(
+            {
+                "name": "POD_NAME",
+                "valueFrom": {"fieldRef": {"fieldPath": "metadata.name"}},
+            }
+        )
+
         return container

     def _add_volume_mounts(self, container: dict):
@@ -263,13 +263,9 @@ def generate_yaml(self):
             container["args"] = self.args

         if not (
-            self.gpu_type is None
-            or self.gpu_limit is None
-            or self.gpu_product is None
+            self.gpu_type is None or self.gpu_limit is None or self.gpu_product is None
         ):
-            container["resources"] = {
-                "limits": {f"{self.gpu_type}": self.gpu_limit}
-            }
+            container["resources"] = {"limits": {f"{self.gpu_type}": self.gpu_limit}}

         container = self._add_shm_size(container)
         container = self._add_env_vars(container)
@@ -296,14 +292,10 @@ def generate_yaml(self):
             container["resources"]["limits"]["memory"] = self.ram_request

         if self.storage_request is not None:
-            container["resources"]["requests"][
-                "storage"
-            ] = self.storage_request
+            container["resources"]["requests"]["storage"] = self.storage_request

         if self.gpu_type is not None and self.gpu_limit is not None:
-            container["resources"]["limits"][
-                f"{self.gpu_type}"
-            ] = self.gpu_limit
+            container["resources"]["limits"][f"{self.gpu_type}"] = self.gpu_limit

         job = {
             "apiVersion": "batch/v1",
@@ -339,9 +331,7 @@ def generate_yaml(self):
             job["metadata"]["namespace"] = self.namespace

         if not (
-            self.gpu_type is None
-            or self.gpu_limit is None
-            or self.gpu_product is None
+            self.gpu_type is None or self.gpu_limit is None or self.gpu_product is None
         ):
             job["spec"]["template"]["spec"]["nodeSelector"] = {
                 f"{self.gpu_type}.product": self.gpu_product
@@ -364,9 +354,7 @@ def generate_yaml(self):
                 volume = {"name": mount_name}

                 if "pvc" in mount_data:
-                    volume["persistentVolumeClaim"] = {
-                        "claimName": mount_data["pvc"]
-                    }
+                    volume["persistentVolumeClaim"] = {"claimName": mount_data["pvc"]}
                 elif "emptyDir" in mount_data:
                     volume["emptyDir"] = {}
                     # Add more volume types here if needed
@@ -540,9 +528,7 @@ def create_pv(
     """

     if pv_type not in ["local", "node"]:
-        raise ValueError(
-            "pv_type must be either 'local' or 'gcePersistentDisk'"
-        )
+        raise ValueError("pv_type must be either 'local' or 'gcePersistentDisk'")

     if pv_type == "local" and not local_path:
         raise ValueError("local_path must be provided when pv_type is 'local'")

From 5cef94a483bb62a6dfddb79a4cbdd4ecf948972a Mon Sep 17 00:00:00 2001
From: Gautier Dagan
Date: Tue, 26 Mar 2024 12:51:59 +0000
Subject: [PATCH 2/2] remove unused imports

---
 kubejobs/jobs.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/kubejobs/jobs.py b/kubejobs/jobs.py
index 750e0ba..a28b128 100644
--- a/kubejobs/jobs.py
+++ b/kubejobs/jobs.py
@@ -1,5 +1,3 @@
-import datetime
-import enum
 import grp
 import json
 import logging