Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update routers to use GetPodModelMetric api and misc cleanup in metri… #590

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/gateway/gateway-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ spec:
value: aibrix-redis-master
- name: REDIS_PORT
value: "6379"
# - name: ROUTING_ALGORITHM
# value: "least-request" # random, least-request, throughput
- name: AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS
value: "10000" # 10 seconds, only local testing
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
21 changes: 12 additions & 9 deletions development/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
# Global storage for overridden values
overrides = {}

MODEL_NAME = os.getenv('MODEL_NAME', 'llama2-70b')
DEPLOYMENT_NAME = os.getenv('DEPLOYMENT_NAME', 'llama2-70b')
MODEL_NAME = os.getenv('MODEL_NAME', 'llama2-7b')
DEPLOYMENT_NAME = os.getenv('DEPLOYMENT_NAME', 'llama2-7b')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. The name has been changed. We can probably inject it from the environment using a reference later.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

NAMESPACE = os.getenv('POD_NAMESPACE', 'default')
DEFAULT_REPLICAS = int(os.getenv('DEFAULT_REPLICAS', '1'))
SIMULATION = os.getenv('SIMULATION', 'disabled')
Expand Down Expand Up @@ -566,16 +566,19 @@ def metrics():
]

# Generate all metrics
metrics_output = """
# HELP vllm:lora_requests_info Running stats on lora requests.
# TYPE vllm:lora_requests_info gauge
vllm:lora_requests_info{max_lora="1",running_lora_adapters="text2sql-lora-1",waiting_lora_adapters=""} 1
"""
metrics_output = ""
for metric in simple_metrics:
metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"],
model_name, metric["value"])
metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"],
"lora-1", metric["value"], help_header=False)
"text2sql-lora-1", metric["value"], help_header=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, does the user register a different name here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a blocker for the merge.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure where the lora-1 name came from. I added the text2sql LoRA name based on the model adapter.



metrics_output += """
# HELP vllm:lora_requests_info Running stats on lora requests.
# TYPE vllm:lora_requests_info gauge
vllm:lora_requests_info{max_lora="1",running_lora_adapters="text2sql-lora-1",waiting_lora_adapters=""} 1
"""

