Skip to content

Commit

Permalink
Update routers to use GetPodModelMetric api and misc cleanup in metri… (
Browse files Browse the repository at this point in the history
#590)

* Update routers to use GetPodModelMetric api and misc cleanup in metrics collection in cache

* nit namespace name fix in httproute delete

* nit
  • Loading branch information
varungup90 authored Jan 23, 2025
1 parent 94bd959 commit 58d3781
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 74 deletions.
4 changes: 2 additions & 2 deletions config/gateway/gateway-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ spec:
value: aibrix-redis-master
- name: REDIS_PORT
value: "6379"
# - name: ROUTING_ALGORITHM
# value: "least-request" # random, least-request, throughput
- name: AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS
value: "10000" # 10 seconds, only local testing
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
21 changes: 12 additions & 9 deletions development/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
# Global storage for overridden values
overrides = {}

MODEL_NAME = os.getenv('MODEL_NAME', 'llama2-70b')
DEPLOYMENT_NAME = os.getenv('DEPLOYMENT_NAME', 'llama2-70b')
MODEL_NAME = os.getenv('MODEL_NAME', 'llama2-7b')
DEPLOYMENT_NAME = os.getenv('DEPLOYMENT_NAME', 'llama2-7b')
NAMESPACE = os.getenv('POD_NAMESPACE', 'default')
DEFAULT_REPLICAS = int(os.getenv('DEFAULT_REPLICAS', '1'))
SIMULATION = os.getenv('SIMULATION', 'disabled')
Expand Down Expand Up @@ -566,16 +566,19 @@ def metrics():
]

# Generate all metrics
metrics_output = """
# HELP vllm:lora_requests_info Running stats on lora requests.
# TYPE vllm:lora_requests_info gauge
vllm:lora_requests_info{max_lora="1",running_lora_adapters="text2sql-lora-1",waiting_lora_adapters=""} 1
"""
metrics_output = ""
for metric in simple_metrics:
metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"],
model_name, metric["value"])
metrics_output += generate_counter_gauge_metric(metric["name"], metric["type"], metric["description"],
"lora-1", metric["value"], help_header=False)
"text2sql-lora-2", metric["value"], help_header=False)


metrics_output += """
# HELP vllm:lora_requests_info Running stats on lora requests.
# TYPE vllm:lora_requests_info gauge
vllm:lora_requests_info{max_lora="1",running_lora_adapters="text2sql-lora-2",waiting_lora_adapters=""} 1
"""

