From 7bdc713fb30cbe4712c3e1b5ae30f982aa1c822d Mon Sep 17 00:00:00 2001 From: AntreasAntoniou Date: Mon, 30 Oct 2023 22:49:00 +0000 Subject: [PATCH] * feat(kubejobs): add GPU count actual to pod info and modify user job management * * - refactor(manage_user_jobs.py): change default value of show_job_status to False * - refactor(manage_user_jobs.py): change user annotation key from 'user' to 'username' * - feat(wandb_monitor.py): add new file for Weights & Biases monitoring * - feat(wandb_pod_info.py): add new file for Weights & Biases pod information * - feat(wandb_pod_injection.py): add new file for Weights & Biases pod injection * - feat(web_pod_info.py): add GPU count actual to pod information display --- kubejobs/manage_user_jobs.py | 4 +- kubejobs/wandb_monitor.py | 31 ++++++ kubejobs/wandb_pod_info.py | 163 ++++++++++++++++++++++++++++++++ kubejobs/wandb_pod_injection.py | 122 ++++++++++++++++++++++++ kubejobs/web_pod_info.py | 9 +- 5 files changed, 324 insertions(+), 5 deletions(-) create mode 100644 kubejobs/wandb_monitor.py create mode 100644 kubejobs/wandb_pod_info.py create mode 100644 kubejobs/wandb_pod_injection.py diff --git a/kubejobs/manage_user_jobs.py b/kubejobs/manage_user_jobs.py index ce59482..db337d1 100644 --- a/kubejobs/manage_user_jobs.py +++ b/kubejobs/manage_user_jobs.py @@ -111,7 +111,7 @@ def list_or_delete_jobs_by_user( username: str, term: str, delete: bool = False, - show_job_status: bool = True, + show_job_status: bool = False, ) -> None: # Fetch all jobs from a specific Kubernetes namespace get_jobs_cmd = f"kubectl get jobs -n {namespace} -o json" @@ -164,7 +164,7 @@ def list_or_delete_jobs_by_user( for item in tqdm(jobs_json["items"]): annotations = item["metadata"].get("annotations", {}) if ( - annotations.get("user", "") == username + annotations.get("username", "") == username and term in item["metadata"]["name"] ): filtered_jobs.append(item) diff --git a/kubejobs/wandb_monitor.py b/kubejobs/wandb_monitor.py new file mode 100644 index 0000000..af40814 --- /dev/null +++ b/kubejobs/wandb_monitor.py @@ -0,0 +1,31 @@ +import json +import os +import time + +import wandb + +# Path to the metadata JSON file +metadata_path = os.getenv("WANDB_METADATA_PATH") + +# Load metadata from the specified JSON file +metadata = {} +if metadata_path: + try: + with open(metadata_path, "r") as f: + metadata = json.load(f) + except Exception as e: + print(f"Error loading JSON from {metadata_path}: {e}") + metadata = {} + +# Initialize the W&B run with metadata +wandb.init( + project=os.getenv("WANDB_PROJECT"), + entity=os.getenv("WANDB_ENTITY"), + name=os.getenv("POD_NAME"), + config=metadata, +) + +# Run an infinite loop that logs a heartbeat every 10 seconds +while True: + wandb.log({"heartbeat": 1}) + time.sleep(10) diff --git a/kubejobs/wandb_pod_info.py b/kubejobs/wandb_pod_info.py new file mode 100644 index 0000000..5da726e --- /dev/null +++ b/kubejobs/wandb_pod_info.py @@ -0,0 +1,163 @@ +import json +import subprocess +import time +from collections import defaultdict +from datetime import datetime, timezone + +import fire +import numpy as np +import pandas as pd +import rich +import wandb +from tqdm.auto import tqdm + + +def convert_to_gigabytes(value: str) -> float: + """ + Convert the given storage/memory value to base Gigabytes (GB). + + Args: + value (str): Input storage/memory value with units. E.g., '20G', '20Gi', '2000M', '2000Mi' + + Returns: + float: The value converted to Gigabytes (GB). 
+ """ + # Define conversion factors + factor_gb = { + "G": 1, + "Gi": 1 / 1.073741824, + "M": 1 / 1024, + "Mi": 1 / (1024 * 1.073741824), + } + + # Find the numeric and unit parts of the input + numeric_part = "".join(filter(str.isdigit, value)) + unit_part = "".join(filter(str.isalpha, value)) + + # Convert to Gigabytes (GB) + if unit_part in factor_gb.keys(): + return float(numeric_part) * factor_gb[unit_part] + elif value == "N/A": + return -1 + else: + raise ValueError( + f"Unknown unit {unit_part}. Supported units are {list(factor_gb.keys())}." + ) + + +def parse_iso_time(time_str: str) -> datetime: + return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=timezone.utc + ) + + +def time_diff_to_human_readable(start: datetime, end: datetime) -> str: + diff = end - start + minutes, seconds = divmod(diff.seconds, 60) + hours, minutes = divmod(minutes, 60) + return f"{diff.days}d {hours}h {minutes}m {seconds}s" + + +def run_command(command: str) -> (str, str): + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + shell=True, + ) + return result.stdout, result.stderr + + +def ssh_into_pod_and_run_command( + pod_name: str, namespace: str, command: str +) -> str: + ssh_command = f"kubectl exec -n {namespace} {pod_name} -- {command}" + stdout, stderr = run_command(ssh_command) + if stderr: + print(f"Error executing command in pod {pod_name}: {stderr}") + return stdout + + +def fetch_and_render_pod_info( + namespace="informatics", + refresh_interval=10, + wandb_project="gpu_monitoring", +): + wandb.login() + + runs = {} # Store the wandb runs + + while True: + get_pods_cmd = f"kubectl get pods -n {namespace} -o json" + pods_output, pods_error = run_command(get_pods_cmd) + pod_data = json.loads(pods_output) + + current_time = datetime.now(timezone.utc) + + for pod in tqdm(pod_data["items"]): + metadata = pod["metadata"] + spec = pod.get("spec", {}) + status = pod["status"] + + name = metadata["name"] + namespace = metadata["namespace"] + uid = metadata["uid"] + + username = metadata.get("annotations", {}).get("username", "N/A") + + pod_status = status["phase"] + node = spec.get("nodeName", "N/A") + + container = spec.get("containers", [{}])[0] + image = container.get("image", "N/A") + + resources = container.get("resources", {}) + cpu_request = resources.get("requests", {}).get("cpu", "0") + memory_request = resources.get("requests", {}).get("memory", "N/A") + gpu_type = spec.get("nodeSelector", {}).get( + "nvidia.com/gpu.product", "N/A" + ) + gpu_limit = resources.get("limits", {}).get("nvidia.com/gpu", "0") + + creation_time = parse_iso_time(metadata["creationTimestamp"]) + age = time_diff_to_human_readable(creation_time, current_time) + + # SSH into the pod and get GPU utilization details + gpu_usage_output = ssh_into_pod_and_run_command( + name, + namespace, + "nvidia-smi --query-gpu=memory.total,memory.used,utilization.gpu --format=csv,noheader,nounits", + ) + + # Process the GPU data and log it to wandb + lines = gpu_usage_output.splitlines() + for line in lines: + gpu_memory_total, gpu_memory_used, gpu_utilization = map( + float, line.split(", ") + ) + + # Initialize a new wandb run if we haven't seen this pod before + if name not in runs: + runs[name] = wandb.init( + project=wandb_project, name=name, reinit=True + ) + + # Log the GPU data to wandb + runs[name].log( + { + "GPU Memory Total (GB)": gpu_memory_total, + "GPU Memory Used (GB)": gpu_memory_used, + "GPU Utilization (%)": gpu_utilization, + } + ) + + 
time.sleep(refresh_interval) + + # Finish all wandb runs when exiting the loop + for run in runs.values(): + run.finish() + + +if __name__ == "__main__": + fire.Fire(fetch_and_render_pod_info) diff --git a/kubejobs/wandb_pod_injection.py b/kubejobs/wandb_pod_injection.py new file mode 100644 index 0000000..add6fda --- /dev/null +++ b/kubejobs/wandb_pod_injection.py @@ -0,0 +1,122 @@ +import json +import os +import subprocess +import tempfile +import time +from collections import defaultdict +from datetime import datetime, timezone + +import fire +import numpy as np +import pandas as pd +import rich +from tqdm.auto import tqdm + + +def run_command(command: str) -> (str, str): + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + shell=True, + ) + return result.stdout, result.stderr + + +def ssh_into_pod_and_run_command( + pod_name: str, namespace: str, command: str +) -> str: + ssh_command = f"kubectl exec -n {namespace} {pod_name} -- {command}" + stdout, stderr = run_command(ssh_command) + if stderr: + print(f"Error executing command in pod {pod_name}: {stderr}") + return stdout + + +def create_and_copy_wandb_script(pod_name, namespace, metadata): + # Convert the metadata dictionary to a JSON string + metadata_json = json.dumps(metadata) + metadata_filename = f"/tmp/wandb_metadata_{pod_name}.json" + + # Create the metadata JSON file locally + with open(metadata_filename, "w") as f: + json.dump(metadata, f) + + # Copy the metadata JSON file to the pod + copy_metadata_command = [ + "kubectl", + "cp", + metadata_filename, + f"{namespace}/{pod_name}:{metadata_filename}", + ] + subprocess.run(copy_metadata_command, check=True) + + # Copy the script to the pod + copy_script_command = [ + "kubectl", + "cp", + "kubejobs/wandb_monitor.py", + f"{namespace}/{pod_name}:/tmp/wandb_monitor.py", + ] + subprocess.run(copy_script_command, check=True) + + # Define the command to install wandb and start the monitoring script + exec_command = ( + f"pip install wandb && " + f"export WANDB_API_KEY='{os.getenv('WANDB_API_KEY')}' && " + f"export WANDB_ENTITY='{os.getenv('WANDB_ENTITY')}' && " + f"export WANDB_PROJECT='{os.getenv('WANDB_PROJECT')}' && " + f"export POD_NAME='{pod_name}' && " + f"export WANDB_METADATA_PATH='{metadata_filename}' && " + f"python /tmp/wandb_monitor.py" + ) + + # Execute the command inside the pod + exec_command_full = [ + "kubectl", + "exec", + "-n", + namespace, + pod_name, + "--", + "/bin/bash", + "-c", + exec_command, + ] + subprocess.Popen( + exec_command_full, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + +def fetch_and_render_pod_info( + namespace="informatics", + refresh_interval=10, + wandb_project="gpu_monitoring", +): + name_set = set() + + get_pods_cmd = f"kubectl get pods -n {namespace} -o json" + pods_output, pods_error = run_command(get_pods_cmd) + pod_data = json.loads(pods_output) + + current_time = datetime.now(timezone.utc) + + for pod in tqdm(pod_data["items"]): + metadata = pod["metadata"] + spec = pod.get("spec", {}) + status = pod["status"] + + name = metadata["name"] + namespace = metadata["namespace"] + if name in name_set: + continue + name_set.add(name) + create_and_copy_wandb_script(name, namespace, pod) + + +if __name__ == "__main__": + fire.Fire(fetch_and_render_pod_info) diff --git a/kubejobs/web_pod_info.py b/kubejobs/web_pod_info.py index 68c130b..62c4da9 100644 --- a/kubejobs/web_pod_info.py +++ b/kubejobs/web_pod_info.py @@ -140,6 +140,7 @@ def fetch_and_render_pod_info( 
"GPU Memory Used", "GPU Memory Total", "GPU Utilization", + "GPU Count Actual", "Creation Time", "Age", ] @@ -185,15 +186,16 @@ def fetch_and_render_pod_info( age = time_diff_to_human_readable(creation_time, current_time) # SSH into the pod and get GPU utilization details - + gpu_count_actual = 0 for _ in range(samples_per_gpu): gpu_usage_output = ssh_into_pod_and_run_command( name, namespace, "nvidia-smi --query-gpu=memory.total,memory.used,utilization.gpu --format=csv,noheader,nounits", ) - - for line in gpu_usage_output.splitlines(): + lines = gpu_usage_output.splitlines() + gpu_count_actual = len(lines) + for line in lines: ( gpu_memory_total, gpu_memory_used, @@ -251,6 +253,7 @@ def fetch_and_render_pod_info( gpu_memory_used, gpu_memory_total, gpu_utilization, + gpu_count_actual, str(creation_time), age, ]