diff --git a/kalavai_client/cli.py b/kalavai_client/cli.py
index d6077ca..db7e7b7 100644
--- a/kalavai_client/cli.py
+++ b/kalavai_client/cli.py
@@ -83,7 +83,8 @@
 CLUSTER = k3sCluster(
     kube_version=KUBE_VERSION,
     flannel_iface=DEFAULT_FLANNEL_IFACE,
-    kubeconfig_file=USER_KUBECONFIG_FILE
+    kubeconfig_file=USER_KUBECONFIG_FILE,
+    poolconfig_file=USER_LOCAL_SERVER_FILE
 )
@@ -111,6 +112,22 @@ def fetch_remote_templates():
         remote_load=True)
     return templates
 
+def pre_join_check(node_name, server_url, server_key):
+    # check that the server is reachable and the node name is not taken
+    try:
+        nodes = request_to_server(
+            force_url=server_url,
+            force_key=server_key,
+            method="get",
+            endpoint="/v1/get_nodes",
+            data={"node_names": [node_name]},
+            server_creds=USER_LOCAL_SERVER_FILE
+        )
+        return node_name not in nodes.keys()
+    except Exception as e:
+        console.log(f"[red]Error when connecting to kalavai service: {str(e)}")
+        return False
+
 def restart():
     console.log("[white] Restarting sharing (may take a few minutes)...")
     success = CLUSTER.restart_agent()
@@ -153,6 +170,8 @@ def select_ip_address(subnet=None):
             pass
     if len(ips) == 1:
         return ips[0]
+    if len(ips) == 0:
+        raise ValueError("No IPs available")
     while True:
         option = user_confirm(
             question="Select IP to advertise the node (needs to be visible to other nodes)",
@@ -235,15 +254,16 @@ def pool__publish(*others, description=None):
     # - cluster is up and running
     # - cluster is connected to vpn (has net token)
     # - user is authenticated
-    if not CLUSTER.is_cluster_init():
-        console.log(f"[red] No local cluster running. Start a cluster with [yellow]kalavai pool start [white]first.")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
         return
 
     if description is None:
         console.log("[yellow] [Markdown] In a few words (max 500 chars), describe your goals with this cluster. Remember, this is what other users will see to decide whether to share their resources with you, [blue]so inspire them!")
         description = input(f"(You can edit this later in {KALAVAI_PLATFORM_URL}\n")
-    description = description
 
     try:
@@ -269,8 +289,10 @@ def pool__unpublish(cluster_name=None, *others):
     # Check for:
     # - cluster is up and running
     # - user is authenticated
-    if not CLUSTER.is_cluster_init():
-        console.log(f"[red] No local cluster running. Start a cluster with [yellow]kalavai pool start [white]first.")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
         return
 
     try:
@@ -404,8 +426,14 @@ def pool__token(*others, admin_workers=False):
     """
     Generate a join token for others to connect to your pool
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_seed_node():
-        console.log("[red]Node is not seed. Possible reasons: the cluster has not been started or this is a worker node.")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
+
+    if not CLUSTER.is_seed_node():
+        console.log("[red]Node is not seed. This may be a worker node.")
         return None
 
     if admin_workers:
@@ -524,6 +552,10 @@ def pool__join(token, *others, node_name=None, ip_address: str=None):
 
     # local k3s agent join
     console.log(f"[white] Connecting to {cluster_name} @ {kalavai_seed_ip} (this may take a few minutes)...")
+    # check with the server that the node name is available before joining
+    if not pre_join_check(node_name=node_name, server_url=watcher_service, server_key=auth_key):
+        console.log(f"[red] Failed pre-join checks. Server offline or node '{node_name}' may already exist. Please specify a different one with '--node-name'")
+        return
     try:
         CLUSTER.start_worker_node(
             url=kalavai_seed_ip,
@@ -610,14 +642,47 @@ def pool__resume(*others):
     console.log("[white] Resuming kalavai app...")
     restart()
 
+@arguably.command
+def pool__gpus(*others):
+    """
+    Display GPU information from all connected nodes
+    """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
+
+    try:
+        data = request_to_server(
+            method="post",
+            endpoint="/v1/get_node_gpus",
+            data={},
+            server_creds=USER_LOCAL_SERVER_FILE
+        )
+        columns, rows = [], []
+        for node, gpus in data.items():
+            for gpu in gpus:
+                rows.append([node] + list(gpu.values()))
+                columns = list(gpu.keys())
+        columns = ["Node"] + columns
+        console.print(
+            generate_table(columns=columns, rows=rows)
+        )
+
+    except Exception as e:
+        console.log(f"[red]Error when connecting to kalavai service: {str(e)}")
+
 @arguably.command
 def pool__resources(*others):
     """
     Display information about resources on the pool
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running or not installed on your machine")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
         return
 
     try:
@@ -663,9 +728,12 @@ def pool__status(*others):
     """
     Check the status of the kalavai pool
     """
-    if not CLUSTER.is_cluster_init():
-        console.log("This node is not connected to any pool")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
         return
+
     try:
         response = request_to_server(
             method="POST",
@@ -818,8 +886,10 @@ def node__list(*others):
     """
     Display information about nodes connected
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running on your machine")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
         return
 
     try:
@@ -855,8 +925,10 @@ def node__delete(name, *others):
     """
     Delete a node from the cluster
     """
-    if not CLUSTER.is_cluster_init():
-        console.log("This node is not connected to any cluster")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
         return
 
     data = {
@@ -882,6 +954,11 @@ def node__cordon(node_name, *others):
     """
     Cordon a particular node so no more work will be scheduled on it
     """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
     set_schedulable(schedulable=False, node_name=node_name)
 
@@ -890,6 +967,11 @@ def node__uncordon(node_name, *others):
     """
     Uncordon a particular node to allow more work to be scheduled on it
     """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
     set_schedulable(schedulable=True, node_name=node_name)
 
@@ -927,8 +1009,11 @@ def job__run(template_name, *others, values_path=None):
     Args:
         *others: all the other positional arguments go here
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running or not installed on your machine")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
 
     expose = True
@@ -1049,8 +1134,11 @@ def job__delete(name, *others):
     """
     Delete job in the cluster
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running or not installed on your machine")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
 
     # deploy template with kube-watcher
     data = {
@@ -1075,8 +1163,11 @@ def job__list(*others):
     """
     List jobs in the cluster
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running or not installed on your machine")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
 
     data = {
         "group": "leaderworkerset.x-k8s.io",
@@ -1174,6 +1265,12 @@ def job__logs(name, *others, pod_name=None, stream=False):
     """
     Get logs for a specific job
    """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
+
     data = {
         "namespace": "default",
         "label": "leaderworkerset.sigs.k8s.io/name",
@@ -1214,6 +1311,12 @@ def job__manifest(*others, name):
     """
     Get job manifest description
     """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
+
     data = {
         "namespace": "default",
         "label": "leaderworkerset.sigs.k8s.io/name",
@@ -1240,6 +1343,12 @@ def ray__create(template_path, *others):
     """
     Create a cluster using KubeRay operator
     """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
+
     with open(template_path, "r") as f:
         template_yaml = yaml.safe_load(f)
     # ensure deployment is labelled (for tracking and deletion)
@@ -1283,8 +1392,14 @@ def ray__create(template_path, *others):
 
 @arguably.command
 def ray__list(*status):
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running or not installed on your machine")
+    """
+    List all available ray clusters
+    """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
 
     data = {
         "group": "ray.io",
@@ -1335,8 +1450,11 @@ def ray__delete(*others, name):
     """
     Delete a ray cluster
     """
-    if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
-        console.log("[red]Kalavai is not running or not installed on your machine")
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
 
     # deploy template with kube-watcher
     data = {
@@ -1360,6 +1478,12 @@ def ray__manifest(*others, name):
     """
     Get ray cluster manifest description
     """
+    try:
+        CLUSTER.validate_cluster()
+    except Exception as e:
+        console.log(f"[red]Problems with your pool: {str(e)}")
+        return
+
     data = {
         "namespace": "default",
         "label": "ray.io/cluster",
diff --git a/kalavai_client/cluster.py b/kalavai_client/cluster.py
index 9d7b42e..83a14d1 100644
--- a/kalavai_client/cluster.py
+++ b/kalavai_client/cluster.py
@@ -5,7 +5,8 @@
 from kalavai_client.utils import (
     run_cmd,
     user_path,
-    check_gpu_drivers
+    check_gpu_drivers,
+    validate_poolconfig
 )
@@ -56,13 +57,18 @@ def get_cluster_token(self) -> str:
 
     def diagnostics(self) -> str:
         raise NotImplementedError()
 
+    @abstractmethod
+    def validate_cluster(self) -> bool:
+        raise NotImplementedError()
+
 
 class k0sCluster(Cluster):
 
-    def __init__(self, kubeconfig_file, kube_version="v1.31.1+k3s1", flannel_iface=None):
+    def __init__(self, kubeconfig_file, poolconfig_file, kube_version="v1.31.1+k3s1", flannel_iface=None):
         self.kube_version = kube_version
         self.flannel_iface = flannel_iface
         self.kubeconfig_file = kubeconfig_file
+        self.poolconfig_file = poolconfig_file
 
     def start_seed_node(self, ip_address, cluster_config_file, labels):
@@ -130,9 +136,10 @@ def diagnostics(self) -> str:
 
 class k3sCluster(Cluster):
 
-    def __init__(self, kubeconfig_file, kube_version="v1.31.1+k3s1", flannel_iface=None):
+    def __init__(self, kubeconfig_file, poolconfig_file, kube_version="v1.31.1+k3s1", flannel_iface=None):
         self.kube_version = kube_version
         self.kubeconfig_file = kubeconfig_file
+        self.poolconfig_file = poolconfig_file
 
         if flannel_iface is not None:
             self.default_flannel_iface = flannel_iface
@@ -204,14 +211,18 @@ def is_cluster_init(self):
         return status
 
     def pause_agent(self):
+        status = False
         try:
             run_cmd('sudo systemctl stop k3s >/dev/null 2>&1')
+            status = True
         except:
             pass
         try:
             run_cmd('sudo systemctl stop k3s-agent >/dev/null 2>&1')
+            status = True
         except:
             pass
+        return status
 
     def restart_agent(self):
@@ -237,3 +248,14 @@ def diagnostics(self) -> str:
             return run_cmd(f"k3s kubectl get pods -A -o wide --kubeconfig {self.kubeconfig_file}").decode() + "\n\n" + run_cmd(f"k3s kubectl get nodes --kubeconfig {self.kubeconfig_file}").decode()
         else:
             return None
+
+    def validate_cluster(self) -> bool:
+        if not self.is_cluster_init():
+            raise ValueError("Pool not initialised")
+        if not self.is_agent_running():
+            raise ValueError("Pool initialised but agent is not running")
+        # check cache files
+        if not validate_poolconfig(self.poolconfig_file):
+            raise ValueError("Cache misconfigured. Run 'kalavai pool stop' to clear.")
+        return True
+
diff --git a/kalavai_client/utils.py b/kalavai_client/utils.py
index 2b2d59d..562e8eb 100644
--- a/kalavai_client/utils.py
+++ b/kalavai_client/utils.py
@@ -42,6 +42,14 @@
     WATCHER_SERVICE_KEY,
     PUBLIC_LOCATION_KEY
 ]
+MANDATORY_POOLCONFIG_FIELDS = [
+    SERVER_IP_KEY,
+    AUTH_KEY,
+    WATCHER_SERVICE_KEY,
+    NODE_NAME_KEY,
+    CLUSTER_NAME_KEY,
+    PUBLIC_LOCATION_KEY
+]
 
 
 def load_server_info(data_key, file):
@@ -147,6 +155,16 @@ def validate_join_public_seed(cluster_name, join_key, user_cookie):
     )
     return seed
 
+def validate_poolconfig(poolconfig_file):
+    if not Path(poolconfig_file).is_file():
+        return False
+    with open(poolconfig_file, "r") as f:
+        data = json.load(f)
+    for field in MANDATORY_POOLCONFIG_FIELDS:
+        if field not in data:
+            return False
+    return True
+
 def check_gpu_drivers():
     value = run_cmd("command -v nvidia-smi")
     if len(value.decode("utf-8")) == 0:
@@ -241,9 +259,16 @@ def get_all_templates(local_path, templates_path=None, remote_load=False):
     return [
         (local_path, item) for item in os.listdir(local_path) if os.path.isdir(os.path.join(local_path, item))
     ]
 
-def request_to_server(method, endpoint, data, server_creds):
-    service_url = load_server_info(data_key=WATCHER_SERVICE_KEY, file=server_creds)
-    auth_key = load_server_info(data_key=AUTH_KEY, file=server_creds)
+def request_to_server(method, endpoint, data, server_creds, force_url=None, force_key=None):
+    if force_url is None:
+        service_url = load_server_info(data_key=WATCHER_SERVICE_KEY, file=server_creds)
+    else:
+        service_url = force_url
+
+    if force_key is None:
+        auth_key = load_server_info(data_key=AUTH_KEY, file=server_creds)
+    else:
+        auth_key = force_key
 
     headers = {
         "X-API-KEY": auth_key
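
Reviewer note: the new validate_poolconfig guard treats the local pool cache as valid only when the file exists and contains every mandatory field. Below is a minimal, self-contained sketch of that behaviour; the field names are hypothetical stand-ins for the SERVER_IP_KEY, AUTH_KEY, WATCHER_SERVICE_KEY, NODE_NAME_KEY, CLUSTER_NAME_KEY and PUBLIC_LOCATION_KEY constants, and /tmp/pool_config.json is an illustrative path, not the real cache location.

    import json
    from pathlib import Path

    # Hypothetical stand-ins for the *_KEY constants that make up
    # MANDATORY_POOLCONFIG_FIELDS in the diff.
    MANDATORY_POOLCONFIG_FIELDS = [
        "server_ip", "auth_key", "watcher_service",
        "node_name", "cluster_name", "public_location",
    ]

    def validate_poolconfig(poolconfig_file):
        # Mirrors the helper added to utils.py: a missing file or any
        # missing mandatory field marks the cache as misconfigured.
        if not Path(poolconfig_file).is_file():
            return False
        with open(poolconfig_file, "r") as f:
            data = json.load(f)
        return all(field in data for field in MANDATORY_POOLCONFIG_FIELDS)

    if __name__ == "__main__":
        cache = Path("/tmp/pool_config.json")
        sample = {field: "example" for field in MANDATORY_POOLCONFIG_FIELDS}
        cache.write_text(json.dumps(sample))
        print(validate_poolconfig(cache))   # True: all fields present
        sample.pop("auth_key")
        cache.write_text(json.dumps(sample))
        print(validate_poolconfig(cache))   # False: validate_cluster would raise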
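Reviewer note: pool__join calls pre_join_check before any local credentials file exists, which is why request_to_server gains the force_url/force_key overrides. The sketch below isolates just that precedence logic with a stubbed credentials cache; the URLs, keys and dictionary layout are hypothetical, not kalavai's actual cache format.

    from typing import Optional, Tuple

    # Stub for load_server_info(): the cached credentials file, as a plain dict.
    CACHED = {"watcher_service": "http://10.0.0.1:30001", "auth_key": "cached-key"}

    def resolve_server(force_url: Optional[str] = None,
                       force_key: Optional[str] = None) -> Tuple[str, str]:
        # Explicit overrides win; otherwise fall back to cached values,
        # matching the force_url/force_key branches added to request_to_server.
        service_url = force_url if force_url is not None else CACHED["watcher_service"]
        auth_key = force_key if force_key is not None else CACHED["auth_key"]
        return service_url, auth_key

    # Joining node: no local cache yet, so both values come from the join token.
    print(resolve_server(force_url="http://seed:30001", force_key="join-key"))
    # Established node: omit the overrides and use the cached credentials.
    print(resolve_server())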