Skip to content

Commit

Permalink
Metric standardizing in ai runtime (#163)
Browse files Browse the repository at this point in the history
* feat: init standard metric framework

* feat: add engine metric scrape support

* format

* format

* adjust server run port

* mv constant config to config.py

* style

* fix: vllm metric standard rule
  • Loading branch information
brosoul authored Sep 13, 2024
1 parent c155c87 commit c9e1972
Show file tree
Hide file tree
Showing 10 changed files with 331 additions and 3 deletions.
3 changes: 3 additions & 0 deletions python/aibrix/aibrix/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Default timeout, in seconds, for one HTTP scrape of the engine metrics
# endpoint; used as the default `timeout` argument of HTTPCollector.
DEFAULT_METRIC_COLLECTOR_TIMEOUT = 1
2 changes: 1 addition & 1 deletion python/aibrix/aibrix/downloader/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.config import Config, MAX_POOL_CONNECTIONS
from botocore.config import MAX_POOL_CONNECTIONS, Config
from tqdm import tqdm

from aibrix import envs
Expand Down
13 changes: 13 additions & 0 deletions python/aibrix/aibrix/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def _parse_int_or_none(value: Optional[str]) -> Optional[int]:
return int(value)


# Runtime Server Config
# Port handed to uvicorn when the runtime app is started. Overridable through
# the SERVER_PORT environment variable, whose value must parse as an integer.
SERVER_PORT = int(os.getenv("SERVER_PORT", "8080"))

# Model Download Related Config

# Downloader Default Directory
Expand Down Expand Up @@ -71,3 +74,13 @@ def _parse_int_or_none(value: Optional[str]) -> Optional[int]:
DOWNLOADER_AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
DOWNLOADER_AWS_ENDPOINT = os.getenv("AWS_ENDPOINT_URL")
DOWNLOADER_AWS_REGION = os.getenv("AWS_REGION")

# Metric Standardizing Related Config
# Scrape config: location of the inference engine's metrics endpoint. The
# runtime joins these into the URL http://{host}:{port}{path}.
METRIC_SCRAPE_HOST = os.getenv("METRIC_SCRAPE_HOST", "localhost")
METRIC_SCRAPE_PORT = int(os.getenv("METRIC_SCRAPE_PORT", "8000"))
METRIC_SCRAPE_PATH = os.getenv("METRIC_SCRAPE_PATH", "/metrics")
# Engine name used to select the set of metric standardizing rules.
METRIC_SCRAPE_ENGINE = os.getenv("METRIC_SCRAPE_ENGINE", "vllm")

# Runtime Metric config
# Directory used for PROMETHEUS_MULTIPROC_DIR when the variable is not set.
PROMETHEUS_MULTIPROC_DIR = os.getenv("PROMETHEUS_MULTIPROC_DIR", "/tmp/aibrix/metrics/")
13 changes: 13 additions & 0 deletions python/aibrix/aibrix/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
48 changes: 48 additions & 0 deletions python/aibrix/aibrix/metrics/engine_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict

from aibrix.metrics.standard_rules import RenameStandardRule, StandardRule

# Standard rules according to https://docs.google.com/document/d/1SpSp1E6moa4HSrJnS4x3NpLuj88sMXr2tbofKlzTZpk
# (vLLM metric name, AIBrix standard metric name) pairs. Each vLLM name is
# written once so the dict key and the rule's original name cannot drift apart.
_VLLM_METRIC_RENAMES = (
    ("vllm:request_success", "aibrix:request_success"),
    ("vllm:num_requests_waiting", "aibrix:queue_size"),
    ("vllm:time_to_first_token_seconds", "aibrix:time_to_first_token_seconds"),
    ("vllm:gpu_cache_usage_perc", "aibrix:gpu_cache_usage_perc"),
    ("vllm:time_per_output_token_seconds", "aibrix:time_per_output_token"),
    ("vllm:e2e_request_latency_seconds", "aibrix:e2e_request_latency"),
)

# Maps every supported vLLM metric name to the rule that renames it.
VLLM_METRIC_STANDARD_RULES: Dict[str, StandardRule] = {
    vllm_name: RenameStandardRule(vllm_name, aibrix_name)
    for vllm_name, aibrix_name in _VLLM_METRIC_RENAMES
}

# TODO add more engine standard rules


def get_metric_standard_rules(engine: str) -> Dict[str, StandardRule]:
    """Return the metric standardizing rules registered for ``engine``.

    Args:
        engine: Name of the inference engine; only ``"vllm"`` is known.

    Returns:
        Mapping from the engine's metric name to its standardizing rule.

    Raises:
        ValueError: If ``engine`` has no rule set.
    """
    if engine != "vllm":
        raise ValueError(f"Engine {engine} is not supported.")
    return VLLM_METRIC_STANDARD_RULES
70 changes: 70 additions & 0 deletions python/aibrix/aibrix/metrics/http_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict

import requests
from prometheus_client.parser import text_string_to_metric_families
from prometheus_client.registry import Collector

from aibrix.config import DEFAULT_METRIC_COLLECTOR_TIMEOUT
from aibrix.logger import init_logger
from aibrix.metrics.standard_rules import StandardRule

logger = init_logger(__name__)


class HTTPCollector(Collector):
    """Collector that scrapes an HTTP metrics endpoint on every collection.

    The scraped text is parsed into metric families. Families whose name has
    an entry in ``metrics_rules`` are additionally passed through that rule,
    and whatever the rule produces is yielded as well.
    """

    def __init__(
        self,
        endpoint: str,
        metrics_rules: Dict[str, StandardRule],
        keep_original_metric: bool = True,
        timeout=DEFAULT_METRIC_COLLECTOR_TIMEOUT,
    ):
        """Store the scrape configuration and open a reusable HTTP session.

        Args:
            endpoint: URL requested on each collection.
            metrics_rules: Standardizing rules keyed by metric family name.
            keep_original_metric: When true, each scraped family is yielded
                as-is before its rule (if any) is applied.
            timeout: Timeout passed to the HTTP GET request.
        """
        self.session = requests.Session()
        self.timeout = timeout

        self.keep_original_metric = keep_original_metric
        self.metrics_rules = metrics_rules
        self.metric_endpoint = endpoint

    def _collect(self):
        """Fetch the raw metrics text, or ``""`` when the scrape fails.

        Failures (non-200 status or any exception) are logged as warnings and
        never propagated, so a broken endpoint yields no metrics instead of
        an error.
        """
        try:
            response = self.session.get(self.metric_endpoint, timeout=self.timeout)
            if response.status_code == 200:
                return response.text
            logger.warning(
                f"Failed to collect metrics from {self.metric_endpoint} "
                f"with status code {response.status_code}, "
                f"response: {response.text}"
            )
        except Exception as e:
            logger.warning(
                f"Failed to collect metrics from {self.metric_endpoint}: {e}"
            )
        return ""

    def collect(self):
        """Yield the scraped metric families and their standardized forms."""
        for family in text_string_to_metric_families(self._collect()):
            if self.keep_original_metric:
                yield family

            # Only families with a registered rule are standardized.
            if family.name not in self.metrics_rules:
                continue
            standardized = self.metrics_rules[family.name](family)
            if standardized is not None:
                yield from standardized
58 changes: 58 additions & 0 deletions python/aibrix/aibrix/metrics/standard_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import abstractmethod
from typing import Iterable

from prometheus_client import Metric
from prometheus_client.samples import Sample


class StandardRule:
    """Base type for metric standardizing rules.

    A rule is a callable that receives one metric family and produces the
    standardized metric families derived from it.
    """

    def __init__(self, rule_type):
        # Label identifying the kind of rule (subclasses pass e.g. "RENAME").
        self.rule_type = rule_type

    @abstractmethod
    def __call__(self, metric: Metric) -> Iterable[Metric]:
        """Apply the rule to ``metric``; subclasses provide the behavior."""
        # NOTE(review): this class does not use ABCMeta, so @abstractmethod is
        # not enforced and calling the base implementation returns None.
        return None


class RenameStandardRule(StandardRule):
    """Rule that republishes a metric family under a different name.

    Every sample keeps its labels, value, timestamp and exemplar; only the
    leading ``original_name`` part of the sample name is replaced, so sample
    suffixes (whatever follows the family name) are preserved.
    """

    def __init__(self, original_name, new_name):
        """Create a rule renaming ``original_name`` to ``new_name``."""
        super().__init__("RENAME")
        self.original_name = original_name
        self.new_name = new_name

    def __call__(self, metric: Metric) -> Iterable[Metric]:
        """Yield a renamed copy of ``metric``.

        Args:
            metric: Metric family whose name equals ``original_name``.

        Yields:
            A new metric family named ``new_name`` carrying renamed samples.

        Raises:
            AssertionError: If ``metric.name`` differs from ``original_name``.
        """
        assert (
            metric.name == self.original_name
        ), f"Metric name {metric.name} does not match Rule original name {self.original_name}"

        # Build a new family instead of renaming `metric` in place: the caller
        # may already have yielded `metric` itself as the original family, and
        # mutating it would retroactively rename that one too.
        renamed = Metric(self.new_name, metric.documentation, metric.type)
        # Copied after construction so the constructor does not append the
        # unit to the new name.
        renamed.unit = metric.unit

        prefix_len = len(self.original_name)
        renamed.samples = [
            Sample(
                self.new_name + s.name[prefix_len:],
                s.labels,
                s.value,
                s.timestamp,
                s.exemplar,
            )
            for s in metric.samples
        ]
        yield renamed
70 changes: 69 additions & 1 deletion python/aibrix/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,71 @@
import os
import shutil
from pathlib import Path

import uvicorn
from fastapi import FastAPI
from prometheus_client import CollectorRegistry, make_asgi_app, multiprocess
from starlette.routing import Mount

from aibrix import envs
from aibrix.logger import init_logger
from aibrix.metrics.engine_rules import get_metric_standard_rules
from aibrix.metrics.http_collector import HTTPCollector

logger = init_logger(__name__)


def initial_prometheus_multiproc_dir():
    """Prepare an empty PROMETHEUS_MULTIPROC_DIR and export it.

    The directory is taken from the ``PROMETHEUS_MULTIPROC_DIR`` environment
    variable when it is set, otherwise from ``envs.PROMETHEUS_MULTIPROC_DIR``.
    It is created if missing, everything already inside it is removed, and
    the same path is then written back to the environment variable.

    Raises:
        ValueError: If the resolved directory is an empty string.
    """
    multiproc_dir = os.environ.get("PROMETHEUS_MULTIPROC_DIR")
    if multiproc_dir is None:
        multiproc_dir = envs.PROMETHEUS_MULTIPROC_DIR
    if not multiproc_dir:
        # Path("") is the current directory; refusing here avoids wiping it.
        raise ValueError("PROMETHEUS_MULTIPROC_DIR must not be empty")

    path = Path(multiproc_dir)
    # Raises FileExistsError if the path exists but is not a directory.
    path.mkdir(parents=True, exist_ok=True)

    # Clear stale files left by a previous run so old samples are not
    # reported. Nothing here registers a cleanup at process exit.
    for item in path.iterdir():
        # is_dir() follows symlinks, but rmtree refuses to operate on one;
        # a link to a directory is removed as a plain entry instead.
        if item.is_dir() and not item.is_symlink():
            shutil.rmtree(item)
        else:
            item.unlink()

    # Export the directory that was actually prepared above.
    os.environ["PROMETHEUS_MULTIPROC_DIR"] = multiproc_dir


def mount_metrics(app: FastAPI):
    """Attach a ``/metrics`` route to ``app``.

    The route serves a registry holding the multiprocess collector and an
    HTTPCollector that scrapes the configured engine endpoint and applies the
    engine's standardizing rules.

    Args:
        app: Application whose route list receives the ``/metrics`` mount.
    """
    # The multiprocess collector needs its directory prepared first.
    initial_prometheus_multiproc_dir()
    multiproc_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"]
    logger.info(f"AIBrix to use {multiproc_dir} as PROMETHEUS_MULTIPROC_DIR")

    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)

    # Scrape target and rule set both come from environment-driven settings.
    engine = envs.METRIC_SCRAPE_ENGINE
    scrape_endpoint = (
        f"http://{envs.METRIC_SCRAPE_HOST}:{envs.METRIC_SCRAPE_PORT}"
        f"{envs.METRIC_SCRAPE_PATH}"
    )
    registry.register(
        HTTPCollector(scrape_endpoint, get_metric_standard_rules(engine))
    )
    logger.info(
        f"AIBrix to scrape metrics from {scrape_endpoint}, use {engine} standard rules"
    )

    # Route /metrics requests to the prometheus ASGI app.
    app.routes.append(Mount("/metrics", make_asgi_app(registry=registry)))


def build_app():
    """Create the FastAPI application with the metrics route mounted."""
    application = FastAPI(debug=False)
    mount_metrics(application)
    return application


# build_app() already creates the FastAPI instance, so no separate
# placeholder application is constructed first.
app = build_app()
# NOTE(review): the server starts whenever this module is imported, which
# blocks any ASGI runner that imports `app`. Consider guarding this call
# with `if __name__ == "__main__":` once the launch path is confirmed.
uvicorn.run(app, port=envs.SERVER_PORT)
55 changes: 54 additions & 1 deletion python/aibrix/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions python/aibrix/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ boto3 = "^1.35.5"
fastapi = "^0.112.2"
gunicorn = "^23.0.0"
uvicorn = "^0.30.6"
prometheus-client = "^0.20.0"
# NOTE(review): types-requests only provides type stubs. Confirm that
# `requests` itself is declared as a runtime dependency, and whether the
# stubs belong in the dev dependency group instead.
types-requests = "^2.31.0"


[tool.poetry.group.dev.dependencies]
Expand Down

0 comments on commit c9e1972

Please sign in to comment.