histogram_metrics = [
{
Expand Down Expand Up @@ -663,7 +666,7 @@ def metrics():
histogram_metrics_output += generate_histogram_metric(
metric_name=metric["name"],
description=metric["description"],
model_name="lora-1",
model_name="text2sql-lora-1",
buckets=metric["buckets"],
new_requests=new_requests,
help_header=False
Expand Down
95 changes: 51 additions & 44 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,17 @@ var (
)

func getPodMetricRefreshInterval() time.Duration {
value, exists := os.LookupEnv("AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS")
if exists {
value := LoadEnv("AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS", "")
if value != "" {
intValue, err := strconv.Atoi(value)
if err != nil {
klog.V(4).Infof("Invalid AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: %s, falling back to default", value)
klog.Infof("invalid AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: %s, falling back to default", value)
} else {
klog.V(4).Infof("Using env value for refresh interval: %d ms", intValue)
return time.Duration(intValue)
klog.Infof("using AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS env value for pod metrics refresh interval: %d ms", intValue)
return time.Duration(intValue) * time.Millisecond
}
}
klog.V(4).Infof("Using default refresh interval: %d ms", defaultPodMetricRefreshIntervalInMS)
klog.Infof("using default refresh interval: %d ms", defaultPodMetricRefreshIntervalInMS)
return defaultPodMetricRefreshIntervalInMS * time.Millisecond
}

Expand All @@ -151,7 +151,7 @@ func GetCache() (*Cache, error) {
func LoadEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
klog.Warningf("Environment variable %s is not set, using default value: %s", key, defaultValue)
klog.Warningf("environment variable %s is not set, using default value: %s", key, defaultValue)
return defaultValue
}
return value
Expand Down Expand Up @@ -240,6 +240,7 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}, redisClient *redis.Cl
case <-ticker.C:
instance.updatePodMetrics()
instance.updateModelMetrics()
instance.debugInfo()
case <-stopCh:
ticker.Stop()
return
Expand Down Expand Up @@ -272,7 +273,7 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}, redisClient *redis.Cl
// Start ticker
traceTicker = time.NewTicker(RequestTraceWriteInterval)
}
klog.Infof("Trace ticker start at %s", time.Now())
klog.Infof("trace ticker start at %s", time.Now())
for {
select {
case <-traceTicker.C:
Expand Down Expand Up @@ -487,6 +488,13 @@ func (c *Cache) debugInfoLocked() {
klog.V(5).Infof("%v_%v_%v", podName, metricName, metricVal)
}
}
for podName, models := range c.PodModelMetrics {
for modelName, metrics := range models {
for metricName, metricVal := range metrics {
klog.V(5).Infof("%v_%v_%v_%v", podName, modelName, metricName, metricVal)
}
}
}
}

func (c *Cache) GetPod(podName string) (*v1.Pod, error) {
Expand Down Expand Up @@ -581,7 +589,8 @@ func (c *Cache) GetPodModelMetric(podName, modelName string, metricName string)
}

// Update `PodMetrics` and `PodModelMetrics` according to the metric scope
func (c *Cache) updatePodRecord(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
// TODO: replace in-place metric update podMetrics and podModelMetrics to fresh copy for preventing stale metric keys
func (c *Cache) updatePodRecordLocked(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
if scope == metrics.PodMetricScope {
if modelName != "" {
return fmt.Errorf("modelName should be empty for scope %v", scope)
Expand All @@ -601,7 +610,7 @@ func (c *Cache) updatePodRecord(podName string, modelName string, metricName str
return nil
}

func (c *Cache) queryUpdatePromQLMetrics(metric metrics.Metric, queryLabels map[string]string, podName string, modelName string, metricName string) error {
func (c *Cache) queryUpdatePromQLMetricsLocked(metric metrics.Metric, queryLabels map[string]string, podName string, modelName string, metricName string) error {
scope := metric.MetricScope
query := metrics.BuildQuery(metric.PromQL, queryLabels)
// Querying metrics
Expand All @@ -611,33 +620,32 @@ func (c *Cache) queryUpdatePromQLMetrics(metric metrics.Metric, queryLabels map[
return fmt.Errorf("error executing query: %v", err)
}
if len(warnings) > 0 {
klog.Warningf("Warnings: %v\n", warnings)
klog.V(4).Infof("Warnings: %v\n", warnings)
}

klog.Infof("Query Result:%v\n", result)
// Update metrics
metricValue := &metrics.PrometheusMetricValue{Result: &result}
err = c.updatePodRecord(podName, modelName, metricName, scope, metricValue)
err = c.updatePodRecordLocked(podName, modelName, metricName, scope, metricValue)
if err != nil {
return fmt.Errorf("failed to update metrics %s from prometheus %s: %v", metricName, podName, err)
}
klog.V(5).InfoS("Successfully parsed metrics from prometheus", "metric", metricName, "model", modelName, "PodName", podName, "Port", podPort, "metricValue", metricValue)
return nil
}

func (c *Cache) updateSimpleMetricFromRawMetrics(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
func (c *Cache) updateSimpleMetricFromRawMetricsLocked(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
podName := pod.Name
for _, metricName := range counterGaugeMetricNames {
metric, exists := metrics.Metrics[metricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", metricName)
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}

// TODO: we should refact metricName to fit other engine
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", metricName)
klog.V(4).Infof("Cannot find %v in the pod metrics", metricName)
continue
}
scope := metric.MetricScope
Expand All @@ -646,13 +654,13 @@ func (c *Cache) updateSimpleMetricFromRawMetrics(pod *v1.Pod, allMetrics map[str

metricValue, err := metrics.GetCounterGaugeValue(familyMetric, metricFamily.GetType())
if err != nil {
klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("failed to parse metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

err = c.updatePodRecord(podName, modelName, metricName, scope, &metrics.SimpleMetricValue{Value: metricValue})
err = c.updatePodRecordLocked(podName, modelName, metricName, scope, &metrics.SimpleMetricValue{Value: metricValue})
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -661,26 +669,26 @@ func (c *Cache) updateSimpleMetricFromRawMetrics(pod *v1.Pod, allMetrics map[str
}
}

func (c *Cache) updateHistogramMetricFromRawMetrics(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
func (c *Cache) updateHistogramMetricFromRawMetricsLocked(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
podName := pod.Name
for _, metricName := range histogramMetricNames {
metric, exists := metrics.Metrics[metricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", metricName)
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}

metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", metricName)
klog.V(4).Infof("Cannot find %v in the pod metrics", metricName)
continue
}
scope := metric.MetricScope
for _, familyMetric := range metricFamily.Metric {
modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name")
metricValue, err := metrics.GetHistogramValue(familyMetric)
if err != nil {
klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -689,9 +697,9 @@ func (c *Cache) updateHistogramMetricFromRawMetrics(pod *v1.Pod, allMetrics map[
Count: metricValue.Count,
Buckets: metricValue.Buckets,
}
err = c.updatePodRecord(podName, modelName, metricName, scope, histogramValue)
err = c.updatePodRecordLocked(podName, modelName, metricName, scope, histogramValue)
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -701,28 +709,28 @@ func (c *Cache) updateHistogramMetricFromRawMetrics(pod *v1.Pod, allMetrics map[
}
}

func (c *Cache) updateQueryLabelMetricFromRawMetrics(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
func (c *Cache) updateQueryLabelMetricFromRawMetricsLocked(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
podName := pod.Name

for _, labelMetricName := range labelQueryMetricNames {
metric, exists := metrics.Metrics[labelMetricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", labelMetricName)
klog.V(4).Infof("Cannot find %v in the metric list", labelMetricName)
continue
}
rawMetricName := metric.RawMetricName
scope := metric.MetricScope
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", rawMetricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", rawMetricName)
klog.V(4).Infof("Cannot find %v in the pod metrics", rawMetricName)
continue
}
for _, familyMetric := range metricFamily.Metric {
modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name")
labelValue, _ := metrics.GetLabelValueForKey(familyMetric, labelMetricName)
err := c.updatePodRecord(podName, modelName, labelMetricName, scope, &metrics.LabelValueMetricValue{Value: labelValue})
err := c.updatePodRecordLocked(podName, modelName, labelMetricName, scope, &metrics.LabelValueMetricValue{Value: labelValue})
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", labelMetricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("Failed to update metrics %s from pod %s %s %d: %v", labelMetricName, podName, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -731,7 +739,7 @@ func (c *Cache) updateQueryLabelMetricFromRawMetrics(pod *v1.Pod, allMetrics map
}
}

func (c *Cache) updateMetricFromPromQL(pod *v1.Pod) {
func (c *Cache) updateMetricFromPromQLLocked(pod *v1.Pod) {
podName := pod.Name

for _, metricName := range prometheusMetricNames {
Expand All @@ -740,31 +748,31 @@ func (c *Cache) updateMetricFromPromQL(pod *v1.Pod) {
}
metric, ok := metrics.Metrics[metricName]
if !ok {
klog.Warningf("Cannot find %v in the metric list", metricName)
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}
scope := metric.MetricScope
if scope == metrics.PodMetricScope {
err := c.queryUpdatePromQLMetrics(metric, queryLabels, podName, "", metricName)
err := c.queryUpdatePromQLMetricsLocked(metric, queryLabels, podName, "", metricName)
if err != nil {
klog.Errorf("Failed to query and update PromQL metrics: %v", err)
klog.V(4).Infof("Failed to query and update PromQL metrics: %v", err)
continue
}
} else if scope == metrics.PodModelMetricScope {
if modelNames, ok := c.PodToModelMapping[podName]; ok {
for modelName := range modelNames {
queryLabels["model_name"] = modelName
err := c.queryUpdatePromQLMetrics(metric, queryLabels, podName, modelName, metricName)
err := c.queryUpdatePromQLMetricsLocked(metric, queryLabels, podName, modelName, metricName)
if err != nil {
klog.Errorf("Failed to query and update PromQL metrics: %v", err)
klog.V(4).Infof("Failed to query and update PromQL metrics: %v", err)
continue
}
}
} else {
klog.Warningf("Cannot find model names for pod %s", podName)
klog.V(4).Infof("Cannot find model names for pod %s", podName)
}
} else {
klog.Warningf("Scope %v is not supported", scope)
klog.V(4).Infof("Scope %v is not supported", scope)
}
}
}
Expand All @@ -791,25 +799,24 @@ func (c *Cache) updatePodMetrics() {
url := fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, podPort)
allMetrics, err := metrics.ParseMetricsURL(url)
if err != nil {
klog.Warningf("Error parsing metric families: %v\n", err)
klog.V(4).Infof("Error parsing metric families: %v\n", err)
}

// parse counterGaugeMetricsNames
c.updateSimpleMetricFromRawMetrics(pod, allMetrics)
c.updateSimpleMetricFromRawMetricsLocked(pod, allMetrics)

// parse histogramMetrics
c.updateHistogramMetricFromRawMetrics(pod, allMetrics)
c.updateHistogramMetricFromRawMetricsLocked(pod, allMetrics)

// parse QueryLabel metrics
c.updateQueryLabelMetricFromRawMetrics(pod, allMetrics)
c.updateQueryLabelMetricFromRawMetricsLocked(pod, allMetrics)

if c.prometheusApi == nil {
klog.V(4).InfoS("Prometheus api is not initialized, PROMETHEUS_ENDPOINT is not configured, skip fetching prometheus metrics")
continue
}
// parse prometheus metrics
c.updateMetricFromPromQL(pod)

c.updateMetricFromPromQLLocked(pod)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/modeladapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (r *ModelAdapterReconciler) getActivePodsForModelAdapter(ctx context.Contex
func (r *ModelAdapterReconciler) schedulePod(ctx context.Context, instance *modelv1alpha1.ModelAdapter, activePods []corev1.Pod) (*corev1.Pod, error) {
// Implement your scheduling logic here to select a Pod based on the instance.Spec.PodSelector
// For the sake of example, we will just list the Pods matching the selector and pick the first one
return r.scheduler.SelectPod(ctx, activePods)
return r.scheduler.SelectPod(ctx, instance.Name, activePods)
}

func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/scheduling/bin_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewBinPackScheduler(c *cache.Cache) Scheduler {
}
}

func (r binPackScheduler) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
func (r binPackScheduler) SelectPod(ctx context.Context, model string, pods []v1.Pod) (*v1.Pod, error) {
// Binpack algorithm: choose the pod (1) can place the adapter, (2) with the least remaining space

selectedPod := v1.Pod{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/scheduling/least_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewLeastAdapters(c *cache.Cache) Scheduler {
}
}

func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
func (r leastAdapters) SelectPod(ctx context.Context, model string, pods []v1.Pod) (*v1.Pod, error) {
selectedPod := v1.Pod{}
modelAdapterCountMin := math.MaxInt

Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/modeladapter/scheduling/least_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func NewLeastLatencyScheduler(c *cache.Cache) Scheduler {
}
}

func (r leastLatencyScheduler) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
func (r leastLatencyScheduler) SelectPod(ctx context.Context, model string, pods []v1.Pod) (*v1.Pod, error) {
selectedPod := v1.Pod{}
podLatencyMin := math.MaxFloat64

for _, pod := range pods {
queueTime, err := r.cache.GetPodMetric(pod.Name, metrics.RequestQueueTimeSeconds)
queueTime, err := r.cache.GetPodModelMetric(pod.Name, model, metrics.RequestQueueTimeSeconds)
if err != nil {
return nil, err
}
inferenceTime, err := r.cache.GetPodMetric(pod.Name, metrics.RequestInferenceTimeSeconds)
inferenceTime, err := r.cache.GetPodModelMetric(pod.Name, model, metrics.RequestInferenceTimeSeconds)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading