moved templates to jobs
musoles committed Nov 23, 2024
1 parent 72a8e26 commit 5b5a82d
Showing 9 changed files with 243 additions and 234 deletions.
17 changes: 0 additions & 17 deletions assets/storage_class_template.yaml

This file was deleted.
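The deleted template's contents are not shown in this diff. Based on the values the removed storage__init function (further down) passed into it, namely sc_name, sc_label_selector and sc_replicas, a Longhorn storage class template of roughly the following shape is plausible. This is a hypothetical reconstruction, not the actual file:

# Hypothetical sketch only; the real assets/storage_class_template.yaml is not shown.
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: {{ sc_name }}
provisioner: driver.longhorn.io
allowVolumeExpansion: true
parameters:
  numberOfReplicas: "{{ sc_replicas }}"
  nodeSelector: "{{ sc_label_selector }}"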

136 changes: 64 additions & 72 deletions kalavai_client/cli.py
@@ -3,7 +3,6 @@
import os
import json
import uuid
from string import Template
import time
import socket
from pathlib import Path
@@ -54,10 +53,6 @@
WATCHER_PORT_KEY,
MANDATORY_TOKEN_FIELDS,
USER_NODE_LABEL_KEY,
DEPLOY_HELIOS_KEY,
LONGHORN_UI_PORT_KEY,
LONGHORN_MANAGER_PORT_KEY,
KALAVAI_API_ENDPOINT_KEY,
IS_PUBLIC_POOL_KEY
)
from kalavai_client.cluster import (
@@ -66,26 +61,26 @@


KALAVAI_PLATFORM_URL = os.getenv("KALAVAI_PLATFORM_URL", "https://platform.kalavai.net")
KALAVAI_API_ENDPOINT = os.getenv("KALAVAI_API_ENDPOINT", "https://platform.kalavai.net/_/api")
LOCAL_TEMPLATES_DIR = os.getenv("LOCAL_TEMPLATES_DIR", None)
VERSION = 1
RESOURCE_EXCLUDE = ["ephemeral-storage", "hugepages-1Gi", "hugepages-2Mi", "pods"]
CORE_NAMESPACES = ["lws-system", "kube-system", "gpu-operator", "kalavai"]
TEMPLATE_LABEL = "kalavai.lws.name"
TEMPLATE_LABEL = "kalavai.job.name"
RAY_LABEL = "kalavai.ray.name"
PVC_NAME_LABEL = "kalavai.storage.name"
POOL_CONFIG_TEMPLATE = resource_path("assets/pool_config_template.yaml")
POOL_CONFIG_DEFAULT_VALUES = resource_path("assets/pool_config_values.yaml")
STORAGE_CLASS_NAME = "longhorn-rwx"
STORAGE_CLASS_LABEL = "kalavai.storage.enabled"
DEFAULT_STORAGE_NAME = "pool-cache"
DEFAULT_STORAGE_SIZE = 5
DEFAULT_STORAGE_REPLICAS = 1
USER_NODE_LABEL = "kalavai.cluster.user"
KUBE_VERSION = os.getenv("KALAVAI_KUBE_VERSION", "v1.31.1+k3s1")
DEFAULT_FLANNEL_IFACE = os.getenv("KALAVAI_FLANNEL_IFACE", "netmaker")
FORBIDDEN_IPS = ["127.0.0.1"]
# kalavai templates
HELM_APPS_FILE = resource_path("assets/apps.yaml")
STORAGE_CLASS_TEMPLATE_FILE = resource_path("assets/storage_class_template.yaml")
HELM_APPS_VALUES = resource_path("assets/apps_values.yaml")
# user specific config files
USER_HELM_APPS_FILE = user_path("apps.yaml")
USER_KUBECONFIG_FILE = user_path("kubeconfig")
@@ -192,6 +187,30 @@ def init_user_workspace():
except Exception as e:
console.log(f"[red]Error when connecting to kalavai service: {str(e)}")

def pool_init():
"""Deploy configured objects to initialise pool"""
# load template config and populate with values
sidecar_template_yaml = load_template(
template_path=POOL_CONFIG_TEMPLATE,
values={},
default_values_path=POOL_CONFIG_DEFAULT_VALUES)

try:
result = request_to_server(
method="post",
endpoint="/v1/deploy_generic_model",
data={"config": sidecar_template_yaml},
server_creds=USER_LOCAL_SERVER_FILE,
user_cookie=USER_COOKIE
)
if len(result['failed']) > 0:
console.log(f"[red]Error when deploying pool config\n\n{result['failed']}")
if len(result['successful']) > 0:
console.log(f"[green]Deployed pool config!")
except Exception as e:
console.log(f"[red]Error when connecting to kalavai service: {str(e)}")


def select_ip_address(subnet=None):
ips = []
for iface in ni.interfaces():
@@ -414,7 +433,7 @@ def pool__list(*others, user_only=False):
console.log("[white]Use [yellow]kalavai pool join <join key> [white]to join a public pool")

@arguably.command
def pool__start(cluster_name, *others, ip_address: str=None, location: str=None):
def pool__start(cluster_name, *others, ip_address: str=None, location: str=None, app_values: str=HELM_APPS_VALUES):
"""
Start Kalavai pool and start/resume sharing resources.
@@ -455,8 +474,6 @@ def pool__start(cluster_name, *others, ip_address: str=None, location: str=None
write_auth_key = str(uuid.uuid4())
readonly_auth_key = str(uuid.uuid4())
watcher_port = 31000
longhorn_ui_port = 30000
longhorn_manager_port = 30001
watcher_service = f"{ip_address}:{watcher_port}"
values = {
CLUSTER_NAME_KEY: cluster_name,
@@ -465,13 +482,9 @@ def pool__start(cluster_name, *others, ip_address: str=None, location: str=None
READONLY_AUTH_KEY: readonly_auth_key,
WRITE_AUTH_KEY: write_auth_key,
WATCHER_PORT_KEY: watcher_port,
LONGHORN_UI_PORT_KEY: longhorn_ui_port,
LONGHORN_MANAGER_PORT_KEY: longhorn_manager_port,
WATCHER_SERVICE_KEY: watcher_service,
USER_NODE_LABEL_KEY: USER_NODE_LABEL,
DEPLOY_HELIOS_KEY: location is not None,
IS_PUBLIC_POOL_KEY: location is not None,
KALAVAI_API_ENDPOINT_KEY: KALAVAI_API_ENDPOINT
IS_PUBLIC_POOL_KEY: location is not None
}

# 1. start k3s server
@@ -499,12 +512,14 @@ def pool__start(cluster_name, *others, ip_address: str=None, location: str=None

console.log("Install dependencies...")
# set template values in helmfile
with open(HELM_APPS_FILE, "r") as f:
config = Template(f.read())
config = config.substitute(values)
helm_yaml = load_template(
template_path=HELM_APPS_FILE,
values=values,
default_values_path=app_values,
force_defaults=True)

with open(USER_HELM_APPS_FILE, "w") as f:
f.write(config)
f.write(helm_yaml)
CLUSTER.update_dependencies(
dependencies_file=USER_HELM_APPS_FILE
)
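
The move from string.Template.substitute to load_template(..., force_defaults=True) changes how defaults and caller-supplied values merge. A self-contained sketch of the merge rule exactly as implemented in utils.py (see the load_template changes below):

# Merge rule from load_template, isolated for clarity. With force_defaults=True,
# a default only fills keys absent from `values`; with force_defaults=False,
# defaults overwrite caller-supplied values.
def merge_defaults(values, defaults, force_defaults):
    for default in defaults:
        if not force_defaults or default["name"] not in values:
            values[default["name"]] = default["default"]
    return values

defaults = [{"name": "watcher_port", "default": 31000}]
print(merge_defaults({"watcher_port": 32000}, defaults, force_defaults=True))
# {'watcher_port': 32000}  (caller value kept)
print(merge_defaults({"watcher_port": 32000}, defaults, force_defaults=False))
# {'watcher_port': 31000}  (default wins)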
@@ -526,8 +541,8 @@ def pool__start(cluster_name, *others, ip_address: str=None, location: str=None
break
console.log("Initialise user workspace...")
init_user_workspace()
console.log(f"Initialising storage: {DEFAULT_STORAGE_NAME} ({DEFAULT_STORAGE_SIZE}Gi)...")
storage__init()
console.log(f"Initialising pool config...")
pool_init()
storage__create()

return None
@@ -913,40 +928,6 @@ def pool__status(*others, log_file=None):
for log in logs:
console.log(f"{log}\n")

@arguably.command
def storage__init(replicas=DEFAULT_STORAGE_REPLICAS, *others):
"""
Create storage for the cluster
"""
try:
CLUSTER.validate_cluster()
except Exception as e:
console.log(f"[red]Problems with your pool: {str(e)}")
return

sidecar_template_yaml = load_template(
template_path=STORAGE_CLASS_TEMPLATE_FILE,
values={
"sc_name": STORAGE_CLASS_NAME,
"sc_label_selector": f"{STORAGE_CLASS_LABEL}:True",
"sc_replicas": replicas
}
)
try:
result = request_to_server(
method="post",
endpoint="/v1/deploy_generic_model",
data={"config": sidecar_template_yaml},
server_creds=USER_LOCAL_SERVER_FILE,
user_cookie=USER_COOKIE
)
if len(result['failed']) > 0:
console.log(f"[red]Error when creating storage class\n\n{result['failed']}")
if len(result['successful']) > 0:
console.log(f"[green]Created storage class: {STORAGE_CLASS_NAME} ({replicas} replicas)")
except Exception as e:
console.log(f"[red]Error when connecting to kalavai service: {str(e)}")

@arguably.command
def storage__create(name=DEFAULT_STORAGE_NAME, storage=DEFAULT_STORAGE_SIZE, *others):
"""
@@ -1230,9 +1211,12 @@ def generate_gpu_annotation(input_message, values, value_key, annotation_key):
# deploy template with kube-watcher
data = {
"object": {
"group": "leaderworkerset.x-k8s.io",
"api_version": "v1",
"plural": "leaderworkersets"
"group": "batch.volcano.sh",
"api_version": "v1alpha1",
"plural": "jobs"
# "group": "leaderworkerset.x-k8s.io",
# "api_version": "v1",
# "plural": "leaderworkersets"
},
"body": template_yaml
}
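
Deployment now targets Volcano's Job CRD (group batch.volcano.sh, version v1alpha1, plural jobs) instead of LeaderWorkerSet. For orientation, a minimal manifest matching that target could look like the sketch below; the real body is the rendered job template, and everything beyond apiVersion/kind here is illustrative:

# Illustrative Volcano Job manifest; the actual body comes from template_yaml.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: demo-job
  labels:
    kalavai.job.name: demo-job  # TEMPLATE_LABEL, which job__list/job__logs filter on
spec:
  minAvailable: 1
  tasks:
    - name: main
      replicas: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: busybox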
@@ -1366,9 +1350,12 @@ def job__list(*others):
return

data = {
"group": "leaderworkerset.x-k8s.io",
"api_version": "v1",
"plural": "leaderworkersets",
"group": "batch.volcano.sh",
"api_version": "v1alpha1",
"plural": "jobs"
# "group": "leaderworkerset.x-k8s.io",
# "api_version": "v1",
# "plural": "leaderworkersets",
}
try:
result = request_to_server(
@@ -1378,7 +1365,7 @@ def job__list(*others):
server_creds=USER_LOCAL_SERVER_FILE,
user_cookie=USER_COOKIE
)
deployment_names = [d["metadata"]["name"] for d in result["items"]]
deployment_names = [d["metadata"]["labels"][TEMPLATE_LABEL] for d in result["items"]]

except Exception as e:
console.log(f"[red]Error when connecting to kalavai service: {str(e)}")
@@ -1394,9 +1381,12 @@ def job__list(*others):
try:
# get status for deployment
data = {
"group": "leaderworkerset.x-k8s.io",
"api_version": "v1",
"plural": "leaderworkersets",
"group": "batch.volcano.sh",
"api_version": "v1alpha1",
"plural": "jobs",
# "group": "leaderworkerset.x-k8s.io",
# "api_version": "v1",
# "plural": "leaderworkersets",
"name": deployment
}
result = request_to_server(
@@ -1408,14 +1398,15 @@ def job__list(*others):
)
if len(result) > 0:
last = result[-1]
statuses = f"{last['type']}: {last['message']}"
statuses = f"[{last['lastTransitionTime']}] {last['status']}"
else:
statuses = "Unknown"
# get pod statuses
data = {
"label": "leaderworkerset.sigs.k8s.io/name",
"label": TEMPLATE_LABEL,
"value": deployment
}
# TODO
result = request_to_server(
method="post",
endpoint="/v1/get_pods_status_for_label",
@@ -1458,7 +1449,7 @@ def job__list(*others):


@arguably.command
def job__logs(name, *others, pod_name=None, stream=False):
def job__logs(name, *others, pod_name=None, stream=False, tail=100):
"""
Get logs for a specific job
"""
@@ -1469,11 +1460,12 @@ def job__logs(name, *others, pod_name=None, stream=False):
return

data = {
"label": "leaderworkerset.sigs.k8s.io/name",
"label": TEMPLATE_LABEL,
"value": name
}
while True:
try:
# send tail as parameter (fetch only last _tail_ lines)
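# One possible shape (assumption: the watcher endpoint accepts a "tail" field;
# its contract is not shown in this diff):
# data["tail"] = tail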
result = request_to_server(
method="post",
endpoint="/v1/get_logs_for_label",
@@ -1515,7 +1507,7 @@ def job__manifest(*others, name):
return

data = {
"label": "leaderworkerset.sigs.k8s.io/name",
"label": TEMPLATE_LABEL,
"value": name
}
try:
15 changes: 6 additions & 9 deletions kalavai_client/utils.py
@@ -36,11 +36,7 @@
READONLY_AUTH_KEY = "watcher_readonly_key"
WATCHER_SERVICE_KEY = "watcher_service"
WATCHER_PORT_KEY = "watcher_port"
LONGHORN_UI_PORT_KEY = "longhorn_ui_port"
LONGHORN_MANAGER_PORT_KEY = "longhorn_manager_port"
DEPLOY_HELIOS_KEY = "deploy_helios"
IS_PUBLIC_POOL_KEY = "is_public_pool"
KALAVAI_API_ENDPOINT_KEY = "kalavai_api_endpoint"
MANDATORY_TOKEN_FIELDS = [
CLUSTER_IP_KEY,
CLUSTER_TOKEN_KEY,
@@ -369,7 +365,10 @@ def store_server_info(server_ip, auth_key, watcher_service, file, node_name, clu
}, f)
return True

def load_template(template_path, values, default_values_path=None):
def populate_template(template_str, values_dict):
return Template(template_str).render(values_dict)

def load_template(template_path, values, default_values_path=None, force_defaults=False):

if not Path(template_path).exists():
raise FileNotFoundError(f"{template_path} does not exist")
@@ -381,12 +380,10 @@ def load_template(template_path, values, default_values_path=None):
with open(default_values_path, 'r') as f:
default_values = yaml.safe_load(f)
for default in default_values:
if default["name"] not in values:
if not force_defaults or default["name"] not in values:
values[default['name']] = default['default']

template = Template(yaml_template)

return template.render(values)
return populate_template(template_str=yaml_template, values_dict=values)
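
Template here must be jinja2's Template (string.Template has no render method). A minimal, self-contained example of the render path, assuming jinja2 is installed:

# Example use of populate_template; the jinja2 import is assumed to match utils.py.
from jinja2 import Template

def populate_template(template_str, values_dict):
    return Template(template_str).render(values_dict)

snippet = "cluster: {{ cluster_name }}\nwatcher: {{ watcher_service }}"
print(populate_template(snippet, {
    "cluster_name": "demo",
    "watcher_service": "192.168.1.10:31000",
}))
# cluster: demo
# watcher: 192.168.1.10:31000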


def user_confirm(question: str, options: list, multiple: bool=False) -> int:
8 changes: 4 additions & 4 deletions templates/aphrodite/examples/qwen2.5-0.5B.yaml
@@ -8,10 +8,10 @@
default: "pool-cache"
description: "Pool storage to use to cache model weights"

- name: num_workers
value: "1"
default: "1"
description: "Workers per deployment (for tensor parallelism)"
- name: remote_workers
value: "0"
default: "0"
description: "Number of remote workers (for tensor and pipeline parallelism). This is in addition to the main node"

- name: repo_id
value: Qwen/Qwen2.5-0.5B-Instruct
