Skip to content

Commit

Permalink
validation checks
Browse files Browse the repository at this point in the history
  • Loading branch information
musoles committed Nov 4, 2024
1 parent 4d0e982 commit a6a2900
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 32 deletions.
176 changes: 150 additions & 26 deletions kalavai_client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@
CLUSTER = k3sCluster(
kube_version=KUBE_VERSION,
flannel_iface=DEFAULT_FLANNEL_IFACE,
kubeconfig_file=USER_KUBECONFIG_FILE
kubeconfig_file=USER_KUBECONFIG_FILE,
poolconfig_file=USER_LOCAL_SERVER_FILE
)


Expand Down Expand Up @@ -111,6 +112,22 @@ def fetch_remote_templates():
remote_load=True)
return templates

def pre_join_check(node_name, server_url, server_key):
# check with the server that we can connect
try:
nodes = request_to_server(
force_url=server_url,
force_key=server_key,
method="get",
endpoint="/v1/get_nodes",
data={"node_names": [node_name]},
server_creds=USER_LOCAL_SERVER_FILE
)
return node_name not in nodes.keys()
except Exception as e:
console.log(f"[red]Error when connecting to kalavai service: {str(e)}")
return False

def restart():
console.log("[white] Restarting sharing (may take a few minutes)...")
success = CLUSTER.restart_agent()
Expand Down Expand Up @@ -153,6 +170,8 @@ def select_ip_address(subnet=None):
pass
if len(ips) == 1:
return ips[0]
if len(ips) == 0:
raise ValueError("No IPs available")
while True:
option = user_confirm(
question="Select IP to advertise the node (needs to be visible to other nodes)",
Expand Down Expand Up @@ -235,15 +254,16 @@ def pool__publish(*others, description=None):
# - cluster is up and running
# - cluster is connected to vpn (has net token)
# - user is authenticated
if not CLUSTER.is_cluster_init():
console.log(f"[red] No local cluster running. Start a cluster with [yellow]kalavai pool start [white]first.")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

if description is None:
console.log("[yellow] [Markdown] In a few words (max 500 chars), describe your goals with this cluster. Remember, this is what other users will see to decide whether to share their resources with you, [blue]so inspire them!")
description = input(f"(You can edit this later in {KALAVAI_PLATFORM_URL}\n")


description = description

try:
Expand All @@ -269,8 +289,10 @@ def pool__unpublish(cluster_name=None, *others):
# Check for:
# - cluster is up and running
# - user is authenticated
if not CLUSTER.is_cluster_init():
console.log(f"[red] No local cluster running. Start a cluster with [yellow]kalavai pool start [white]first.")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

try:
Expand Down Expand Up @@ -404,8 +426,14 @@ def pool__token(*others, admin_workers=False):
"""
Generate a join token for others to connect to your pool
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_seed_node():
console.log("[red]Node is not seed. Possible reasons: the cluster has not been started or this is a worker node.")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

if not CLUSTER.is_seed_node():
console.log("[red]Node is not seed. Possible reasons: this is a worker node.")
return None

if admin_workers:
Expand Down Expand Up @@ -524,6 +552,10 @@ def pool__join(token, *others, node_name=None, ip_address: str=None):

# local k3s agent join
console.log(f"[white] Connecting to {cluster_name} @ {kalavai_seed_ip} (this may take a few minutes)...")
# send note to server to let them know the node is coming online
if not pre_join_check(node_name=node_name, server_url=watcher_service, server_key=auth_key):
console.log(f"[red] Failed pre join checks. Server offline or node '{node_name}' may already exist. Please specify a different one with '--node-name'")
return
try:
CLUSTER.start_worker_node(
url=kalavai_seed_ip,
Expand Down Expand Up @@ -610,14 +642,47 @@ def pool__resume(*others):
console.log("[white] Resuming kalavai app...")
restart()

@arguably.command
def pool__gpus(*others):
"""
Display GPU information from all connected nodes
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

try:
data = request_to_server(
method="post",
endpoint="/v1/get_node_gpus",
data={},
server_creds=USER_LOCAL_SERVER_FILE
)
columns, rows = [], []
for node, gpus in data.items():
for gpu in gpus:
rows.append([node] + list(gpu.values()))
columns = list(gpu.keys())
columns = ["Node"] + columns
console.print(
generate_table(columns=columns, rows=rows)
)

except Exception as e:
console.log(f"[red]Error when connecting to kalavai service: {str(e)}")


@arguably.command
def pool__resources(*others):
"""
Display information about resources on the pool
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running or not installed on your machine")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

try:
Expand Down Expand Up @@ -663,9 +728,12 @@ def pool__status(*others):
"""
Check the status of the kalavai pool
"""
if not CLUSTER.is_cluster_init():
console.log("This node is not connected to any pool")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

try:
response = request_to_server(
method="POST",
Expand Down Expand Up @@ -818,8 +886,10 @@ def node__list(*others):
"""
Display information about nodes connected
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running on your machine")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

try:
Expand Down Expand Up @@ -855,8 +925,10 @@ def node__delete(name, *others):
"""
Delete a node from the cluster
"""
if not CLUSTER.is_cluster_init():
console.log("This node is not connected to any cluster")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

data = {
Expand All @@ -882,6 +954,11 @@ def node__cordon(node_name, *others):
"""
Cordon a particular node so no more work will be scheduled on it
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return
set_schedulable(schedulable=False, node_name=node_name)


Expand All @@ -890,6 +967,11 @@ def node__uncordon(node_name, *others):
"""
Uncordon a particular node to allow more work to be scheduled on it
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return
set_schedulable(schedulable=True, node_name=node_name)


Expand Down Expand Up @@ -927,8 +1009,11 @@ def job__run(template_name, *others, values_path=None):
Args:
*others: all the other positional arguments go here
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running or not installed on your machine")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

expose = True

Expand Down Expand Up @@ -1049,8 +1134,11 @@ def job__delete(name, *others):
"""
Delete job in the cluster
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running or not installed on your machine")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

# deploy template with kube-watcher
data = {
Expand All @@ -1075,8 +1163,11 @@ def job__list(*others):
"""
List jobs in the cluster
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running or not installed on your machine")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

data = {
"group": "leaderworkerset.x-k8s.io",
Expand Down Expand Up @@ -1174,6 +1265,12 @@ def job__logs(name, *others, pod_name=None, stream=False):
"""
Get logs for a specific job
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

data = {
"namespace": "default",
"label": "leaderworkerset.sigs.k8s.io/name",
Expand Down Expand Up @@ -1214,6 +1311,12 @@ def job__manifest(*others, name):
"""
Get job manifest description
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

data = {
"namespace": "default",
"label": "leaderworkerset.sigs.k8s.io/name",
Expand All @@ -1240,6 +1343,12 @@ def ray__create(template_path, *others):
"""
Create a cluster using KubeRay operator
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

with open(template_path, "r") as f:
template_yaml = yaml.safe_load(f)
# ensure deployment is labelled (for tracking and deletion)
Expand Down Expand Up @@ -1283,8 +1392,14 @@ def ray__create(template_path, *others):

@arguably.command
def ray__list(*status):
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running or not installed on your machine")
"""
List all available ray clusters
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

data = {
"group": "ray.io",
Expand Down Expand Up @@ -1335,8 +1450,11 @@ def ray__delete(*others, name):
"""
Delete a ray cluster
"""
if not CLUSTER.is_cluster_init() or not CLUSTER.is_agent_running():
console.log("[red]Kalavai is not running or not installed on your machine")
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

# deploy template with kube-watcher
data = {
Expand All @@ -1360,6 +1478,12 @@ def ray__manifest(*others, name):
"""
Get ray cluster manifest description
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

data = {
"namespace": "default",
"label": "ray.io/cluster",
Expand Down
Loading

0 comments on commit a6a2900

Please sign in to comment.