histogram_metrics = [
{
Expand Down Expand Up @@ -663,7 +666,7 @@ def metrics():
histogram_metrics_output += generate_histogram_metric(
metric_name=metric["name"],
description=metric["description"],
model_name="lora-1",
model_name="text2sql-lora-2",
buckets=metric["buckets"],
new_requests=new_requests,
help_header=False
Expand Down
95 changes: 51 additions & 44 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,17 @@ var (
)

func getPodMetricRefreshInterval() time.Duration {
value, exists := os.LookupEnv("AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS")
if exists {
value := LoadEnv("AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS", "")
if value != "" {
intValue, err := strconv.Atoi(value)
if err != nil {
klog.V(4).Infof("Invalid AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: %s, falling back to default", value)
klog.Infof("invalid AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: %s, falling back to default", value)
} else {
klog.V(4).Infof("Using env value for refresh interval: %d ms", intValue)
return time.Duration(intValue)
klog.Infof("using AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS env value for pod metrics refresh interval: %d ms", intValue)
return time.Duration(intValue) * time.Millisecond
}
}
klog.V(4).Infof("Using default refresh interval: %d ms", defaultPodMetricRefreshIntervalInMS)
klog.Infof("using default refresh interval: %d ms", defaultPodMetricRefreshIntervalInMS)
return defaultPodMetricRefreshIntervalInMS * time.Millisecond
}

Expand All @@ -151,7 +151,7 @@ func GetCache() (*Cache, error) {
func LoadEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
klog.Warningf("Environment variable %s is not set, using default value: %s", key, defaultValue)
klog.Warningf("environment variable %s is not set, using default value: %s", key, defaultValue)
return defaultValue
}
return value
Expand Down Expand Up @@ -240,6 +240,7 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}, redisClient *redis.Cl
case <-ticker.C:
instance.updatePodMetrics()
instance.updateModelMetrics()
instance.debugInfo()
case <-stopCh:
ticker.Stop()
return
Expand Down Expand Up @@ -272,7 +273,7 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}, redisClient *redis.Cl
// Start ticker
traceTicker = time.NewTicker(RequestTraceWriteInterval)
}
klog.Infof("Trace ticker start at %s", time.Now())
klog.Infof("trace ticker start at %s", time.Now())
for {
select {
case <-traceTicker.C:
Expand Down Expand Up @@ -487,6 +488,13 @@ func (c *Cache) debugInfoLocked() {
klog.V(5).Infof("%v_%v_%v", podName, metricName, metricVal)
}
}
for podName, models := range c.PodModelMetrics {
for modelName, metrics := range models {
for metricName, metricVal := range metrics {
klog.V(5).Infof("%v_%v_%v_%v", podName, modelName, metricName, metricVal)
}
}
}
}

func (c *Cache) GetPod(podName string) (*v1.Pod, error) {
Expand Down Expand Up @@ -581,7 +589,8 @@ func (c *Cache) GetPodModelMetric(podName, modelName string, metricName string)
}

// Update `PodMetrics` and `PodModelMetrics` according to the metric scope
func (c *Cache) updatePodRecord(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
// TODO: replace in-place metric update podMetrics and podModelMetrics to fresh copy for preventing stale metric keys
func (c *Cache) updatePodRecordLocked(podName string, modelName string, metricName string, scope metrics.MetricScope, metricValue metrics.MetricValue) error {
if scope == metrics.PodMetricScope {
if modelName != "" {
return fmt.Errorf("modelName should be empty for scope %v", scope)
Expand All @@ -601,7 +610,7 @@ func (c *Cache) updatePodRecord(podName string, modelName string, metricName str
return nil
}

func (c *Cache) queryUpdatePromQLMetrics(metric metrics.Metric, queryLabels map[string]string, podName string, modelName string, metricName string) error {
func (c *Cache) queryUpdatePromQLMetricsLocked(metric metrics.Metric, queryLabels map[string]string, podName string, modelName string, metricName string) error {
scope := metric.MetricScope
query := metrics.BuildQuery(metric.PromQL, queryLabels)
// Querying metrics
Expand All @@ -611,33 +620,32 @@ func (c *Cache) queryUpdatePromQLMetrics(metric metrics.Metric, queryLabels map[
return fmt.Errorf("error executing query: %v", err)
}
if len(warnings) > 0 {
klog.Warningf("Warnings: %v\n", warnings)
klog.V(4).Infof("Warnings: %v\n", warnings)
}

klog.Infof("Query Result:%v\n", result)
// Update metrics
metricValue := &metrics.PrometheusMetricValue{Result: &result}
err = c.updatePodRecord(podName, modelName, metricName, scope, metricValue)
err = c.updatePodRecordLocked(podName, modelName, metricName, scope, metricValue)
if err != nil {
return fmt.Errorf("failed to update metrics %s from prometheus %s: %v", metricName, podName, err)
}
klog.V(5).InfoS("Successfully parsed metrics from prometheus", "metric", metricName, "model", modelName, "PodName", podName, "Port", podPort, "metricValue", metricValue)
return nil
}

func (c *Cache) updateSimpleMetricFromRawMetrics(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
func (c *Cache) updateSimpleMetricFromRawMetricsLocked(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
podName := pod.Name
for _, metricName := range counterGaugeMetricNames {
metric, exists := metrics.Metrics[metricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", metricName)
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}

// TODO: we should refact metricName to fit other engine
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", metricName)
klog.V(4).Infof("Cannot find %v in the pod metrics", metricName)
continue
}
scope := metric.MetricScope
Expand All @@ -646,13 +654,13 @@ func (c *Cache) updateSimpleMetricFromRawMetrics(pod *v1.Pod, allMetrics map[str

metricValue, err := metrics.GetCounterGaugeValue(familyMetric, metricFamily.GetType())
if err != nil {
klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("failed to parse metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

err = c.updatePodRecord(podName, modelName, metricName, scope, &metrics.SimpleMetricValue{Value: metricValue})
err = c.updatePodRecordLocked(podName, modelName, metricName, scope, &metrics.SimpleMetricValue{Value: metricValue})
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -661,26 +669,26 @@ func (c *Cache) updateSimpleMetricFromRawMetrics(pod *v1.Pod, allMetrics map[str
}
}

func (c *Cache) updateHistogramMetricFromRawMetrics(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
func (c *Cache) updateHistogramMetricFromRawMetricsLocked(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
podName := pod.Name
for _, metricName := range histogramMetricNames {
metric, exists := metrics.Metrics[metricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", metricName)
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}

metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", metricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", metricName)
klog.V(4).Infof("Cannot find %v in the pod metrics", metricName)
continue
}
scope := metric.MetricScope
for _, familyMetric := range metricFamily.Metric {
modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name")
metricValue, err := metrics.GetHistogramValue(familyMetric)
if err != nil {
klog.Errorf("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("failed to parse metrics %s from pod %s %s %d: %v", metricName, pod.Name, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -689,9 +697,9 @@ func (c *Cache) updateHistogramMetricFromRawMetrics(pod *v1.Pod, allMetrics map[
Count: metricValue.Count,
Buckets: metricValue.Buckets,
}
err = c.updatePodRecord(podName, modelName, metricName, scope, histogramValue)
err = c.updatePodRecordLocked(podName, modelName, metricName, scope, histogramValue)
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("Failed to update metrics %s from pod %s %s %d: %v", metricName, podName, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -701,28 +709,28 @@ func (c *Cache) updateHistogramMetricFromRawMetrics(pod *v1.Pod, allMetrics map[
}
}

func (c *Cache) updateQueryLabelMetricFromRawMetrics(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
func (c *Cache) updateQueryLabelMetricFromRawMetricsLocked(pod *v1.Pod, allMetrics map[string]*dto.MetricFamily) {
podName := pod.Name

for _, labelMetricName := range labelQueryMetricNames {
metric, exists := metrics.Metrics[labelMetricName]
if !exists {
klog.Warningf("Cannot find %v in the metric list", labelMetricName)
klog.V(4).Infof("Cannot find %v in the metric list", labelMetricName)
continue
}
rawMetricName := metric.RawMetricName
scope := metric.MetricScope
metricFamily, exists := allMetrics[fmt.Sprintf("vllm:%s", rawMetricName)]
if !exists {
klog.Warningf("Cannot find %v in the pod metrics", rawMetricName)
klog.V(4).Infof("Cannot find %v in the pod metrics", rawMetricName)
continue
}
for _, familyMetric := range metricFamily.Metric {
modelName, _ := metrics.GetLabelValueForKey(familyMetric, "model_name")
labelValue, _ := metrics.GetLabelValueForKey(familyMetric, labelMetricName)
err := c.updatePodRecord(podName, modelName, labelMetricName, scope, &metrics.LabelValueMetricValue{Value: labelValue})
err := c.updatePodRecordLocked(podName, modelName, labelMetricName, scope, &metrics.LabelValueMetricValue{Value: labelValue})
if err != nil {
klog.Errorf("Failed to update metrics %s from pod %s %s %d: %v", labelMetricName, podName, pod.Status.PodIP, podPort, err)
klog.V(4).Infof("Failed to update metrics %s from pod %s %s %d: %v", labelMetricName, podName, pod.Status.PodIP, podPort, err)
continue
}

Expand All @@ -731,7 +739,7 @@ func (c *Cache) updateQueryLabelMetricFromRawMetrics(pod *v1.Pod, allMetrics map
}
}

func (c *Cache) updateMetricFromPromQL(pod *v1.Pod) {
func (c *Cache) updateMetricFromPromQLLocked(pod *v1.Pod) {
podName := pod.Name

for _, metricName := range prometheusMetricNames {
Expand All @@ -740,31 +748,31 @@ func (c *Cache) updateMetricFromPromQL(pod *v1.Pod) {
}
metric, ok := metrics.Metrics[metricName]
if !ok {
klog.Warningf("Cannot find %v in the metric list", metricName)
klog.V(4).Infof("Cannot find %v in the metric list", metricName)
continue
}
scope := metric.MetricScope
if scope == metrics.PodMetricScope {
err := c.queryUpdatePromQLMetrics(metric, queryLabels, podName, "", metricName)
err := c.queryUpdatePromQLMetricsLocked(metric, queryLabels, podName, "", metricName)
if err != nil {
klog.Errorf("Failed to query and update PromQL metrics: %v", err)
klog.V(4).Infof("Failed to query and update PromQL metrics: %v", err)
continue
}
} else if scope == metrics.PodModelMetricScope {
if modelNames, ok := c.PodToModelMapping[podName]; ok {
for modelName := range modelNames {
queryLabels["model_name"] = modelName
err := c.queryUpdatePromQLMetrics(metric, queryLabels, podName, modelName, metricName)
err := c.queryUpdatePromQLMetricsLocked(metric, queryLabels, podName, modelName, metricName)
if err != nil {
klog.Errorf("Failed to query and update PromQL metrics: %v", err)
klog.V(4).Infof("Failed to query and update PromQL metrics: %v", err)
continue
}
}
} else {
klog.Warningf("Cannot find model names for pod %s", podName)
klog.V(4).Infof("Cannot find model names for pod %s", podName)
}
} else {
klog.Warningf("Scope %v is not supported", scope)
klog.V(4).Infof("Scope %v is not supported", scope)
}
}
}
Expand All @@ -791,25 +799,24 @@ func (c *Cache) updatePodMetrics() {
url := fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, podPort)
allMetrics, err := metrics.ParseMetricsURL(url)
if err != nil {
klog.Warningf("Error parsing metric families: %v\n", err)
klog.V(4).Infof("Error parsing metric families: %v\n", err)
}

// parse counterGaugeMetricsNames
c.updateSimpleMetricFromRawMetrics(pod, allMetrics)
c.updateSimpleMetricFromRawMetricsLocked(pod, allMetrics)

// parse histogramMetrics
c.updateHistogramMetricFromRawMetrics(pod, allMetrics)
c.updateHistogramMetricFromRawMetricsLocked(pod, allMetrics)

// parse QueryLabel metrics
c.updateQueryLabelMetricFromRawMetrics(pod, allMetrics)
c.updateQueryLabelMetricFromRawMetricsLocked(pod, allMetrics)

if c.prometheusApi == nil {
klog.V(4).InfoS("Prometheus api is not initialized, PROMETHEUS_ENDPOINT is not configured, skip fetching prometheus metrics")
continue
}
// parse prometheus metrics
c.updateMetricFromPromQL(pod)

c.updateMetricFromPromQLLocked(pod)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/modeladapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (r *ModelAdapterReconciler) getActivePodsForModelAdapter(ctx context.Contex
func (r *ModelAdapterReconciler) schedulePod(ctx context.Context, instance *modelv1alpha1.ModelAdapter, activePods []corev1.Pod) (*corev1.Pod, error) {
// Implement your scheduling logic here to select a Pod based on the instance.Spec.PodSelector
// For the sake of example, we will just list the Pods matching the selector and pick the first one
return r.scheduler.SelectPod(ctx, activePods)
return r.scheduler.SelectPod(ctx, instance.Name, activePods)
}

func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/scheduling/bin_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewBinPackScheduler(c *cache.Cache) Scheduler {
}
}

func (r binPackScheduler) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
func (r binPackScheduler) SelectPod(ctx context.Context, model string, pods []v1.Pod) (*v1.Pod, error) {
// Binpack algorithm: choose the pod (1) can place the adapter, (2) with the least remaining space

selectedPod := v1.Pod{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/scheduling/least_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewLeastAdapters(c *cache.Cache) Scheduler {
}
}

func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
func (r leastAdapters) SelectPod(ctx context.Context, model string, pods []v1.Pod) (*v1.Pod, error) {
selectedPod := v1.Pod{}
modelAdapterCountMin := math.MaxInt

Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/modeladapter/scheduling/least_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func NewLeastLatencyScheduler(c *cache.Cache) Scheduler {
}
}

func (r leastLatencyScheduler) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
func (r leastLatencyScheduler) SelectPod(ctx context.Context, model string, pods []v1.Pod) (*v1.Pod, error) {
selectedPod := v1.Pod{}
podLatencyMin := math.MaxFloat64

for _, pod := range pods {
queueTime, err := r.cache.GetPodMetric(pod.Name, metrics.RequestQueueTimeSeconds)
queueTime, err := r.cache.GetPodModelMetric(pod.Name, model, metrics.RequestQueueTimeSeconds)
if err != nil {
return nil, err
}
inferenceTime, err := r.cache.GetPodMetric(pod.Name, metrics.RequestInferenceTimeSeconds)
inferenceTime, err := r.cache.GetPodModelMetric(pod.Name, model, metrics.RequestInferenceTimeSeconds)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 58d3781

Please sign in to comment.