Support metrics multi labels for different models (#450)
* fix: concat indicators with the same name together

* refact: add model args into Router

* feat: support label for multi model in pod metrics

* fix: pod is not ready to skip update metrics

* fix
brosoul authored Dec 2, 2024
1 parent 25eb982 commit f1659ab
Showing 11 changed files with 226 additions and 63 deletions.
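
The change targets pods that serve a base model alongside LoRA adapters: a single /metrics scrape now reports the same vLLM metric family once per model_name label value, and the mock server concatenates the extra series without repeating the # HELP/# TYPE header (the new help_header flag). Illustratively (the metric name, description, models, and values here are made up), a scraped payload looks roughly like:

    # HELP vllm:request_success_total Count of successfully processed requests.
    # TYPE vllm:request_success_total counter
    vllm:request_success_total{model_name="base-model"} 42
    vllm:request_success_total{model_name="lora-model-1"} 7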
40 changes: 20 additions & 20 deletions docs/development/app/app.py
@@ -212,7 +212,7 @@ def set_metrics():
metrics_state = {}


def generate_histogram_metric(metric_name, description, model_name, buckets, new_requests):
def generate_histogram_metric(metric_name, description, model_name, buckets, new_requests, help_header=True):
"""
Generate Prometheus histogram metrics with dynamically updated bucket values.
@@ -222,6 +222,7 @@ def generate_histogram_metric(metric_name, description, model_name, buckets, new
model_name (str): Model name.
buckets (list): List of bucket boundaries.
new_requests (dict): Dictionary with new requests to update bucket values.
help_header (bool): Whether to emit the # HELP and # TYPE header lines.
Returns:
str: Prometheus-formatted histogram metric.
@@ -271,6 +272,10 @@ def generate_histogram_metric(metric_name, description, model_name, buckets, new
vllm:{metric_name}_sum{{model_name="{model_name}"}} {value}
{buckets}
vllm:{metric_name}_count{{model_name="{model_name}"}} {count}
""" if help_header else """
vllm:{metric_name}_sum{{model_name="{model_name}"}} {value}
{buckets}
vllm:{metric_name}_count{{model_name="{model_name}"}} {count}
"""

return histogram_template.format(
@@ -283,7 +288,7 @@ def generate_histogram_metric(metric_name, description, model_name, buckets, new
)


def generate_counter_gauge_metric(metric_name, metric_type, description, model_name, value):
def generate_counter_gauge_metric(metric_name, metric_type, description, model_name, value, help_header=True):
"""
Generates a Prometheus metric string for counter or gauge.
@@ -293,6 +298,7 @@ def generate_counter_gauge_metric(metric_name, metric_type, description, model_n
description (str): The HELP description of the metric.
model_name (str): The name of the model.
value (float): The value of the metric.
help_header (bool): Whether to emit the # HELP and # TYPE header lines.
Returns:
str: A formatted Prometheus metric string.
@@ -301,6 +307,8 @@ def generate_counter_gauge_metric(metric_name, metric_type, description, model_n
# HELP vllm:{metric_name} {description}
# TYPE vllm:{metric_name} {metric_type}
vllm:{metric_name}{{model_name="{model_name}"}} {value}
""" if help_header else """
vllm:{metric_name}{{model_name="{model_name}"}} {value}
"""

return counter_gauge_template.format(
@@ -386,17 +394,12 @@ def metrics():
]

# Generate all metrics
metrics_output = "".join(
generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"], model_name,
metric["value"])
for metric in simple_metrics
)

lora_metrics_output = "".join(
generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"], "lora-model-1",
metric["value"])
for metric in simple_metrics
)
metrics_output = ""
for metric in simple_metrics:
metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"],
model_name, metric["value"])
metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"],
"lora-model-1", metric["value"], help_header=False)

histogram_metrics = [
{
@@ -464,20 +467,17 @@ def metrics():
buckets=metric["buckets"],
new_requests=new_requests
)

lora_histogram_metrics_output = ""
for metric in histogram_metrics:
# Simulate random new requests for the metric
new_requests = {bucket: random.randint(0, 5) for bucket in metric["buckets"]}
lora_histogram_metrics_output += generate_histogram_metric(
histogram_metrics_output += generate_histogram_metric(
metric_name=metric["name"],
description=metric["description"],
model_name="lora-model-1",
buckets=metric["buckets"],
new_requests=new_requests
new_requests=new_requests,
help_header=False
)

return Response(metrics_output + lora_metrics_output + histogram_metrics_output + lora_histogram_metrics_output, mimetype='text/plain')
return Response(metrics_output + histogram_metrics_output, mimetype='text/plain')


if __name__ == '__main__':
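
For reference, a minimal standalone Go sketch of how such multi-model output can be parsed with expfmt, the parser that the cache.go change below switches to. The payload, model names, and the labelValue helper are illustrative only; the real code feeds resp.Body to the parser and uses metrics.GetLabelValueForKey rather than this helper.

package main

import (
	"fmt"
	"strings"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

// Illustrative payload: the same gauge family reported once per model_name.
const payload = `# HELP vllm:num_requests_running Number of requests currently running.
# TYPE vllm:num_requests_running gauge
vllm:num_requests_running{model_name="base-model"} 3
vllm:num_requests_running{model_name="lora-model-1"} 1
`

// labelValue returns the value of a label on a single series, mirroring what
// a helper like metrics.GetLabelValueForKey presumably does.
func labelValue(m *dto.Metric, key string) (string, bool) {
	for _, lp := range m.GetLabel() {
		if lp.GetName() == key {
			return lp.GetValue(), true
		}
	}
	return "", false
}

func main() {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}
	// One metric family, two series: one per model_name label value.
	for _, m := range families["vllm:num_requests_running"].GetMetric() {
		model, _ := labelValue(m, "model_name")
		fmt.Printf("model=%s running=%v\n", model, m.GetGauge().GetValue())
	}
}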
153 changes: 126 additions & 27 deletions pkg/cache/cache.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/http"
"os"
@@ -46,6 +45,7 @@ import (
v1alpha1scheme "github.com/aibrix/aibrix/pkg/client/clientset/versioned/scheme"
"github.com/aibrix/aibrix/pkg/metrics"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/expfmt"
"k8s.io/client-go/kubernetes/scheme"
)

@@ -61,10 +61,11 @@ type Cache struct {
metrics map[string]interface{}
ModelMetrics map[string]map[string]interface{}
Pods map[string]*v1.Pod
PodMetrics map[string]map[string]metrics.MetricValue // pod_name: map[metric_name]metric_val
PodToModelMapping map[string]map[string]struct{} // pod_name: map[model_name]struct{}
ModelToPodMapping map[string]map[string]*v1.Pod // model_name: map[pod_name]*v1.Pod
requestTrace map[string]map[string]int // model_name: map[Log2(input_token)-Log2(output_token)]request_count
PodMetrics map[string]map[string]metrics.MetricValue // pod_name: map[metric_name]metric_val
PodModelMetrics map[string]map[string]map[string]metrics.MetricValue // pod_name: map[model_name]map[metric_name]metric_val
PodToModelMapping map[string]map[string]struct{} // pod_name: map[model_name]struct{}
ModelToPodMapping map[string]map[string]*v1.Pod // model_name: map[pod_name]*v1.Pod
requestTrace map[string]map[string]int // model_name: map[Log2(input_token)-Log2(output_token)]request_count
}

const (
@@ -194,6 +195,7 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}, redisClient *redis.Cl
prometheusApi: prometheusApi,
Pods: map[string]*v1.Pod{},
PodMetrics: map[string]map[string]metrics.MetricValue{},
PodModelMetrics: map[string]map[string]map[string]metrics.MetricValue{},
PodToModelMapping: map[string]map[string]struct{}{},
ModelToPodMapping: map[string]map[string]*v1.Pod{},
requestTrace: map[string]map[string]int{},
@@ -511,22 +513,69 @@ func (c *Cache) GetPodMetric(podName, metricName string) (metrics.MetricValue, e
return metricVal, nil
}

func (c *Cache) GetPodModelMetric(podName, modelName string, metricName string) (metrics.MetricValue, error) {
c.mu.RLock()
defer c.mu.RUnlock()

podMetrics, ok := c.PodModelMetrics[podName]
if !ok {
return nil, fmt.Errorf("pod does not exist in the podMetrics cache")
}

modelMetrics, ok := podMetrics[modelName]
if !ok {
return nil, fmt.Errorf("model does not exist in the podMetrics cache")
}

metricVal, ok := modelMetrics[metricName]
if !ok {
return nil, fmt.Errorf("no metric available for %v", metricName)
}

return metricVal, nil
}

// Update `PodMetrics` and `PodModelMetrics` according to the metric scope
func (c *Cache) updatePodRecord(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
if scope == metrics.PodMetricScope {
if modelName != "" {
return fmt.Errorf("modelName should be empty for scope %v", scope)
}
c.PodMetrics[podName][metricName] = metricValue
} else if scope == metrics.PodModelMetricScope {
if modelName == "" {
return fmt.Errorf("modelName should not be empty for scope %v", scope)
}
if len(c.PodModelMetrics[podName][modelName]) == 0 {
c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{}
}
c.PodModelMetrics[podName][modelName][metricName] = metricValue
} else {
return fmt.Errorf("scope %v is not supported", scope)
}
return nil
}

func (c *Cache) updatePodMetrics() {
c.mu.Lock()
defer c.mu.Unlock()

for _, pod := range c.Pods {
// Only scrape healthy Pod
if pod.Status.PodIP == "" || utils.IsPodTerminating(pod) || utils.IsPodReady(pod) {
if pod.Status.PodIP == "" || utils.IsPodTerminating(pod) || !utils.IsPodReady(pod) {
continue
}
podName := pod.Name
if len(c.PodMetrics[podName]) == 0 {
c.PodMetrics[podName] = map[string]metrics.MetricValue{}
}
if len(c.PodModelMetrics[podName]) == 0 {
c.PodModelMetrics[podName] = make(map[string]map[string]metrics.MetricValue)
}

// We should use the primary container port. In the future, we can decide whether to use sidecar container's port
url := fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, podPort)

resp, err := http.Get(url)
if err != nil {
klog.Errorf("failed to fetch metrics from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err)
@@ -538,41 +587,83 @@ func (c *Cache) updatePodMetrics() {
}
}()

body, err := io.ReadAll(resp.Body)
// TODO: the metrics should come from those router subscribers in future
var parser expfmt.TextParser
allMetrics, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
klog.Errorf("failed to read response from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err)
continue
fmt.Printf("Error parsing metric families: %v\n", err)
}

// TODO: the metrics should come from those router subscribers in future

// parse counterGaugeMetricsNames
for _, metricName := range counterGaugeMetricNames {
metricValue, err := metrics.ParseMetricFromBody(body, metricName)
if err != nil {
klog.Errorf("failed to parse metrics from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err)
metric, exists := metrics.Metrics[metricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", metricName)
continue
}

c.PodMetrics[pod.Name][metricName] = &metrics.SimpleMetricValue{Value: metricValue}
klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue)
// TODO: we should refactor metricName to fit other engines
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", metricName)
continue
}
scope := metric.MetricScope
for _, familyMetric := range metricFamily.Metric {
modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name")

metricValue, err := metrics.GetCounterGaugeValue(familyMetric, metricFamily.GetType())
if err != nil {
klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

err = c.updatePodRecord(podName, modelName, metricName, scope, &metrics.SimpleMetricValue{Value: metricValue})
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue)
}
}

// parse histogramMetrics
for _, metricName := range histogramMetricNames {
metricValue, err := metrics.ParseHistogramFromBody(body, metricName)
if err != nil {
klog.Errorf("failed to parse metrics from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err)
metric, exists := metrics.Metrics[metricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", metricName)
continue
}

value := metricValue.GetHistogramValue()
c.PodMetrics[pod.Name][metricName] = &metrics.HistogramMetricValue{
Sum: value.Sum,
Count: value.Count,
Buckets: value.Buckets,
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", metricName)
continue
}
scope := metric.MetricScope
for _, familyMetric := range metricFamily.Metric {
modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name")
metricValue, err := metrics.GetHistogramValue(familyMetric)
if err != nil {
klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err)
continue
}

histogramValue := &metrics.HistogramMetricValue{
Sum: metricValue.Sum,
Count: metricValue.Count,
Buckets: metricValue.Buckets,
}
err = c.updatePodRecord(podName, modelName, metricName, scope, histogramValue)
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue)

}
klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue)
}

if c.prometheusApi == nil {
@@ -581,16 +672,18 @@ func (c *Cache) updatePodMetrics() {
}

for _, metricName := range prometheusMetricNames {
// TODO: Get another model PromQL value from the same pod
modelName := pod.Labels["model.aibrix.ai/name"]
queryLabels := map[string]string{
"model_name": modelName,
"instance": fmt.Sprintf("%s/%d", pod.Status.PodIP, podPort),
"instance": fmt.Sprintf("%s:%d", pod.Status.PodIP, podPort),
}
metric, ok := metrics.Metrics[metricName]
if !ok {
klog.Warningf("Cannot find %v in the metric list", metricName)
continue
}
scope := metric.MetricScope
query := metrics.BuildQuery(metric.PromQL, queryLabels)
// Querying metrics
result, warnings, err := c.prometheusApi.Query(context.Background(), query, time.Now())
Expand All @@ -605,7 +698,13 @@ func (c *Cache) updatePodMetrics() {

klog.Infof("Query Result:%v\n", result)
// Update metrics
c.PodMetrics[pod.Name][metricName] = &metrics.PrometheusMetricValue{Result: &result}
metricValue := &metrics.PrometheusMetricValue{Result: &result}
err = c.updatePodRecord(podName, modelName, metricName, scope, metricValue)
if err != nil {
klog.Errorf("Failed to update metrics %s from prometheus %s: %v", metricName, podName, err)
continue
}
klog.V(5).InfoS("Successfully parsed metrics from prometheus", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue)
}
}
}
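
And one way the new per-pod, per-model data could be consumed downstream, as a hedged sketch only: a plain nested map stands in for Cache.PodModelMetrics and the GetPodModelMetric accessor, since the metrics.MetricValue interface is not reproduced here, and the pod, model, and metric names are made up.

package main

import (
	"fmt"
	"math"
)

// podModelMetrics mirrors the cache's new pod -> model -> metric nesting.
type podModelMetrics map[string]map[string]map[string]float64

// leastLoadedPod picks the pod with the smallest value of the given metric for
// the given model, skipping pods that do not report that model at all.
func leastLoadedPod(m podModelMetrics, model, metric string) (string, bool) {
	bestPod, bestVal := "", math.MaxFloat64
	for pod, models := range m {
		val, ok := models[model][metric]
		if !ok {
			continue
		}
		if val < bestVal {
			bestPod, bestVal = pod, val
		}
	}
	return bestPod, bestPod != ""
}

func main() {
	m := podModelMetrics{
		"pod-a": {"base-model": {"num_requests_running": 3}, "lora-model-1": {"num_requests_running": 1}},
		"pod-b": {"base-model": {"num_requests_running": 2}},
	}
	if pod, ok := leastLoadedPod(m, "lora-model-1", "num_requests_running"); ok {
		fmt.Println("route lora-model-1 requests to", pod)
	}
}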