add prometheus metrics python sdk (kubeflow#2425)
* adding prom metrics in python sdk custom transformer, tests, and documentation

* lint

Signed-off-by: alexagriffith <agriffith50@bloomberg.net>
Signed-off-by: alexagriffith <agriffith96@gmail.com>
alexagriffith authored Sep 23, 2022
1 parent 15821f3 commit ff7014b
Showing 11 changed files with 159 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@
*.so
*.dylib
*.pyc
*.egg-info/
bin

# Test binary, build with `go test -c`
@@ -142,3 +142,31 @@ Handling connection for 8080
* Connection #0 to host localhost left intact
{"predictions": [2]}
```


## Testing with a local transformer & predictor in Kubernetes (KServe developer guide)

If you are making changes to the code and would like to test a transformer running on your local machine against a predictor running in Kubernetes:

(1) Follow the steps above to deploy `transformer.yaml` to Kubernetes. If you only want to deploy the predictor, comment out the transformer section of the YAML.

(2) Run `pip install -e .` in this directory. If you are using a local version of KServe, also run this command in the directory containing its `setup.py` so that your local changes are picked up.

(3) Port-forward the predictor pod by running
```
kubectl port-forward pods/{name-of-predictor-pod} 8081:8080
```
Since the predictor pod exposes port 8080, pick a different local port to forward to; here, 8081 is used.

(4) Use `localhost:8081` as the `{predictor-url}` when running the following command from this directory. Pass `--config_path="local_config.properties"` to use a local config file; when the transformer is deployed to Kubernetes, it pulls the config file from GCS instead.
```bash
python3 -m image_transformer --predictor_host={predictor-url} --workers=1 --config_path="local_config.properties"
```

Now your service is running and you can send a request via localhost!

```bash
>> curl localhost:8080/v1/models/mnist:predict --data @./input.json
{"predictions": [2]}
```
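
The model server now also exposes a Prometheus `/metrics` endpoint, so you can verify that latency histograms are being collected while testing locally. A minimal check, assuming the transformer is listening on its default HTTP port 8080:

```bash
# Send one prediction through the local transformer, then scrape its metrics.
curl localhost:8080/v1/models/mnist:predict --data @./input.json
curl -s localhost:8080/metrics | grep request_
```

Histogram series such as `request_preprocessing_seconds` and `request_predict_processing_seconds` should appear in the output.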
@@ -18,22 +18,25 @@
from .transformer_model_repository import TransformerModelRepository

DEFAULT_MODEL_NAME = "model"
CONFIG_PATH = "/mnt/models/config/config.properties"

parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
parser.add_argument(
"--predictor_host", help="The URL for the model predict function", required=True
)
parser.add_argument(
"--config_path", help="The path to the config file containing the list of model names",
required=False, default=CONFIG_PATH
)

args, _ = parser.parse_known_args()

CONFIG_PATH = "/mnt/models/config/config.properties"


def parse_config():
separator = "="
keys = {}

with open(CONFIG_PATH) as f:
with open(args.config_path) as f:

for line in f:
if separator in line:
@@ -64,6 +64,7 @@ def __init__(self, name: str, predictor_host: str):
Args:
name (str): Name of the model.
predictor_host (str): The host in which the predictor runs.
log_latency (bool): Whether to log the latency metrics per request.
"""
super().__init__(name)
self.predictor_host = predictor_host
@@ -73,11 +74,12 @@ def __init__(self, name: str, predictor_host: str):
logging.info("EXPLAINER URL %s", self.explainer_host)
self.timeout = 100

def preprocess(self, inputs: Dict) -> Dict:
def preprocess(self, inputs: Dict, headers: Dict[str, str] = None) -> Dict:
"""Pre-process activity of the Image Input data.
Args:
inputs (Dict): KServe http request
headers (Dict): Kserve http request headers
Returns:
Dict: Returns the request input after converting it into a tensor
@@ -97,11 +99,12 @@ def postprocess(self, inputs: Dict) -> Dict:
"""
return inputs

async def explain(self, request: Dict) -> Dict:
async def explain(self, request: Dict, headers: Dict[str, str] = None) -> Dict:
"""Returns the captum explanations for the input request
Args:
request (Dict): http input request
headers (Dict): http request headers
Raises:
NotImplementedError: If the explainer host is not specified.
@@ -0,0 +1 @@
model_snapshot={"name":"startup.cfg","modelCount":1,"models":{"mnist":{"1.0":{"defaultVersion":true,"marName":"mnist.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"responseTimeout":120}}}}
2 changes: 1 addition & 1 deletion python/kserve/Makefile
@@ -5,7 +5,7 @@ dev_install:
pip install -e .[test]

test:
pytest -W ignore
cd ../ && pytest -W ignore kserve/test

type_check:
mypy --ignore-missing-imports kserve
13 changes: 13 additions & 0 deletions python/kserve/README.md
@@ -54,6 +54,19 @@ It supports the following storage providers:
* `https://<some_url>.com/model.joblib`
* `http://<some_url>.com/model.joblib`

### Metrics

For latency metrics, send a request to `/metrics`. Prometheus latency histograms are emitted for each step (preprocess, predict, explain, postprocess).
Additionally, the latency of each step is logged per request when latency logging is enabled via the `--enable_latency_logging` flag.

| Metric Name | Description | Type |
|------------------------------------|--------------------------------|-----------|
| request_preprocessing_seconds | pre-processing request latency | Histogram |
| request_explain_processing_seconds | explain request latency | Histogram |
| request_predict_processing_seconds | prediction request latency | Histogram |
| request_postprocessing_seconds     | post-processing request latency | Histogram |
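
For example, a minimal sketch of scraping these metrics, assuming a model server running locally on the default HTTP port 8080:

```bash
# Scrape the Prometheus metrics endpoint; histograms are exposed in the
# standard text exposition format as <metric>_bucket, <metric>_sum,
# and <metric>_count series.
curl localhost:8080/metrics
```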


## KServe Client

### Getting Started
56 changes: 49 additions & 7 deletions python/kserve/kserve/model.py
@@ -12,11 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import Dict, Union
import logging
import time
import sys
import inspect
import json
import tornado.web
import tornado.log
from tornado.httpclient import AsyncHTTPClient
from cloudevents.http import CloudEvent
from http import HTTPStatus
@@ -25,13 +29,20 @@
import grpc
from tritonclient.grpc import InferResult, service_pb2_grpc
from tritonclient.grpc.service_pb2 import ModelInferRequest, ModelInferResponse
from prometheus_client import Histogram

tornado.log.enable_pretty_logging()

PREDICTOR_URL_FORMAT = "http://{0}/v1/models/{1}:predict"
EXPLAINER_URL_FORMAT = "http://{0}/v1/models/{1}:explain"
PREDICTOR_V2_URL_FORMAT = "http://{0}/v2/models/{1}/infer"
EXPLAINER_V2_URL_FORMAT = "http://{0}/v2/models/{1}/explain"

PRE_HIST_TIME = Histogram('request_preprocessing_seconds', 'pre-processing request latency')
POST_HIST_TIME = Histogram('request_postprocessing_seconds', 'post-processing request latency')
PREDICT_HIST_TIME = Histogram('request_predict_processing_seconds', 'prediction request latency')
EXPLAIN_HIST_TIME = Histogram('request_explain_processing_seconds', 'explain request latency')


class ModelType(Enum):
EXPLAINER = 1
@@ -60,6 +71,10 @@ def __str__(self):
return self.reason


def get_latency_ms(start, end):
return round((end - start) * 1000, 9)


# Model is intended to be subclassed by various components within KServe.
class Model:
def __init__(self, name: str):
@@ -74,20 +89,47 @@ def __init__(self, name: str):
self.timeout = 600
self._http_client_instance = None
self._grpc_client_stub = None
self.enable_latency_logging = False

async def __call__(self, body, model_type: ModelType = ModelType.PREDICTOR, headers: Dict[str, str] = None):
payload = await self.preprocess(body, headers) if inspect.iscoroutinefunction(self.preprocess) \
else self.preprocess(body, headers)
request_id = headers.get("X-Request-Id")

# latency vars
preprocess_ms = 0
explain_ms = 0
predict_ms = 0
postprocess_ms = 0

with PRE_HIST_TIME.time():
start = time.time()
payload = await self.preprocess(body, headers) if inspect.iscoroutinefunction(self.preprocess) \
else self.preprocess(body, headers)
preprocess_ms = get_latency_ms(start, time.time())
payload = self.validate(payload)
if model_type == ModelType.EXPLAINER:
response = (await self.explain(payload, headers)) if inspect.iscoroutinefunction(self.explain) \
else self.explain(payload, headers)
with EXPLAIN_HIST_TIME.time():
start = time.time()
response = (await self.explain(payload, headers)) if inspect.iscoroutinefunction(self.explain) \
else self.explain(payload, headers)
explain_ms = get_latency_ms(start, time.time())
elif model_type == ModelType.PREDICTOR:
response = (await self.predict(payload, headers)) if inspect.iscoroutinefunction(self.predict) \
else self.predict(payload, headers)
with PREDICT_HIST_TIME.time():
start = time.time()
response = (await self.predict(payload, headers)) if inspect.iscoroutinefunction(self.predict) \
else self.predict(payload, headers)
predict_ms = get_latency_ms(start, time.time())
else:
raise NotImplementedError
response = self.postprocess(response)

with POST_HIST_TIME.time():
start = time.time()
response = self.postprocess(response)
postprocess_ms = get_latency_ms(start, time.time())

if self.enable_latency_logging is True:
logging.info(f"requestId: {request_id}, preprocess_ms: {preprocess_ms}, explain_ms: {explain_ms}, "
f"predict_ms: {predict_ms}, postprocess_ms: {postprocess_ms}")

return response

@property
33 changes: 32 additions & 1 deletion python/kserve/kserve/model_server.py
@@ -29,6 +29,9 @@
from kserve.model_repository import ModelRepository
from ray.serve.api import Deployment, RayServeHandle
from ray import serve
from tornado.web import RequestHandler
from prometheus_client import REGISTRY
from prometheus_client.exposition import choose_encoder

DEFAULT_HTTP_PORT = 8080
DEFAULT_GRPC_PORT = 8081
@@ -45,29 +48,43 @@
help='The number of works to fork')
parser.add_argument('--max_asyncio_workers', default=None, type=int,
help='Max number of asyncio workers to spawn')
parser.add_argument(
"--enable_latency_logging", help="Output a log per request with latency metrics",
required=False, default=False
)

args, _ = parser.parse_known_args()

tornado.log.enable_pretty_logging()


class MetricsHandler(RequestHandler):
def get(self):
encoder, content_type = choose_encoder(self.request.headers.get('accept'))
self.set_header("Content-Type", content_type)
self.write(encoder(REGISTRY))


class ModelServer:
def __init__(self, http_port: int = args.http_port,
grpc_port: int = args.grpc_port,
max_buffer_size: int = args.max_buffer_size,
workers: int = args.workers,
max_asyncio_workers: int = args.max_asyncio_workers,
registered_models: ModelRepository = ModelRepository()):
registered_models: ModelRepository = ModelRepository(),
enable_latency_logging: bool = args.enable_latency_logging):
self.registered_models = registered_models
self.http_port = http_port
self.grpc_port = grpc_port
self.max_buffer_size = max_buffer_size
self.workers = workers
self.max_asyncio_workers = max_asyncio_workers
self._http_server: Optional[tornado.httpserver.HTTPServer] = None
self.enable_latency_logging = validate_enable_latency_logging(enable_latency_logging)

def create_application(self):
return tornado.web.Application([
(r"/metrics", MetricsHandler),
# Server Liveness API returns 200 if server is alive.
(r"/", handlers.LivenessHandler),
(r"/v2/health/live", handlers.LivenessHandler),
@@ -99,6 +116,8 @@ def start(self, models: Union[List[Model], Dict[str, Deployment]], nest_asyncio:
for model in models:
if isinstance(model, Model):
self.register_model(model)
# pass whether to log request latency into the model
model.enable_latency_logging = self.enable_latency_logging
else:
raise RuntimeError("Model type should be Model")
elif isinstance(models, dict):
@@ -148,3 +167,15 @@ def register_model(self, model: Model):
"Failed to register model, model.name must be provided.")
self.registered_models.update(model)
logging.info("Registering model: %s", model.name)


def validate_enable_latency_logging(enable_latency_logging):
if isinstance(enable_latency_logging, str):
if enable_latency_logging.lower() == "true":
enable_latency_logging = True
elif enable_latency_logging.lower() == "false":
enable_latency_logging = False
if not isinstance(enable_latency_logging, bool):
raise TypeError(f"enable_latency_logging must be one of [True, true, False, false], "
f"got {enable_latency_logging} instead.")
return enable_latency_logging
1 change: 1 addition & 0 deletions python/kserve/requirements.txt
@@ -21,3 +21,4 @@ ray[serve]==2.0.0
grpcio>=1.34.0
tritonclient==2.18.0
protobuf~=3.19.0
prometheus-client>=0.13.1
22 changes: 22 additions & 0 deletions python/kserve/test/test_server.py
@@ -22,6 +22,7 @@
import io
import pytest
from cloudevents.http import CloudEvent, to_binary, to_structured
from kserve.model_server import validate_enable_latency_logging
from kserve import Model
from kserve import ModelServer
from kserve import ModelRepository
@@ -234,6 +235,11 @@ async def test_unknown_path(self, http_server_client):
assert err.value.code == 404
assert err.value.response.body == b'{"error": "invalid path"}'

async def test_metrics(self, http_server_client):
resp = await http_server_client.fetch('/metrics')
assert resp.code == 200
assert resp.body is not None


class TestRayServer:
@pytest.fixture(scope="class")
@@ -524,3 +530,19 @@ async def test_predict_ce_avro_binary(self, http_server_client):
assert resp.headers['ce-type'] == "io.kserve.inference.response"
assert resp.headers['ce-time'] > "2021-01-28T21:04:43.144141+00:00"
assert resp.body == b'{"predictions": [["foo", 1, "pink"]]}'


def test_validate_enable_latency_logging_bool():
tests = ["False", "false", "True", "true", True, False]
expected = [False, False, True, True, True, False]

for i, test in enumerate(tests):
assert expected[i] == validate_enable_latency_logging(test)


def test_validate_enable_latency_logging_raises():
tests = ["F", "t", "wrong"]

for test in tests:
with pytest.raises(TypeError):
validate_enable_latency_logging(test)
