From 8cf860e7b7eccdf4128add46781a54d9f20c768c Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 29 Nov 2024 15:55:55 +0800 Subject: [PATCH 1/6] fix: concat indicators with the same name together --- docs/development/app/app.py | 40 ++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/development/app/app.py b/docs/development/app/app.py index 9000d953..011120b7 100644 --- a/docs/development/app/app.py +++ b/docs/development/app/app.py @@ -212,7 +212,7 @@ def set_metrics(): metrics_state = {} -def generate_histogram_metric(metric_name, description, model_name, buckets, new_requests): +def generate_histogram_metric(metric_name, description, model_name, buckets, new_requests, help_header=True): """ Generate Prometheus histogram metrics with dynamically updated bucket values. @@ -222,6 +222,7 @@ def generate_histogram_metric(metric_name, description, model_name, buckets, new model_name (str): Model name. buckets (list): List of bucket boundaries. new_requests (dict): Dictionary with new requests to update bucket values. + help_header: the flag to include HELP Header Returns: str: Prometheus-formatted histogram metric. @@ -271,6 +272,10 @@ def generate_histogram_metric(metric_name, description, model_name, buckets, new vllm:{metric_name}_sum{{model_name="{model_name}"}} {value} {buckets} vllm:{metric_name}_count{{model_name="{model_name}"}} {count} +""" if help_header else """ +vllm:{metric_name}_sum{{model_name="{model_name}"}} {value} +{buckets} +vllm:{metric_name}_count{{model_name="{model_name}"}} {count} """ return histogram_template.format( @@ -283,7 +288,7 @@ def generate_histogram_metric(metric_name, description, model_name, buckets, new ) -def generate_counter_gauge_metric(metric_name, metric_type, description, model_name, value): +def generate_counter_gauge_metric(metric_name, metric_type, description, model_name, value, help_header=True): """ Generates a Prometheus metric string for counter or gauge. @@ -293,6 +298,7 @@ def generate_counter_gauge_metric(metric_name, metric_type, description, model_n description (str): The HELP description of the metric. model_name (str): The name of the model. value (float): The value of the metric. + help_header: the flag to include HELP Header Returns: str: A formatted Prometheus metric string. @@ -301,6 +307,8 @@ def generate_counter_gauge_metric(metric_name, metric_type, description, model_n # HELP vllm:{metric_name} {description} # TYPE vllm:{metric_name} {metric_type} vllm:{metric_name}{{model_name="{model_name}"}} {value} +""" if help_header else """ +vllm:{metric_name}{{model_name="{model_name}"}} {value} """ return counter_gauge_template.format( @@ -386,17 +394,12 @@ def metrics(): ] # Generate all metrics - metrics_output = "".join( - generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"], model_name, - metric["value"]) - for metric in simple_metrics - ) - - lora_metrics_output = "".join( - generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"], "lora-model-1", - metric["value"]) - for metric in simple_metrics - ) + metrics_output = "" + for metric in simple_metrics: + metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"], + model_name, metric["value"]) + metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"], + "lora-model-1", metric["value"], help_header=False) histogram_metrics = [ { @@ -464,20 +467,17 @@ def metrics(): buckets=metric["buckets"], new_requests=new_requests ) - - lora_histogram_metrics_output = "" - for metric in histogram_metrics: - # Simulate random new requests for the metric new_requests = {bucket: random.randint(0, 5) for bucket in metric["buckets"]} - lora_histogram_metrics_output += generate_histogram_metric( + histogram_metrics_output += generate_histogram_metric( metric_name=metric["name"], description=metric["description"], model_name="lora-model-1", buckets=metric["buckets"], - new_requests=new_requests + new_requests=new_requests, + help_header=False ) - return Response(metrics_output + lora_metrics_output + histogram_metrics_output + lora_histogram_metrics_output, mimetype='text/plain') + return Response(metrics_output + histogram_metrics_output, mimetype='text/plain') if __name__ == '__main__': From ec8dc2e627b2e8932d97526471c5deed12c336a1 Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 29 Nov 2024 17:39:00 +0800 Subject: [PATCH 2/6] refact: add model args into Router --- .../gateway/algorithms/least_request.go | 2 +- pkg/plugins/gateway/algorithms/random.go | 2 +- pkg/plugins/gateway/algorithms/router.go | 2 +- pkg/plugins/gateway/algorithms/router_test.go | 21 +++++++++++-------- pkg/plugins/gateway/algorithms/throughput.go | 2 +- pkg/plugins/gateway/gateway.go | 6 +++--- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/plugins/gateway/algorithms/least_request.go b/pkg/plugins/gateway/algorithms/least_request.go index 036d5a00..b80e91e2 100644 --- a/pkg/plugins/gateway/algorithms/least_request.go +++ b/pkg/plugins/gateway/algorithms/least_request.go @@ -43,7 +43,7 @@ func NewLeastRequestRouter() Router { } } -func (r leastRequestRouter) Route(ctx context.Context, pods map[string]*v1.Pod) (string, error) { +func (r leastRequestRouter) Route(ctx context.Context, pods map[string]*v1.Pod, model string) (string, error) { var targetPodIP string minCount := math.MaxFloat64 diff --git a/pkg/plugins/gateway/algorithms/random.go b/pkg/plugins/gateway/algorithms/random.go index a82630e0..0ef564b1 100644 --- a/pkg/plugins/gateway/algorithms/random.go +++ b/pkg/plugins/gateway/algorithms/random.go @@ -33,7 +33,7 @@ func NewRandomRouter() Router { return randomRouter{} } -func (r randomRouter) Route(ctx context.Context, pods map[string]*v1.Pod) (string, error) { +func (r randomRouter) Route(ctx context.Context, pods map[string]*v1.Pod, model string) (string, error) { var targetPodIP string if len(pods) == 0 { return "", fmt.Errorf("no pods to forward request") diff --git a/pkg/plugins/gateway/algorithms/router.go b/pkg/plugins/gateway/algorithms/router.go index 0b2cfc38..d929883f 100644 --- a/pkg/plugins/gateway/algorithms/router.go +++ b/pkg/plugins/gateway/algorithms/router.go @@ -30,7 +30,7 @@ const podMetricPort = "8000" // Router defines the interface for routing logic to select target pods. type Router interface { // Route returns the target pod - Route(ctx context.Context, pods map[string]*v1.Pod) (string, error) + Route(ctx context.Context, pods map[string]*v1.Pod, model string) (string, error) } // selectRandomPodWithRand selects a random pod from the provided pod map. diff --git a/pkg/plugins/gateway/algorithms/router_test.go b/pkg/plugins/gateway/algorithms/router_test.go index 6a29ff22..0bbdd2b8 100644 --- a/pkg/plugins/gateway/algorithms/router_test.go +++ b/pkg/plugins/gateway/algorithms/router_test.go @@ -32,21 +32,22 @@ import ( func TestNoPods(t *testing.T) { c := cache.Cache{} r1 := randomRouter{} - targetPodIP, err := r1.Route(context.TODO(), c.Pods) + model := "" + targetPodIP, err := r1.Route(context.TODO(), c.Pods, model) assert.Empty(t, targetPodIP, "targetPodIP must be empty") assert.Error(t, err, "no pod has IP") r2 := leastRequestRouter{ cache: &c, } - targetPodIP, err = r2.Route(context.TODO(), c.Pods) + targetPodIP, err = r2.Route(context.TODO(), c.Pods, model) assert.Empty(t, targetPodIP, "targetPodIP must be empty") assert.Error(t, err, "no pod has IP") r3 := throughputRouter{ cache: &c, } - targetPodIP, err = r3.Route(context.TODO(), c.Pods) + targetPodIP, err = r3.Route(context.TODO(), c.Pods, model) assert.Empty(t, targetPodIP, "targetPodIP must be empty") assert.Error(t, err, "no pod has IP") } @@ -66,23 +67,24 @@ func TestWithNoIPPods(t *testing.T) { }, }, } + model := "" r1 := randomRouter{} - targetPodIP, err := r1.Route(context.TODO(), c.Pods) + targetPodIP, err := r1.Route(context.TODO(), c.Pods, model) assert.Empty(t, targetPodIP, "targetPodIP must be empty") assert.Error(t, err, "no pod has IP") r2 := leastRequestRouter{ cache: &c, } - targetPodIP, err = r2.Route(context.TODO(), c.Pods) + targetPodIP, err = r2.Route(context.TODO(), c.Pods, model) assert.Empty(t, targetPodIP, "targetPodIP must be empty") assert.Error(t, err, "no pod has IP") r3 := throughputRouter{ cache: &c, } - targetPodIP, err = r3.Route(context.TODO(), c.Pods) + targetPodIP, err = r3.Route(context.TODO(), c.Pods, model) assert.Empty(t, targetPodIP, "targetPodIP must be empty") assert.Error(t, err, "no pod has IP") } @@ -140,23 +142,24 @@ func TestWithIPPods(t *testing.T) { }, }, } + model := "" r1 := randomRouter{} - targetPodIP, err := r1.Route(context.TODO(), c.Pods) + targetPodIP, err := r1.Route(context.TODO(), c.Pods, model) assert.NotEmpty(t, targetPodIP, "targetPodIP is not empty") assert.NoError(t, err) r2 := leastRequestRouter{ cache: &c, } - targetPodIP, err = r2.Route(context.TODO(), c.Pods) + targetPodIP, err = r2.Route(context.TODO(), c.Pods, model) assert.NotEmpty(t, targetPodIP, "targetPodIP is not empty") assert.NoError(t, err) r3 := throughputRouter{ cache: &c, } - targetPodIP, err = r3.Route(context.TODO(), c.Pods) + targetPodIP, err = r3.Route(context.TODO(), c.Pods, model) assert.NotEmpty(t, targetPodIP, "targetPodIP is not empty") assert.NoError(t, err) } diff --git a/pkg/plugins/gateway/algorithms/throughput.go b/pkg/plugins/gateway/algorithms/throughput.go index 1249f781..1de9092d 100644 --- a/pkg/plugins/gateway/algorithms/throughput.go +++ b/pkg/plugins/gateway/algorithms/throughput.go @@ -43,7 +43,7 @@ func NewThroughputRouter() Router { } } -func (r throughputRouter) Route(ctx context.Context, pods map[string]*v1.Pod) (string, error) { +func (r throughputRouter) Route(ctx context.Context, pods map[string]*v1.Pod, model string) (string, error) { var targetPodIP string minCount := math.MaxFloat64 diff --git a/pkg/plugins/gateway/gateway.go b/pkg/plugins/gateway/gateway.go index fd06bd48..44e3ebf9 100644 --- a/pkg/plugins/gateway/gateway.go +++ b/pkg/plugins/gateway/gateway.go @@ -268,7 +268,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestID string, req *e fmt.Sprintf("error on getting pods for model %s", model)), targetPodIP, stream } - targetPodIP, err = s.selectTargetPod(ctx, routingStrategy, pods) + targetPodIP, err = s.selectTargetPod(ctx, routingStrategy, pods, model) if err != nil { return generateErrorResponse( envoyTypePb.StatusCode_InternalServerError, @@ -519,7 +519,7 @@ func (s *Server) checkTPM(ctx context.Context, username string, tpmLimit int64) return envoyTypePb.StatusCode_OK, nil } -func (s *Server) selectTargetPod(ctx context.Context, routingStrategy string, pods map[string]*v1.Pod) (string, error) { +func (s *Server) selectTargetPod(ctx context.Context, routingStrategy string, pods map[string]*v1.Pod, model string) (string, error) { var route routing.Router switch routingStrategy { case "least-request": @@ -530,7 +530,7 @@ func (s *Server) selectTargetPod(ctx context.Context, routingStrategy string, po route = s.routers["random"] } - return route.Route(ctx, pods) + return route.Route(ctx, pods, model) } func validateRoutingStrategy(routingStrategy string) bool { From 861df8a2d6b639911d946a8f2f87cf3e9a1097b1 Mon Sep 17 00:00:00 2001 From: brosoul Date: Sun, 1 Dec 2024 19:40:00 +0800 Subject: [PATCH 3/6] feat: support label for multi model in pod metrics --- pkg/cache/cache.go | 152 ++++++++++++++++++++++++++++++++++------- pkg/metrics/metrics.go | 14 ++++ pkg/metrics/types.go | 10 +++ pkg/metrics/utils.go | 37 ++++++++++ 4 files changed, 187 insertions(+), 26 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5a89b830..2ef5fc3e 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "math" "net/http" "os" @@ -46,6 +45,7 @@ import ( v1alpha1scheme "github.com/aibrix/aibrix/pkg/client/clientset/versioned/scheme" "github.com/aibrix/aibrix/pkg/metrics" prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/expfmt" "k8s.io/client-go/kubernetes/scheme" ) @@ -61,10 +61,11 @@ type Cache struct { metrics map[string]interface{} ModelMetrics map[string]map[string]interface{} Pods map[string]*v1.Pod - PodMetrics map[string]map[string]metrics.MetricValue // pod_name: map[metric_name]metric_val - PodToModelMapping map[string]map[string]struct{} // pod_name: map[model_name]struct{} - ModelToPodMapping map[string]map[string]*v1.Pod // model_name: map[pod_name]*v1.Pod - requestTrace map[string]map[string]int // model_name: map[Log2(input_token)-Log2(output_token)]request_count + PodMetrics map[string]map[string]metrics.MetricValue // pod_name: map[metric_name]metric_val + PodModelMetrics map[string]map[string]map[string]metrics.MetricValue // pod_name: map[model_name]map[metric_name]metric_val + PodToModelMapping map[string]map[string]struct{} // pod_name: map[model_name]struct{} + ModelToPodMapping map[string]map[string]*v1.Pod // model_name: map[pod_name]*v1.Pod + requestTrace map[string]map[string]int // model_name: map[Log2(input_token)-Log2(output_token)]request_count } const ( @@ -194,6 +195,7 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}, redisClient *redis.Cl prometheusApi: prometheusApi, Pods: map[string]*v1.Pod{}, PodMetrics: map[string]map[string]metrics.MetricValue{}, + PodModelMetrics: map[string]map[string]map[string]metrics.MetricValue{}, PodToModelMapping: map[string]map[string]struct{}{}, ModelToPodMapping: map[string]map[string]*v1.Pod{}, requestTrace: map[string]map[string]int{}, @@ -511,6 +513,28 @@ func (c *Cache) GetPodMetric(podName, metricName string) (metrics.MetricValue, e return metricVal, nil } +func (c *Cache) GetPodModelMetric(podName, modelName string, metricName string) (metrics.MetricValue, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + podMetrics, ok := c.PodModelMetrics[podName] + if !ok { + return nil, fmt.Errorf("pod does not exist in the podMetrics cache") + } + + modelMetrics, ok := podMetrics[modelName] + if !ok { + return nil, fmt.Errorf("model does not exist in the podMetrics cache") + } + + metricVal, ok := modelMetrics[metricName] + if !ok { + return nil, fmt.Errorf("no metric available for %v", metricName) + } + + return metricVal, nil +} + func (c *Cache) updatePodMetrics() { c.mu.Lock() defer c.mu.Unlock() @@ -524,6 +548,9 @@ func (c *Cache) updatePodMetrics() { if len(c.PodMetrics[podName]) == 0 { c.PodMetrics[podName] = map[string]metrics.MetricValue{} } + if len(c.PodModelMetrics[podName]) == 0 { + c.PodModelMetrics[podName] = make(map[string]map[string]metrics.MetricValue) + } // We should use the primary container port. In the future, we can decide whether to use sidecar container's port url := fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, podPort) @@ -538,41 +565,110 @@ func (c *Cache) updatePodMetrics() { } }() - body, err := io.ReadAll(resp.Body) + // TODO: the metrics should come from those router subscribers in future + var parser expfmt.TextParser + allMetrics, err := parser.TextToMetricFamilies(resp.Body) if err != nil { - klog.Errorf("failed to read response from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err) - continue + fmt.Printf("Error parsing metric families: %v\n", err) } - // TODO: the metrics should come from those router subscribers in future - // parse counterGaugeMetricsNames for _, metricName := range counterGaugeMetricNames { - metricValue, err := metrics.ParseMetricFromBody(body, metricName) - if err != nil { - klog.Errorf("failed to parse metrics from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err) + metric, exists := metrics.Metrics[metricName] + if !exists { + klog.Errorf("failed to parse metrics: Metrics not contains %s", metricName) continue } - c.PodMetrics[pod.Name][metricName] = &metrics.SimpleMetricValue{Value: metricValue} - klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) + // TODO: we should refact metricName to fit other engine + metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)] + if !exists { + klog.Errorf("failed to parse metrics from pod %s %s %d: Pod metrics not contains %s", pod.Name, pod.Status.PodIP, podPort, metricName) + continue + } + scope := metric.MetricScope + for _, familyMetric := range metricFamily.Metric { + modelName, err := metrics.GetLabelValueForKey(familyMetric, "model_name") + + metricValue, err := metrics.GetCounterGaugeValue(familyMetric, metricFamily.GetType()) + if err != nil { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + continue + } + + if scope == metrics.PodMetricScope { + if modelName != "" { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + continue + } + c.PodMetrics[pod.Name][metricName] = &metrics.SimpleMetricValue{Value: metricValue} + } else if scope == metrics.PodModelMetricScope { + if modelName == "" { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + continue + } + if len(c.PodModelMetrics[podName][modelName]) == 0 { + c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} + } + c.PodModelMetrics[pod.Name][modelName][metricName] = &metrics.SimpleMetricValue{Value: metricValue} + } else { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: Scope %v not supported", metricName, pod.Name, pod.Status.PodIP, podPort, scope) + continue + } + klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) + } } // parse histogramMetrics for _, metricName := range histogramMetricNames { - metricValue, err := metrics.ParseHistogramFromBody(body, metricName) - if err != nil { - klog.Errorf("failed to parse metrics from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err) + metric, exists := metrics.Metrics[metricName] + if !exists { + klog.Errorf("failed to parse metrics: Metrics not contains %s", metricName) + continue + } + + metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)] + if !exists { + klog.Errorf("failed to parse metrics from pod %s %s %d: Pod metrics not contains %s", pod.Name, pod.Status.PodIP, podPort, metricName) continue } + scope := metric.MetricScope + for _, familyMetric := range metricFamily.Metric { + modelName, err := metrics.GetLabelValueForKey(familyMetric, "model_name") + metricValue, err := metrics.GetHistogramValue(familyMetric) + if err != nil { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + continue + } + + histogramValue := &metrics.HistogramMetricValue{ + Sum: metricValue.Sum, + Count: metricValue.Count, + Buckets: metricValue.Buckets, + } + + if scope == metrics.PodMetricScope { + if modelName != "" { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + continue + } + c.PodMetrics[pod.Name][metricName] = histogramValue + } else if scope == metrics.PodModelMetricScope { + if modelName == "" { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + continue + } + if len(c.PodModelMetrics[podName][modelName]) == 0 { + c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} + } + c.PodModelMetrics[pod.Name][modelName][metricName] = histogramValue + } else { + klog.Errorf("failed to parse metrics %s from pod %s %s %d: Scope %v not supported", metricName, pod.Name, pod.Status.PodIP, podPort, scope) + continue + } + klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) - value := metricValue.GetHistogramValue() - c.PodMetrics[pod.Name][metricName] = &metrics.HistogramMetricValue{ - Sum: value.Sum, - Count: value.Count, - Buckets: value.Buckets, } - klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) } if c.prometheusApi == nil { @@ -581,10 +677,11 @@ func (c *Cache) updatePodMetrics() { } for _, metricName := range prometheusMetricNames { + // TODO: Get another model PromQL value from the same pod modelName := pod.Labels["model.aibrix.ai/name"] queryLabels := map[string]string{ "model_name": modelName, - "instance": fmt.Sprintf("%s/%d", pod.Status.PodIP, podPort), + "instance": fmt.Sprintf("%s:%d", pod.Status.PodIP, podPort), } metric, ok := metrics.Metrics[metricName] if !ok { @@ -605,7 +702,10 @@ func (c *Cache) updatePodMetrics() { klog.Infof("Query Result:%v\n", result) // Update metrics - c.PodMetrics[pod.Name][metricName] = &metrics.PrometheusMetricValue{Result: &result} + if len(c.PodModelMetrics[podName][modelName]) == 0 { + c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} + } + c.PodModelMetrics[pod.Name][modelName][metricName] = &metrics.PrometheusMetricValue{Result: &result} } } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 17e18a7a..e37addf8 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -38,6 +38,7 @@ var ( Metrics = map[string]Metric{ // Counter metrics NumRequestsRunning: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Counter, @@ -45,6 +46,7 @@ var ( Description: "Number of running requests", }, NumRequestsWaiting: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Counter, @@ -52,6 +54,7 @@ var ( Description: "Number of waiting requests", }, NumRequestsSwapped: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Counter, @@ -60,6 +63,7 @@ var ( }, // Gauge metrics AvgPromptThroughputToksPerS: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Gauge, @@ -67,6 +71,7 @@ var ( Description: "Average prompt throughput in tokens per second", }, AvgGenerationThroughputToksPerS: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Gauge, @@ -75,6 +80,7 @@ var ( }, // Histogram metrics IterationTokensTotal: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -82,6 +88,7 @@ var ( Description: "Total iteration tokens", }, TimeToFirstTokenSeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -89,6 +96,7 @@ var ( Description: "Time to first token in seconds", }, TimePerOutputTokenSeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -96,6 +104,7 @@ var ( Description: "Time per output token in seconds", }, E2ERequestLatencySeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -103,6 +112,7 @@ var ( Description: "End-to-end request latency in seconds", }, RequestQueueTimeSeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -110,6 +120,7 @@ var ( Description: "Request queue time in seconds", }, RequestInferenceTimeSeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -117,6 +128,7 @@ var ( Description: "Request inference time in seconds", }, RequestDecodeTimeSeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -124,6 +136,7 @@ var ( Description: "Request decode time in seconds", }, RequestPrefillTimeSeconds: { + MetricScope: PodModelMetricScope, MetricSource: PodRawMetrics, MetricType: MetricType{ Raw: Histogram, @@ -132,6 +145,7 @@ var ( }, // Query-based metrics P95TTFT5m: { + MetricScope: PodModelMetricScope, MetricSource: PrometheusEndpoint, MetricType: MetricType{ Query: PromQL, diff --git a/pkg/metrics/types.go b/pkg/metrics/types.go index 14440656..3bd2995a 100644 --- a/pkg/metrics/types.go +++ b/pkg/metrics/types.go @@ -65,12 +65,22 @@ func (m MetricType) IsQuery() bool { return m.Query != "" } +// MetricScope defines the scope of a metric (e.g., model or pod or podmodel). +type MetricScope string + +const ( + ModelMetricScope MetricScope = "Model" + PodMetricScope MetricScope = "Pod" + PodModelMetricScope MetricScope = "PodModel" // model in pod +) + // Metric defines a unique metric with metadata. type Metric struct { MetricSource MetricSource MetricType MetricType PromQL string // Optional: Only applicable for PromQL-based metrics Description string + MetricScope MetricScope } // MetricValue is the interface for all metric values. diff --git a/pkg/metrics/utils.go b/pkg/metrics/utils.go index c3d7d88a..1a2bb355 100644 --- a/pkg/metrics/utils.go +++ b/pkg/metrics/utils.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/client_golang/api" prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/config" ) @@ -159,3 +160,39 @@ func InitializePrometheusAPI(endpoint, username, password string) (prometheusv1. return prometheusv1.NewAPI(client), nil } + +func GetLabelValueForKey(metric *dto.Metric, key string) (string, error) { + for _, labelPair := range metric.Label { + if labelPair.GetName() == key { + return labelPair.GetValue(), nil + } + } + return "", fmt.Errorf("Label %s not found", key) +} + +func GetCounterGaugeValue(metric *dto.Metric, metricType dto.MetricType) (float64, error) { + if metricType == dto.MetricType_COUNTER { + return metric.GetCounter().GetValue(), nil + } else if metricType == dto.MetricType_GAUGE { + return metric.GetGauge().GetValue(), nil + } + return 0, fmt.Errorf("Metric type not supported: %v", metricType) +} + +func GetHistogramValue(metric *dto.Metric) (*HistogramMetricValue, error) { + histogram := &HistogramMetricValue{ + Buckets: make(map[string]float64), + } + histogramMetric := metric.GetHistogram() + if histogramMetric == nil { + return nil, fmt.Errorf("Histogram metric not found") + } + + histogram.Sum = histogramMetric.GetSampleSum() + histogram.Count = float64(histogramMetric.GetSampleCount()) + for _, bucket := range histogramMetric.GetBucket() { + bound := fmt.Sprintf("%f", bucket.GetUpperBound()) + histogram.Buckets[bound] = float64(bucket.GetCumulativeCount()) + } + return histogram, nil +} From 21ec6c70c5e198efe8f4207401e97e03e7629652 Mon Sep 17 00:00:00 2001 From: brosoul Date: Sun, 1 Dec 2024 19:40:50 +0800 Subject: [PATCH 4/6] fix: pod is not ready to skip update metrics --- pkg/cache/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 2ef5fc3e..d55ed7d7 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -541,7 +541,7 @@ func (c *Cache) updatePodMetrics() { for _, pod := range c.Pods { // Only scrape healthy Pod - if pod.Status.PodIP == "" || utils.IsPodTerminating(pod) || utils.IsPodReady(pod) { + if pod.Status.PodIP == "" || utils.IsPodTerminating(pod) || !utils.IsPodReady(pod) { continue } podName := pod.Name From 04d4ef4155d19a9dddd5c89902f846479152616f Mon Sep 17 00:00:00 2001 From: brosoul Date: Sun, 1 Dec 2024 21:34:13 +0800 Subject: [PATCH 5/6] fix --- pkg/cache/cache.go | 89 +++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index d55ed7d7..b4afb6c9 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -535,6 +535,30 @@ func (c *Cache) GetPodModelMetric(podName, modelName string, metricName string) return metricVal, nil } +// Update `PodMetrics` and `PodModelMetrics` according to the metric scope +func (c *Cache) updatePodRecord(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error { + c.mu.RLock() + defer c.mu.RUnlock() + + if scope == metrics.PodMetricScope { + if modelName != "" { + return fmt.Errorf("modelName should be empty for scope %v", scope) + } + c.PodMetrics[podName][metricName] = metricValue + } else if scope == metrics.PodModelMetricScope { + if modelName == "" { + return fmt.Errorf("modelName should not be empty for scope %v", scope) + } + if len(c.PodModelMetrics[podName][modelName]) == 0 { + c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} + } + c.PodModelMetrics[podName][modelName][metricName] = metricValue + } else { + return fmt.Errorf("scope %v is not supported", scope) + } + return nil +} + func (c *Cache) updatePodMetrics() { c.mu.Lock() defer c.mu.Unlock() @@ -576,45 +600,32 @@ func (c *Cache) updatePodMetrics() { for _, metricName := range counterGaugeMetricNames { metric, exists := metrics.Metrics[metricName] if !exists { - klog.Errorf("failed to parse metrics: Metrics not contains %s", metricName) + klog.Warningf("Cannot find %v in the metric list", metricName) continue } // TODO: we should refact metricName to fit other engine metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)] if !exists { - klog.Errorf("failed to parse metrics from pod %s %s %d: Pod metrics not contains %s", pod.Name, pod.Status.PodIP, podPort, metricName) + klog.Warningf("Cannot find %v in the pod metrics", metricName) continue } scope := metric.MetricScope for _, familyMetric := range metricFamily.Metric { - modelName, err := metrics.GetLabelValueForKey(familyMetric, "model_name") + modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name") metricValue, err := metrics.GetCounterGaugeValue(familyMetric, metricFamily.GetType()) if err != nil { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) + klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err) continue } - if scope == metrics.PodMetricScope { - if modelName != "" { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) - continue - } - c.PodMetrics[pod.Name][metricName] = &metrics.SimpleMetricValue{Value: metricValue} - } else if scope == metrics.PodModelMetricScope { - if modelName == "" { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) - continue - } - if len(c.PodModelMetrics[podName][modelName]) == 0 { - c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} - } - c.PodModelMetrics[pod.Name][modelName][metricName] = &metrics.SimpleMetricValue{Value: metricValue} - } else { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: Scope %v not supported", metricName, pod.Name, pod.Status.PodIP, podPort, scope) + err = c.updatePodRecord(podName, modelName, metricName, scope, &metrics.SimpleMetricValue{Value: metricValue}) + if err != nil { + klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err) continue } + klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) } } @@ -623,18 +634,18 @@ func (c *Cache) updatePodMetrics() { for _, metricName := range histogramMetricNames { metric, exists := metrics.Metrics[metricName] if !exists { - klog.Errorf("failed to parse metrics: Metrics not contains %s", metricName) + klog.Warningf("Cannot find %v in the metric list", metricName) continue } metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)] if !exists { - klog.Errorf("failed to parse metrics from pod %s %s %d: Pod metrics not contains %s", pod.Name, pod.Status.PodIP, podPort, metricName) + klog.Warningf("Cannot find %v in the pod metrics", metricName) continue } scope := metric.MetricScope for _, familyMetric := range metricFamily.Metric { - modelName, err := metrics.GetLabelValueForKey(familyMetric, "model_name") + modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name") metricValue, err := metrics.GetHistogramValue(familyMetric) if err != nil { klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) @@ -646,26 +657,12 @@ func (c *Cache) updatePodMetrics() { Count: metricValue.Count, Buckets: metricValue.Buckets, } - - if scope == metrics.PodMetricScope { - if modelName != "" { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) - continue - } - c.PodMetrics[pod.Name][metricName] = histogramValue - } else if scope == metrics.PodModelMetricScope { - if modelName == "" { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err) - continue - } - if len(c.PodModelMetrics[podName][modelName]) == 0 { - c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} - } - c.PodModelMetrics[pod.Name][modelName][metricName] = histogramValue - } else { - klog.Errorf("failed to parse metrics %s from pod %s %s %d: Scope %v not supported", metricName, pod.Name, pod.Status.PodIP, podPort, scope) + err = c.updatePodRecord(podName, modelName, metricName, scope, histogramValue) + if err != nil { + klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err) continue } + klog.V(5).InfoS("Successfully parsed metrics", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) } @@ -688,6 +685,7 @@ func (c *Cache) updatePodMetrics() { klog.Warningf("Cannot find %v in the metric list", metricName) continue } + scope := metric.MetricScope query := metrics.BuildQuery(metric.PromQL, queryLabels) // Querying metrics result, warnings, err := c.prometheusApi.Query(context.Background(), query, time.Now()) @@ -702,10 +700,11 @@ func (c *Cache) updatePodMetrics() { klog.Infof("Query Result:%v\n", result) // Update metrics - if len(c.PodModelMetrics[podName][modelName]) == 0 { - c.PodModelMetrics[podName][modelName] = map[string]metrics.MetricValue{} + err = c.updatePodRecord(podName, modelName, metricName, scope, &metrics.PrometheusMetricValue{Result: &result}) + if err != nil { + klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err) + continue } - c.PodModelMetrics[pod.Name][modelName][metricName] = &metrics.PrometheusMetricValue{Result: &result} } } } From e98d1a742febdddeb52e75012d03582020b28ff0 Mon Sep 17 00:00:00 2001 From: brosoul Date: Sun, 1 Dec 2024 21:58:55 +0800 Subject: [PATCH 6/6] fix --- pkg/cache/cache.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index b4afb6c9..9423a052 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -537,9 +537,6 @@ func (c *Cache) GetPodModelMetric(podName, modelName string, metricName string) // Update `PodMetrics` and `PodModelMetrics` according to the metric scope func (c *Cache) updatePodRecord(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error { - c.mu.RLock() - defer c.mu.RUnlock() - if scope == metrics.PodMetricScope { if modelName != "" { return fmt.Errorf("modelName should be empty for scope %v", scope) @@ -578,6 +575,7 @@ func (c *Cache) updatePodMetrics() { // We should use the primary container port. In the future, we can decide whether to use sidecar container's port url := fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, podPort) + resp, err := http.Get(url) if err != nil { klog.Errorf("failed to fetch metrics from pod %s %s %d: %v", pod.Name, pod.Status.PodIP, podPort, err) @@ -700,11 +698,13 @@ func (c *Cache) updatePodMetrics() { klog.Infof("Query Result:%v\n", result) // Update metrics - err = c.updatePodRecord(podName, modelName, metricName, scope, &metrics.PrometheusMetricValue{Result: &result}) + metricValue := &metrics.PrometheusMetricValue{Result: &result} + err = c.updatePodRecord(podName, modelName, metricName, scope, metricValue) if err != nil { - klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err) + klog.Errorf("Failed to update metrics %s from prometheus %s: %v", metricName, podName, err) continue } + klog.V(5).InfoS("Successfully parsed metrics from prometheus", "metric", metricName, "model", modelName, "PodIP", pod.Status.PodIP, "Port", podPort, "metricValue", metricValue) } } }