Skip to content

Commit

Permalink
Merge pull request #12 from gautierdag/pod-name
Browse files Browse the repository at this point in the history
Always export POD_NAME as an environment variable
  • Loading branch information
AntreasAntoniou authored Mar 26, 2024
2 parents a86a248 + 5cef94a commit 55610b9
Showing 1 changed file with 26 additions and 42 deletions.
68 changes: 26 additions & 42 deletions kubejobs/jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import datetime
import enum
import grp
import json
import logging
Expand Down Expand Up @@ -48,9 +46,7 @@ def fetch_user_info():
group_ids = os.getgrouplist(os.getlogin(), pw_entry.pw_gid)

# Get group names from group IDs
user_info["groups"] = " ".join(
[grp.getgrgid(gid).gr_name for gid in group_ids]
)
user_info["groups"] = " ".join([grp.getgrgid(gid).gr_name for gid in group_ids])

return user_info

Expand All @@ -62,9 +58,10 @@ class GPU_PRODUCT:
NVIDIA_A100_SXM4_40GB_MIG_1G_5GB = "NVIDIA-A100-SXM4-40GB-MIG-1g.5gb"
NVIDIA_H100_80GB = "NVIDIA-H100-80GB-HBM3"


class KueueQueue:
    """Namespace of known Kueue queue names.

    Values here are passed as a job's ``kueue_queue_name`` and end up in the
    ``kueue.x-k8s.io/queue-name`` label on the generated Kubernetes Job.
    """

    # Default queue for informatics users.
    INFORMATICS = "informatics-user-queue"


class KubernetesJob:
"""
Expand Down Expand Up @@ -130,16 +127,8 @@ def __init__(
self.image = image
self.command = command
self.args = args
self.cpu_request = (
cpu_request
if cpu_request
else 12 * gpu_limit
)
self.ram_request = (
ram_request
if ram_request
else f"{80 * gpu_limit}G"
)
self.cpu_request = cpu_request if cpu_request else 12 * gpu_limit
self.ram_request = ram_request if ram_request else f"{80 * gpu_limit}G"
self.storage_request = storage_request
self.gpu_type = gpu_type
self.gpu_product = gpu_product
Expand Down Expand Up @@ -170,7 +159,10 @@ def __init__(
self.user_email = user_email # This is now a required field.
self.kueue_queue_name = kueue_queue_name

self.labels = {"eidf/user": self.user_name, "kueue.x-k8s.io/queue-name": self.kueue_queue_name}
self.labels = {
"eidf/user": self.user_name,
"kueue.x-k8s.io/queue-name": self.kueue_queue_name,
}

if labels is not None:
self.labels.update(labels)
Expand All @@ -192,16 +184,14 @@ def __init__(
def _add_shm_size(self, container: dict):
"""Adds shared memory volume if shm_size is set."""
if self.shm_size:
container["volumeMounts"].append(
{"name": "dshm", "mountPath": "/dev/shm"}
)
container["volumeMounts"].append({"name": "dshm", "mountPath": "/dev/shm"})
return container

def _add_env_vars(self, container: dict):
"""Adds secret and normal environment variables to the
container."""
container["env"] = []
if self.secret_env_vars or self.env_vars:
container["env"] = []
if self.secret_env_vars:
for key, value in self.secret_env_vars.items():
container["env"].append(
Expand All @@ -220,6 +210,14 @@ def _add_env_vars(self, container: dict):
for key, value in self.env_vars.items():
container["env"].append({"name": key, "value": value})

# Always export the POD_NAME environment variable
container["env"].append(
{
"name": "POD_NAME",
"valueFrom": {"fieldRef": {"fieldPath": "metadata.name"}},
}
)

return container

def _add_volume_mounts(self, container: dict):
Expand Down Expand Up @@ -263,13 +261,9 @@ def generate_yaml(self):
container["args"] = self.args

if not (
self.gpu_type is None
or self.gpu_limit is None
or self.gpu_product is None
self.gpu_type is None or self.gpu_limit is None or self.gpu_product is None
):
container["resources"] = {
"limits": {f"{self.gpu_type}": self.gpu_limit}
}
container["resources"] = {"limits": {f"{self.gpu_type}": self.gpu_limit}}

container = self._add_shm_size(container)
container = self._add_env_vars(container)
Expand All @@ -296,14 +290,10 @@ def generate_yaml(self):
container["resources"]["limits"]["memory"] = self.ram_request

if self.storage_request is not None:
container["resources"]["requests"][
"storage"
] = self.storage_request
container["resources"]["requests"]["storage"] = self.storage_request

if self.gpu_type is not None and self.gpu_limit is not None:
container["resources"]["limits"][
f"{self.gpu_type}"
] = self.gpu_limit
container["resources"]["limits"][f"{self.gpu_type}"] = self.gpu_limit

job = {
"apiVersion": "batch/v1",
Expand Down Expand Up @@ -337,9 +327,7 @@ def generate_yaml(self):
job["metadata"]["namespace"] = self.namespace

if not (
self.gpu_type is None
or self.gpu_limit is None
or self.gpu_product is None
self.gpu_type is None or self.gpu_limit is None or self.gpu_product is None
):
job["spec"]["template"]["spec"]["nodeSelector"] = {
f"{self.gpu_type}.product": self.gpu_product
Expand All @@ -362,9 +350,7 @@ def generate_yaml(self):
volume = {"name": mount_name}

if "pvc" in mount_data:
volume["persistentVolumeClaim"] = {
"claimName": mount_data["pvc"]
}
volume["persistentVolumeClaim"] = {"claimName": mount_data["pvc"]}
elif "emptyDir" in mount_data:
volume["emptyDir"] = {}
# Add more volume types here if needed
Expand Down Expand Up @@ -542,9 +528,7 @@ def create_pv(
"""

if pv_type not in ["local", "node"]:
raise ValueError(
"pv_type must be either 'local' or 'gcePersistentDisk'"
)
raise ValueError("pv_type must be either 'local' or 'gcePersistentDisk'")

if pv_type == "local" and not local_path:
raise ValueError("local_path must be provided when pv_type is 'local'")
Expand Down

0 comments on commit 55610b9

Please sign in to comment.