Commit 7bdc713
feat(kubejobs): add GPU count actual to pod info and modify user job management

- refactor(manage_user_jobs.py): change default value of show_job_status to False
- refactor(manage_user_jobs.py): change user annotation key from 'user' to 'username'
- feat(wandb_monitor.py): add new file for Weights & Biases monitoring
- feat(wandb_pod_info.py): add new file for Weights & Biases pod information
- feat(wandb_pod_injection.py): add new file for Weights & Biases pod injection
- feat(web_pod_info.py): add GPU count actual to pod information display
AntreasAntoniou committed Oct 30, 2023
1 parent 022bee1 commit 7bdc713
Showing 5 changed files with 324 additions and 5 deletions.
4 changes: 2 additions & 2 deletions kubejobs/manage_user_jobs.py
@@ -111,7 +111,7 @@ def list_or_delete_jobs_by_user(
     username: str,
     term: str,
     delete: bool = False,
-    show_job_status: bool = True,
+    show_job_status: bool = False,
 ) -> None:
     # Fetch all jobs from a specific Kubernetes namespace
     get_jobs_cmd = f"kubectl get jobs -n {namespace} -o json"
@@ -164,7 +164,7 @@ def list_or_delete_jobs_by_user(
     for item in tqdm(jobs_json["items"]):
         annotations = item["metadata"].get("annotations", {})
         if (
-            annotations.get("user", "") == username
+            annotations.get("username", "") == username
             and term in item["metadata"]["name"]
         ):
             filtered_jobs.append(item)
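Note that jobs annotated under the old 'user' key will no longer match the filter. A minimal sketch of re-tagging an existing job under the new key — the 'username' annotation key comes from the diff above, while the helper, job name, and username value are illustrative, not part of this commit:

# Hypothetical helper: re-annotate a job under the new 'username' key
# so list_or_delete_jobs_by_user can match it again.
import subprocess

def retag_job(job_name: str, namespace: str, username: str) -> None:
    # --overwrite lets the command succeed even if the annotation already exists
    subprocess.run(
        ["kubectl", "annotate", "job", "-n", namespace, job_name,
         f"username={username}", "--overwrite"],
        check=True,
    )

retag_job("my-training-job", "informatics", "antreas")  # placeholder values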
31 changes: 31 additions & 0 deletions kubejobs/wandb_monitor.py
@@ -0,0 +1,31 @@
import json
import os
import time

import wandb

# Path to the metadata JSON file
metadata_path = os.getenv("WANDB_METADATA_PATH")

# Load metadata from the specified JSON file
metadata = {}
if metadata_path:
    try:
        with open(metadata_path, "r") as f:
            metadata = json.load(f)
    except Exception as e:
        print(f"Error loading JSON from {metadata_path}: {e}")
        metadata = {}

# Initialize the W&B run with metadata
wandb.init(
    project=os.getenv("WANDB_PROJECT"),
    entity=os.getenv("WANDB_ENTITY"),
    name=os.getenv("POD_NAME"),
    config=metadata,
)

# Run an infinite loop that logs a heartbeat every 10 seconds
while True:
    wandb.log({"heartbeat": 1})
    time.sleep(10)
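The script is driven entirely by environment variables, which the injection script below sets inside each pod. A hedged local smoke test — all values are placeholders, and WANDB_API_KEY must already be set for wandb.init to authenticate:

# Hypothetical local run of the monitor, outside any pod; values are placeholders.
import os
import subprocess

env = dict(
    os.environ,
    WANDB_PROJECT="gpu_monitoring",
    WANDB_ENTITY="my-entity",              # placeholder
    POD_NAME="local-smoke-test",           # placeholder
    WANDB_METADATA_PATH="/tmp/meta.json",  # optional; metadata stays {} if missing
)
subprocess.run(["python", "kubejobs/wandb_monitor.py"], env=env, check=True)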
163 changes: 163 additions & 0 deletions kubejobs/wandb_pod_info.py
@@ -0,0 +1,163 @@
import json
import subprocess
import time
from collections import defaultdict
from datetime import datetime, timezone

import fire
import numpy as np
import pandas as pd
import rich
import wandb
from tqdm.auto import tqdm


def convert_to_gigabytes(value: str) -> float:
    """
    Convert the given storage/memory value to decimal Gigabytes (GB).

    Args:
        value (str): Input storage/memory value with units. E.g., '20G', '20Gi', '2000M', '2000Mi'

    Returns:
        float: The value converted to Gigabytes (GB).
    """
    # Conversion factors to decimal GB:
    # 1 GiB = 1.073741824 GB, 1 MB = 0.001 GB, 1 MiB = 0.001048576 GB
    factor_gb = {
        "G": 1,
        "Gi": 1.073741824,
        "M": 1 / 1000,
        "Mi": 1.048576 / 1000,
    }

    # Find the numeric and unit parts of the input (allow decimals like '1.5G')
    numeric_part = "".join(c for c in value if c.isdigit() or c == ".")
    unit_part = "".join(filter(str.isalpha, value))

    # Convert to Gigabytes (GB)
    if unit_part in factor_gb:
        return float(numeric_part) * factor_gb[unit_part]
    elif value == "N/A":
        return -1
    else:
        raise ValueError(
            f"Unknown unit {unit_part}. Supported units are {list(factor_gb.keys())}."
        )


def parse_iso_time(time_str: str) -> datetime:
    return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=timezone.utc
    )


def time_diff_to_human_readable(start: datetime, end: datetime) -> str:
    diff = end - start
    minutes, seconds = divmod(diff.seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{diff.days}d {hours}h {minutes}m {seconds}s"


def run_command(command: str) -> tuple[str, str]:
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
    )
    return result.stdout, result.stderr


def ssh_into_pod_and_run_command(
    pod_name: str, namespace: str, command: str
) -> str:
    # "SSH" is shorthand here: the command actually runs via `kubectl exec`
    ssh_command = f"kubectl exec -n {namespace} {pod_name} -- {command}"
    stdout, stderr = run_command(ssh_command)
    if stderr:
        print(f"Error executing command in pod {pod_name}: {stderr}")
    return stdout


def fetch_and_render_pod_info(
    namespace="informatics",
    refresh_interval=10,
    wandb_project="gpu_monitoring",
):
    wandb.login()

    runs = {}  # Store the wandb runs

    try:
        while True:
            get_pods_cmd = f"kubectl get pods -n {namespace} -o json"
            pods_output, pods_error = run_command(get_pods_cmd)
            pod_data = json.loads(pods_output)

            current_time = datetime.now(timezone.utc)

            for pod in tqdm(pod_data["items"]):
                metadata = pod["metadata"]
                spec = pod.get("spec", {})
                status = pod["status"]

                name = metadata["name"]
                # Use a separate variable so the `namespace` argument used to
                # list pods is not overwritten between iterations.
                pod_namespace = metadata["namespace"]
                uid = metadata["uid"]

                username = metadata.get("annotations", {}).get(
                    "username", "N/A"
                )

                pod_status = status["phase"]
                node = spec.get("nodeName", "N/A")

                container = spec.get("containers", [{}])[0]
                image = container.get("image", "N/A")

                resources = container.get("resources", {})
                cpu_request = resources.get("requests", {}).get("cpu", "0")
                memory_request = resources.get("requests", {}).get(
                    "memory", "N/A"
                )
                gpu_type = spec.get("nodeSelector", {}).get(
                    "nvidia.com/gpu.product", "N/A"
                )
                gpu_limit = resources.get("limits", {}).get(
                    "nvidia.com/gpu", "0"
                )

                creation_time = parse_iso_time(metadata["creationTimestamp"])
                age = time_diff_to_human_readable(creation_time, current_time)

                # Exec into the pod and get GPU utilization details
                gpu_usage_output = ssh_into_pod_and_run_command(
                    name,
                    pod_namespace,
                    "nvidia-smi --query-gpu=memory.total,memory.used,utilization.gpu --format=csv,noheader,nounits",
                )

                # Process the GPU data and log it to wandb. Note that with
                # --format=nounits, nvidia-smi reports memory in MiB.
                lines = gpu_usage_output.splitlines()
                for line in lines:
                    gpu_memory_total, gpu_memory_used, gpu_utilization = map(
                        float, line.split(", ")
                    )

                    # Initialize a new wandb run if we haven't seen this pod before
                    if name not in runs:
                        runs[name] = wandb.init(
                            project=wandb_project, name=name, reinit=True
                        )

                    # Log the GPU data to wandb
                    runs[name].log(
                        {
                            "GPU Memory Total (GB)": gpu_memory_total,
                            "GPU Memory Used (GB)": gpu_memory_used,
                            "GPU Utilization (%)": gpu_utilization,
                        }
                    )

            time.sleep(refresh_interval)
    finally:
        # Finish all wandb runs when the loop exits; without the try/finally,
        # code placed after `while True` would be unreachable.
        for run in runs.values():
            run.finish()


if __name__ == "__main__":
    fire.Fire(fetch_and_render_pod_info)
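The parsing above assumes one comma-separated line per GPU. A self-contained sketch with a made-up sample line (with `nounits`, nvidia-smi reports memory values in MiB):

# Made-up sample of one nvidia-smi CSV line: memory.total, memory.used, utilization.gpu
sample = "81920, 10240, 37"
gpu_memory_total, gpu_memory_used, gpu_utilization = map(float, sample.split(", "))
assert (gpu_memory_total, gpu_memory_used, gpu_utilization) == (81920.0, 10240.0, 37.0)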
122 changes: 122 additions & 0 deletions kubejobs/wandb_pod_injection.py
@@ -0,0 +1,122 @@
import json
import os
import subprocess
import tempfile
import time
from collections import defaultdict
from datetime import datetime, timezone

import fire
import numpy as np
import pandas as pd
import rich
from tqdm.auto import tqdm


def run_command(command: str) -> tuple[str, str]:
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
    )
    return result.stdout, result.stderr


def ssh_into_pod_and_run_command(
    pod_name: str, namespace: str, command: str
) -> str:
    ssh_command = f"kubectl exec -n {namespace} {pod_name} -- {command}"
    stdout, stderr = run_command(ssh_command)
    if stderr:
        print(f"Error executing command in pod {pod_name}: {stderr}")
    return stdout


def create_and_copy_wandb_script(pod_name, namespace, metadata):
    # Write the pod metadata to a local JSON file
    metadata_filename = f"/tmp/wandb_metadata_{pod_name}.json"
    with open(metadata_filename, "w") as f:
        json.dump(metadata, f)

    # Copy the metadata JSON file to the pod
    copy_metadata_command = [
        "kubectl",
        "cp",
        metadata_filename,
        f"{namespace}/{pod_name}:{metadata_filename}",
    ]
    subprocess.run(copy_metadata_command, check=True)

    # Copy the monitoring script to the pod
    copy_script_command = [
        "kubectl",
        "cp",
        "kubejobs/wandb_monitor.py",
        f"{namespace}/{pod_name}:/tmp/wandb_monitor.py",
    ]
    subprocess.run(copy_script_command, check=True)

    # Define the command to install wandb and start the monitoring script.
    # Note: this embeds WANDB_API_KEY in the command string sent to the pod.
    exec_command = (
        f"pip install wandb && "
        f"export WANDB_API_KEY='{os.getenv('WANDB_API_KEY')}' && "
        f"export WANDB_ENTITY='{os.getenv('WANDB_ENTITY')}' && "
        f"export WANDB_PROJECT='{os.getenv('WANDB_PROJECT')}' && "
        f"export POD_NAME='{pod_name}' && "
        f"export WANDB_METADATA_PATH='{metadata_filename}' && "
        f"python /tmp/wandb_monitor.py"
    )

    # Execute the command inside the pod without waiting for it to finish
    exec_command_full = [
        "kubectl",
        "exec",
        "-n",
        namespace,
        pod_name,
        "--",
        "/bin/bash",
        "-c",
        exec_command,
    ]
    subprocess.Popen(
        exec_command_full,
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )


def fetch_and_render_pod_info(
    namespace="informatics",
    refresh_interval=10,
    wandb_project="gpu_monitoring",
):
    name_set = set()

    get_pods_cmd = f"kubectl get pods -n {namespace} -o json"
    pods_output, pods_error = run_command(get_pods_cmd)
    pod_data = json.loads(pods_output)

    current_time = datetime.now(timezone.utc)

    for pod in tqdm(pod_data["items"]):
        metadata = pod["metadata"]
        spec = pod.get("spec", {})
        status = pod["status"]

        name = metadata["name"]
        # Keep the pod's own namespace distinct from the `namespace` argument
        pod_namespace = metadata["namespace"]
        if name in name_set:
            continue
        name_set.add(name)
        create_and_copy_wandb_script(name, pod_namespace, pod)


if __name__ == "__main__":
    fire.Fire(fetch_and_render_pod_info)
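A minimal sketch of running one injection pass programmatically, equivalent to the Fire CLI entry point above. The WANDB_* variables must already be exported locally, since create_and_copy_wandb_script forwards them into each pod; the import path assumes kubejobs is importable as a package:

# Hypothetical one-shot driver for the injector
from kubejobs.wandb_pod_injection import fetch_and_render_pod_info

fetch_and_render_pod_info(namespace="informatics")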
9 changes: 6 additions & 3 deletions kubejobs/web_pod_info.py
@@ -140,6 +140,7 @@ def fetch_and_render_pod_info(
         "GPU Memory Used",
         "GPU Memory Total",
         "GPU Utilization",
+        "GPU Count Actual",
         "Creation Time",
         "Age",
     ]
@@ -185,15 +186,16 @@ def fetch_and_render_pod_info(
         age = time_diff_to_human_readable(creation_time, current_time)

         # SSH into the pod and get GPU utilization details
-
+        gpu_count_actual = 0
         for _ in range(samples_per_gpu):
             gpu_usage_output = ssh_into_pod_and_run_command(
                 name,
                 namespace,
                 "nvidia-smi --query-gpu=memory.total,memory.used,utilization.gpu --format=csv,noheader,nounits",
             )

-            for line in gpu_usage_output.splitlines():
+            lines = gpu_usage_output.splitlines()
+            gpu_count_actual = len(lines)
+            for line in lines:
                 (
                     gpu_memory_total,
                     gpu_memory_used,
@@ -251,6 +253,7 @@ def fetch_and_render_pod_info(
             gpu_memory_used,
             gpu_memory_total,
             gpu_utilization,
+            gpu_count_actual,
             str(creation_time),
             age,
         ]
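`gpu_count_actual` is simply the number of CSV lines nvidia-smi prints, one per GPU visible inside the pod, so it reflects what the container can actually see rather than the requested `nvidia.com/gpu` limit. A tiny sketch with made-up two-GPU output:

# Made-up output from a pod with two visible GPUs
gpu_usage_output = "40960, 1024, 5\n40960, 20480, 88"
lines = gpu_usage_output.splitlines()
gpu_count_actual = len(lines)
assert gpu_count_actual == 2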
