From ff7014b0c1a79672978d5b0a23af6c5ae1158b3b Mon Sep 17 00:00:00 2001
From: Alexa Griffith
Date: Fri, 23 Sep 2022 15:56:19 -0400
Subject: [PATCH] add prometheus metrics python sdk (#2425)

* adding prom metrics in python sdk custom transformer, tests, and documentation

Signed-off-by: alexagriffith
Signed-off-by: alexagriffith
Signed-off-by: alexagriffith

* lint

Signed-off-by: alexagriffith
Signed-off-by: alexagriffith
Signed-off-by: alexagriffith
Signed-off-by: alexagriffith
---
 .gitignore                                  |  1 +
 .../torchserve_image_transformer/README.md  | 28 ++++++++++
 .../image_transformer/__main__.py           |  9 ++-
 .../image_transformer/image_transformer.py  |  7 ++-
 .../local_config.properties                 |  1 +
 python/kserve/Makefile                      |  2 +-
 python/kserve/README.md                     | 13 +++++
 python/kserve/kserve/model.py               | 56 ++++++++++++++++---
 python/kserve/kserve/model_server.py        | 33 ++++++++++-
 python/kserve/requirements.txt              |  1 +
 python/kserve/test/test_server.py           | 22 ++++++++
 11 files changed, 159 insertions(+), 14 deletions(-)
 create mode 100644 docs/samples/v1beta1/transformer/torchserve_image_transformer/local_config.properties

diff --git a/.gitignore b/.gitignore
index f1f432da408..f63b6d53fd4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 *.so
 *.dylib
 *.pyc
+*.egg-info/
 bin
 
 # Test binary, build with `go test -c`
diff --git a/docs/samples/v1beta1/transformer/torchserve_image_transformer/README.md b/docs/samples/v1beta1/transformer/torchserve_image_transformer/README.md
index 4919d47f6ee..6702c8ecec9 100644
--- a/docs/samples/v1beta1/transformer/torchserve_image_transformer/README.md
+++ b/docs/samples/v1beta1/transformer/torchserve_image_transformer/README.md
@@ -142,3 +142,31 @@ Handling connection for 8080
 * Connection #0 to host localhost left intact
 {"predictions": [2]}
 ```
+
+
+## Testing with a local transformer & predictor in Kubernetes (KServe developer guide)
+
+If you are making changes to the code and would like to test running a transformer on your local machine with the predictor running in Kubernetes:
+
+(1) Follow the steps above to deploy transformer.yaml to Kubernetes. You can comment out the transformer section in the YAML if you wish to deploy only the predictor to Kubernetes.
+
+(2) Run `pip install -e .` in this directory. If you are using a local version of KServe, you should also run this command in the appropriate directory with a setup.py file so that you pick up any changes from there.
+
+(3) Port-forward the predictor pod by running
+```
+kubectl port-forward pods/{name-of-predictor-pod} 8081:8080
+```
+Since the predictor pod exposes 8080, pick another port to use with localhost. Here, 8081 is used.
+
+(4) Use `localhost:8081` as the {predictor-url} to run the following command from this directory. Pass `--config_path="local_config.properties"` to use a local config file. If you deploy the transformer to Kubernetes, it will pull the file from GCS.
+
+```bash
+python3 -m image_transformer --predictor_host={predictor-url} --workers=1 --config_path="local_config.properties"
+```
+
+Now your service is running and you can send a request via localhost!
+
+```bash
+>> curl localhost:8080/v1/models/mnist:predict --data @./input.json
+{"predictions": [2]}
+```
diff --git a/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/__main__.py b/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/__main__.py
index a3a8104a4aa..3049f26a5cf 100644
--- a/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/__main__.py
+++ b/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/__main__.py
@@ -18,22 +18,25 @@
 from .transformer_model_repository import TransformerModelRepository
 
 DEFAULT_MODEL_NAME = "model"
+CONFIG_PATH = "/mnt/models/config/config.properties"
 
 parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
 parser.add_argument(
     "--predictor_host", help="The URL for the model predict function", required=True
 )
+parser.add_argument(
+    "--config_path", help="The path to the config file containing the list of model names",
+    required=False, default=CONFIG_PATH
+)
 args, _ = parser.parse_known_args()
 
-CONFIG_PATH = "/mnt/models/config/config.properties"
-
 
 def parse_config():
     separator = "="
     keys = {}
 
-    with open(CONFIG_PATH) as f:
+    with open(args.config_path) as f:
         for line in f:
 
             if separator in line:
diff --git a/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/image_transformer.py b/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/image_transformer.py
index 0398a8c3c2c..be6889d501a 100644
--- a/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/image_transformer.py
+++ b/docs/samples/v1beta1/transformer/torchserve_image_transformer/image_transformer/image_transformer.py
@@ -64,6 +64,7 @@ def __init__(self, name: str, predictor_host: str):
         Args:
             name (str): Name of the model.
             predictor_host (str): The host in which the predictor runs.
+            log_latency (bool): Whether to log the latency metrics per request.
         """
         super().__init__(name)
         self.predictor_host = predictor_host
@@ -73,11 +74,12 @@ def __init__(self, name: str, predictor_host: str):
         logging.info("EXPLAINER URL %s", self.explainer_host)
         self.timeout = 100
 
-    def preprocess(self, inputs: Dict) -> Dict:
+    def preprocess(self, inputs: Dict, headers: Dict[str, str] = None) -> Dict:
         """Pre-process activity of the Image Input data.
 
         Args:
             inputs (Dict): KServe http request
+            headers (Dict): KServe http request headers
 
         Returns:
             Dict: Returns the request input after converting it into a tensor
@@ -97,11 +99,12 @@ def postprocess(self, inputs: Dict) -> Dict:
         """
         return inputs
 
-    async def explain(self, request: Dict) -> Dict:
+    async def explain(self, request: Dict, headers: Dict[str, str] = None) -> Dict:
         """Returns the captum explanations for the input request
 
         Args:
             request (Dict): http input request
+            headers (Dict): http request headers
 
         Raises:
             NotImplementedError: If the explainer host is not specified.
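For custom transformer authors, a minimal sketch of the handler shape after this change; `MyTransformer` and its pass-through bodies are hypothetical, not part of the patch. Note that `preprocess` (and `explain`) now receive the HTTP request headers, while `postprocess` is still called without them.

```python
# A minimal sketch, not part of the patch: "MyTransformer" and its pass-through
# bodies are hypothetical placeholders for a user-defined transformer.
from typing import Dict

from kserve import Model


class MyTransformer(Model):
    def __init__(self, name: str, predictor_host: str):
        super().__init__(name)
        self.predictor_host = predictor_host
        self.ready = True

    def preprocess(self, inputs: Dict, headers: Dict[str, str] = None) -> Dict:
        # headers carries request metadata such as X-Request-Id
        return inputs

    def postprocess(self, inputs: Dict) -> Dict:
        return inputs
```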
diff --git a/docs/samples/v1beta1/transformer/torchserve_image_transformer/local_config.properties b/docs/samples/v1beta1/transformer/torchserve_image_transformer/local_config.properties
new file mode 100644
index 00000000000..95aa03eae4a
--- /dev/null
+++ b/docs/samples/v1beta1/transformer/torchserve_image_transformer/local_config.properties
@@ -0,0 +1 @@
+model_snapshot={"name":"startup.cfg","modelCount":1,"models":{"mnist":{"1.0":{"defaultVersion":true,"marName":"mnist.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"responseTimeout":120}}}}
diff --git a/python/kserve/Makefile b/python/kserve/Makefile
index ed5dd676645..bf1470d5183 100644
--- a/python/kserve/Makefile
+++ b/python/kserve/Makefile
@@ -5,7 +5,7 @@ dev_install:
 	pip install -e .[test]
 
 test:
-	pytest -W ignore
+	cd ../ && pytest -W ignore kserve/test
 
 type_check:
 	mypy --ignore-missing-imports kserve
diff --git a/python/kserve/README.md b/python/kserve/README.md
index 1ba412c864e..d30dc87e32d 100644
--- a/python/kserve/README.md
+++ b/python/kserve/README.md
@@ -54,6 +54,19 @@ It supports the following storage providers:
 * `https://<some_url>.com/model.joblib`
 * `http://<some_url>.com/model.joblib`
 
+### Metrics
+
+For latency metrics, send a request to `/metrics`. Prometheus latency histograms are emitted for each of the steps (pre/postprocessing, explain, predict).
+Additionally, the latencies of each step are logged per request.
+
+| Metric Name                        | Description                     | Type      |
+|------------------------------------|---------------------------------|-----------|
+| request_preprocessing_seconds      | pre-processing request latency  | Histogram |
+| request_explain_processing_seconds | explain request latency         | Histogram |
+| request_predict_processing_seconds | prediction request latency      | Histogram |
+| request_postprocessing_seconds     | post-processing request latency | Histogram |
+
+
 ## KServe Client
 
 ### Getting Started
diff --git a/python/kserve/kserve/model.py b/python/kserve/kserve/model.py
index 27491e99566..2b0fbaab0d1 100644
--- a/python/kserve/kserve/model.py
+++ b/python/kserve/kserve/model.py
@@ -12,11 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from typing import Dict, Union
+import logging
+import time
 import sys
 import inspect
 import json
 import tornado.web
+import tornado.log
 from tornado.httpclient import AsyncHTTPClient
 from cloudevents.http import CloudEvent
 from http import HTTPStatus
@@ -25,13 +29,20 @@
 import grpc
 from tritonclient.grpc import InferResult, service_pb2_grpc
 from tritonclient.grpc.service_pb2 import ModelInferRequest, ModelInferResponse
+from prometheus_client import Histogram
 
+tornado.log.enable_pretty_logging()
 
 PREDICTOR_URL_FORMAT = "http://{0}/v1/models/{1}:predict"
 EXPLAINER_URL_FORMAT = "http://{0}/v1/models/{1}:explain"
 PREDICTOR_V2_URL_FORMAT = "http://{0}/v2/models/{1}/infer"
 EXPLAINER_V2_URL_FORMAT = "http://{0}/v2/models/{1}/explain"
 
+PRE_HIST_TIME = Histogram('request_preprocessing_seconds', 'pre-processing request latency')
+POST_HIST_TIME = Histogram('request_postprocessing_seconds', 'post-processing request latency')
+PREDICT_HIST_TIME = Histogram('request_predict_processing_seconds', 'prediction request latency')
+EXPLAIN_HIST_TIME = Histogram('request_explain_processing_seconds', 'explain request latency')
+
 
 class ModelType(Enum):
     EXPLAINER = 1
@@ -60,6 +71,10 @@ def __str__(self):
         return self.reason
 
 
+def get_latency_ms(start, end):
+    return round((end - start) * 1000, 9)
+
+
 # Model is intended to be subclassed by various components within KServe.
 class Model:
     def __init__(self, name: str):
@@ -74,20 +89,47 @@ def __init__(self, name: str):
         self.timeout = 600
         self._http_client_instance = None
         self._grpc_client_stub = None
+        self.enable_latency_logging = False
 
     async def __call__(self, body, model_type: ModelType = ModelType.PREDICTOR, headers: Dict[str, str] = None):
-        payload = await self.preprocess(body, headers) if inspect.iscoroutinefunction(self.preprocess) \
-            else self.preprocess(body, headers)
+        request_id = headers.get("X-Request-Id")
+
+        # latency vars
+        preprocess_ms = 0
+        explain_ms = 0
+        predict_ms = 0
+        postprocess_ms = 0
+
+        with PRE_HIST_TIME.time():
+            start = time.time()
+            payload = await self.preprocess(body, headers) if inspect.iscoroutinefunction(self.preprocess) \
+                else self.preprocess(body, headers)
+            preprocess_ms = get_latency_ms(start, time.time())
         payload = self.validate(payload)
         if model_type == ModelType.EXPLAINER:
-            response = (await self.explain(payload, headers)) if inspect.iscoroutinefunction(self.explain) \
-                else self.explain(payload, headers)
+            with EXPLAIN_HIST_TIME.time():
+                start = time.time()
+                response = (await self.explain(payload, headers)) if inspect.iscoroutinefunction(self.explain) \
+                    else self.explain(payload, headers)
+                explain_ms = get_latency_ms(start, time.time())
         elif model_type == ModelType.PREDICTOR:
-            response = (await self.predict(payload, headers)) if inspect.iscoroutinefunction(self.predict) \
-                else self.predict(payload, headers)
+            with PREDICT_HIST_TIME.time():
+                start = time.time()
+                response = (await self.predict(payload, headers)) if inspect.iscoroutinefunction(self.predict) \
+                    else self.predict(payload, headers)
+                predict_ms = get_latency_ms(start, time.time())
         else:
             raise NotImplementedError
-        response = self.postprocess(response)
+
+        with POST_HIST_TIME.time():
+            start = time.time()
+            response = self.postprocess(response)
+            postprocess_ms = get_latency_ms(start, time.time())
+
+        if self.enable_latency_logging is True:
+            logging.info(f"requestId: {request_id}, preprocess_ms: {preprocess_ms}, explain_ms: {explain_ms}, "
+                         f"predict_ms: {predict_ms}, postprocess_ms: {postprocess_ms}")
+
         return response
 
     @property
diff --git a/python/kserve/kserve/model_server.py b/python/kserve/kserve/model_server.py
index 60c2f0f2f53..a5e9bca8d5e 100644
--- a/python/kserve/kserve/model_server.py
+++ b/python/kserve/kserve/model_server.py
@@ -29,6 +29,9 @@
 from kserve.model_repository import ModelRepository
 from ray.serve.api import Deployment, RayServeHandle
 from ray import serve
+from tornado.web import RequestHandler
+from prometheus_client import REGISTRY
+from prometheus_client.exposition import choose_encoder
 
 DEFAULT_HTTP_PORT = 8080
 DEFAULT_GRPC_PORT = 8081
@@ -45,19 +48,31 @@
                     help='The number of works to fork')
 parser.add_argument('--max_asyncio_workers', default=None, type=int,
                     help='Max number of asyncio workers to spawn')
+parser.add_argument(
+    "--enable_latency_logging", help="Output a log per request with latency metrics",
+    required=False, default=False
+)
 
 args, _ = parser.parse_known_args()
 
 tornado.log.enable_pretty_logging()
 
 
+class MetricsHandler(RequestHandler):
+    def get(self):
+        encoder, content_type = choose_encoder(self.request.headers.get('accept'))
+        self.set_header("Content-Type", content_type)
+        self.write(encoder(REGISTRY))
+
+
 class ModelServer:
     def __init__(self, http_port: int = args.http_port,
                  grpc_port: int = args.grpc_port,
                  max_buffer_size: int = args.max_buffer_size,
                  workers: int = args.workers,
                  max_asyncio_workers: int = args.max_asyncio_workers,
-                 registered_models: ModelRepository = ModelRepository()):
+                 registered_models: ModelRepository = ModelRepository(),
+                 enable_latency_logging: bool = args.enable_latency_logging):
         self.registered_models = registered_models
         self.http_port = http_port
         self.grpc_port = grpc_port
@@ -65,9 +80,11 @@ def __init__(self, http_port: int = args.http_port,
         self.workers = workers
         self.max_asyncio_workers = max_asyncio_workers
         self._http_server: Optional[tornado.httpserver.HTTPServer] = None
+        self.enable_latency_logging = validate_enable_latency_logging(enable_latency_logging)
 
     def create_application(self):
         return tornado.web.Application([
+            (r"/metrics", MetricsHandler),
             # Server Liveness API returns 200 if server is alive.
(r"/", handlers.LivenessHandler), (r"/v2/health/live", handlers.LivenessHandler), @@ -99,6 +116,8 @@ def start(self, models: Union[List[Model], Dict[str, Deployment]], nest_asyncio: for model in models: if isinstance(model, Model): self.register_model(model) + # pass whether to log request latency into the model + model.enable_latency_logging = self.enable_latency_logging else: raise RuntimeError("Model type should be Model") elif isinstance(models, dict): @@ -148,3 +167,15 @@ def register_model(self, model: Model): "Failed to register model, model.name must be provided.") self.registered_models.update(model) logging.info("Registering model: %s", model.name) + + +def validate_enable_latency_logging(enable_latency_logging): + if isinstance(enable_latency_logging, str): + if enable_latency_logging.lower() == "true": + enable_latency_logging = True + elif enable_latency_logging.lower() == "false": + enable_latency_logging = False + if not isinstance(enable_latency_logging, bool): + raise TypeError(f"enable_latency_logging must be one of [True, true, False, false], " + f"got {enable_latency_logging} instead.") + return enable_latency_logging diff --git a/python/kserve/requirements.txt b/python/kserve/requirements.txt index c612c4d648f..aaa78390455 100644 --- a/python/kserve/requirements.txt +++ b/python/kserve/requirements.txt @@ -21,3 +21,4 @@ ray[serve]==2.0.0 grpcio>=1.34.0 tritonclient==2.18.0 protobuf~=3.19.0 +prometheus-client>=0.13.1 \ No newline at end of file diff --git a/python/kserve/test/test_server.py b/python/kserve/test/test_server.py index 5c71a5c9240..de9d10bc5a4 100644 --- a/python/kserve/test/test_server.py +++ b/python/kserve/test/test_server.py @@ -22,6 +22,7 @@ import io import pytest from cloudevents.http import CloudEvent, to_binary, to_structured +from kserve.model_server import validate_enable_latency_logging from kserve import Model from kserve import ModelServer from kserve import ModelRepository @@ -234,6 +235,11 @@ async def test_unknown_path(self, http_server_client): assert err.value.code == 404 assert err.value.response.body == b'{"error": "invalid path"}' + async def test_metrics(self, http_server_client): + resp = await http_server_client.fetch('/metrics') + assert resp.code == 200 + assert resp.body is not None + class TestRayServer: @pytest.fixture(scope="class") @@ -524,3 +530,19 @@ async def test_predict_ce_avro_binary(self, http_server_client): assert resp.headers['ce-type'] == "io.kserve.inference.response" assert resp.headers['ce-time'] > "2021-01-28T21:04:43.144141+00:00" assert resp.body == b'{"predictions": [["foo", 1, "pink"]]}' + + +def test_validate_enable_latency_logging_bool(): + tests = ["False", "false", "True", "true", True, False] + expected = [False, False, True, True, True, False] + + for i, test in enumerate(tests): + assert expected[i] == validate_enable_latency_logging(test) + + +def test_validate_enable_latency_logging_raises(): + tests = ["F", "t", "wrong"] + + for test in tests: + with pytest.raises(TypeError): + validate_enable_latency_logging(test)