From 777b6efb56d5c3cd7b39f3679d86710f22706500 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 6 Nov 2024 12:56:35 -0600 Subject: [PATCH 01/10] skip Signed-off-by: Edward Oakes --- python/ray/serve/tests/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/serve/tests/BUILD b/python/ray/serve/tests/BUILD index 42702b576c08c..369b9a339c6f5 100644 --- a/python/ray/serve/tests/BUILD +++ b/python/ray/serve/tests/BUILD @@ -458,6 +458,7 @@ py_test_module_list( name_suffix = "_with_local_testing_mode", tags = [ "exclusive", + "no_windows", "team:serve", ], deps = [ From 4b404ba277feadb67b33e1a96b0c87f948e1293f Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 10:50:21 -0600 Subject: [PATCH 02/10] WIP Signed-off-by: Edward Oakes --- python/ray/serve/_private/constants.py | 3 +- python/ray/serve/_private/logging_utils.py | 6 +- python/ray/serve/_private/proxy.py | 2 + .../serve/_private/proxy_request_response.py | 3 +- python/ray/serve/_private/replica.py | 62 ++++++++++++++----- 5 files changed, 52 insertions(+), 24 deletions(-) diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 8fae61c953299..f4c2adac37d1e 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -194,9 +194,8 @@ # Logging format with record key to format string dict SERVE_LOG_RECORD_FORMAT = { SERVE_LOG_REQUEST_ID: "%(request_id)s", - SERVE_LOG_ROUTE: "%(route)s", SERVE_LOG_APPLICATION: "%(application)s", - SERVE_LOG_MESSAGE: "%(filename)s:%(lineno)d - %(message)s", + SERVE_LOG_MESSAGE: "-- %(message)s", SERVE_LOG_LEVEL_NAME: "%(levelname)s", SERVE_LOG_TIME: "%(asctime)s", } diff --git a/python/ray/serve/_private/logging_utils.py b/python/ray/serve/_private/logging_utils.py index 207f0574c317d..5081829670bce 100644 --- a/python/ray/serve/_private/logging_utils.py +++ b/python/ray/serve/_private/logging_utils.py @@ -141,8 +141,6 @@ def format(self, record: logging.LogRecord) -> str: record_formats_attrs = [] if SERVE_LOG_REQUEST_ID in record.__dict__: record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID]) - if SERVE_LOG_ROUTE in record.__dict__: - record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_ROUTE]) record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE]) record_format += " ".join(record_formats_attrs) @@ -153,9 +151,9 @@ def format(self, record: logging.LogRecord) -> str: return formatter.format(record) -def access_log_msg(*, method: str, status: str, latency_ms: float): +def access_log_msg(*, method: str, route: str, status: str, latency_ms: float): """Returns a formatted message for an HTTP or ServeHandle access log.""" - return f"{method.upper()} {status.upper()} {latency_ms:.1f}ms" + return f"{method} {route} {status} {latency_ms:.1f}ms" def log_to_stderr_filter(record: logging.LogRecord) -> bool: diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index fc4d51b9b181a..b39d9636784b0 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -476,9 +476,11 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator: latency_ms = (time.time() - start_time) * 1000.0 if response_handler_info.should_record_access_log: + request_context = ray.serve.context._serve_request_context.get() logger.info( access_log_msg( method=proxy_request.method, + route=request_context.route, status=str(status.code), latency_ms=latency_ms, ), diff --git a/python/ray/serve/_private/proxy_request_response.py b/python/ray/serve/_private/proxy_request_response.py index 8050c4be215d3..0ca2235fd3dd5 100644 --- a/python/ray/serve/_private/proxy_request_response.py +++ b/python/ray/serve/_private/proxy_request_response.py @@ -58,7 +58,8 @@ def request_type(self) -> str: @property def method(self) -> str: - return self.scope.get("method", "websocket").upper() + # WebSocket messages don't have a 'method' field. + return self.scope.get("method", "WS").upper() @property def route_path(self) -> str: diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index b90c837b6cc0a..e529e1472beb8 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -10,7 +10,7 @@ from contextlib import contextmanager from functools import wraps from importlib import import_module -from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union +from typing import Any, AsyncGenerator, Callable, Dict, Generator, Optional, Tuple, Union import starlette.responses from starlette.types import ASGIApp, Message @@ -341,20 +341,23 @@ def get_num_ongoing_requests(self) -> int: """ return self._metrics_manager.get_num_ongoing_requests() - def _maybe_get_asgi_route( + def _maybe_get_http_route_and_method( self, request_metadata: RequestMetadata, request_args: Tuple[Any] - ) -> Optional[str]: + ) -> Tuple[Optional[str], Optional[str]]: """Get the matched route string for ASGI apps to be used in logs & metrics. If this replica does not wrap an ASGI app or there is no matching for the request, returns the existing route from the request metadata. """ route = request_metadata.route + method = None if ( request_metadata.is_http_request and self._user_callable_asgi_app is not None ): req: StreamingHTTPRequest = request_args[0] + # WebSocket messages don't have a 'method' field. + method = req.asgi_scope.get("method", "WS") try: matched_route = get_asgi_route_name( self._user_callable_asgi_app, req.asgi_scope @@ -372,22 +375,24 @@ def _maybe_get_asgi_route( if matched_route is not None: route = matched_route - return route + return route, method @contextmanager def _wrap_user_method_call( self, request_metadata: RequestMetadata, request_args: Tuple[Any] - ): + ) -> Generator[Callable[[str], None], None, None]: """Context manager that wraps user method calls. 1) Sets the request context var with appropriate metadata. 2) Records the access log message (if not disabled). 3) Records per-request metrics via the metrics manager. """ - route = self._maybe_get_asgi_route(request_metadata, request_args) + http_route, http_method = self._maybe_get_http_route_and_method( + request_metadata, request_args + ) ray.serve.context._serve_request_context.set( ray.serve.context._RequestContext( - route=route, + route=http_route, request_id=request_metadata.request_id, _internal_request_id=request_metadata.internal_request_id, app_name=self._deployment_id.app_name, @@ -396,11 +401,16 @@ def _wrap_user_method_call( ) ) + status_code = None + def _write_status_code(s: str): + nonlocal status_code + status_code = s + start_time = time.time() user_exception = None try: self._metrics_manager.inc_num_ongoing_requests() - yield + yield _write_status_code except asyncio.CancelledError as e: user_exception = e @@ -430,14 +440,16 @@ def _wrap_user_method_call( logger.info( access_log_msg( - method=request_metadata.call_method, - status=status_str, + method=http_method or "CALL", + route=http_route or request_metadata.call_method, + # Prefer the HTTP status code if it was populated. + status=status_code or status_str, latency_ms=latency_ms, ), extra={"serve_access_log": True}, ) self._metrics_manager.record_request_metrics( - route=route, + route=http_route, status_str=status_str, latency_ms=latency_ms, was_error=user_exception is not None, @@ -451,6 +463,7 @@ async def _call_user_generator( request_metadata: RequestMetadata, request_args: Tuple[Any], request_kwargs: Dict[str, Any], + write_status_code, ) -> AsyncGenerator[Any, None]: """Calls a user method for a streaming call and yields its results. @@ -459,6 +472,7 @@ async def _call_user_generator( """ call_user_method_future = None wait_for_message_task = None + first_message_peeked = False try: result_queue = MessageQueue() @@ -492,7 +506,15 @@ def _enqueue_thread_safe(item: Any): # and use vanilla pickle (we know it's safe because these messages # only contain primitive Python types). if request_metadata.is_http_request: - yield pickle.dumps(messages) + if not first_message_peeked: + first_message = messages[0] + first_message_peeked = True + if first_message["type"] == "http.response.start": + # HTTP responses begin with exactly one + # "http.response.start" message containing the "status" + # field. Other response types (e.g., WebSockets) may not. + write_status_code(str(first_message["status"])) + yield pickle.dumps(messages) else: for msg in messages: yield msg @@ -523,7 +545,7 @@ async def handle_request( ) -> Tuple[bytes, Any]: """Entrypoint for `stream=False` calls.""" request_metadata = pickle.loads(pickled_request_metadata) - with self._wrap_user_method_call(request_metadata, request_args): + with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: return await asyncio.wrap_future( self._user_callable_wrapper.call_user_method( request_metadata, request_args, request_kwargs @@ -538,7 +560,7 @@ async def handle_request_streaming( ) -> AsyncGenerator[Any, None]: """Generator that is the entrypoint for all `stream=True` handle calls.""" request_metadata = pickle.loads(pickled_request_metadata) - with self._wrap_user_method_call(request_metadata, request_args): + with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: async for result in self._call_user_generator( request_metadata, request_args, @@ -580,7 +602,7 @@ async def handle_request_with_rejection( ) return - with self._wrap_user_method_call(request_metadata, request_args): + with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: yield pickle.dumps( ReplicaQueueLengthInfo( accepted=True, @@ -595,6 +617,7 @@ async def handle_request_with_rejection( request_metadata, request_args, request_kwargs, + write_status_code=write_status_code, ): yield result else: @@ -622,7 +645,7 @@ async def handle_request_from_java( multiplexed_model_id=proto.multiplexed_model_id, route=proto.route, ) - with self._wrap_user_method_call(request_metadata, request_args): + with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: return await asyncio.wrap_future( self._user_callable_wrapper.call_user_method( request_metadata, request_args, request_kwargs @@ -1239,7 +1262,12 @@ async def call_user_method( ) except Exception: - if request_metadata.is_http_request and asgi_args is not None: + if ( + request_metadata.is_http_request + and asgi_args is not None + # If the callable is an ASGI app, it already sent a 500 status response. + and not is_asgi_app + ): await self._send_user_result_over_asgi( starlette.responses.Response( "Internal Server Error", status_code=500 From 9f42aecb921ad7d5ed23da2d0bce871009268ca4 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 10:52:51 -0600 Subject: [PATCH 03/10] skip Signed-off-by: Edward Oakes --- python/ray/serve/_private/replica.py | 42 ++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index e529e1472beb8..c373b86da7a95 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -10,7 +10,16 @@ from contextlib import contextmanager from functools import wraps from importlib import import_module -from typing import Any, AsyncGenerator, Callable, Dict, Generator, Optional, Tuple, Union +from typing import ( + Any, + AsyncGenerator, + Callable, + Dict, + Generator, + Optional, + Tuple, + Union, +) import starlette.responses from starlette.types import ASGIApp, Message @@ -231,6 +240,9 @@ def _add_autoscaling_metrics_point(self) -> None: ) +StatusCodeCallback = Callable[[str], None] + + class ReplicaActor: """Actor definition for replicas of Ray Serve deployments. @@ -380,7 +392,7 @@ def _maybe_get_http_route_and_method( @contextmanager def _wrap_user_method_call( self, request_metadata: RequestMetadata, request_args: Tuple[Any] - ) -> Generator[Callable[[str], None], None, None]: + ) -> Generator[StatusCodeCallback, None, None]: """Context manager that wraps user method calls. 1) Sets the request context var with appropriate metadata. @@ -402,7 +414,8 @@ def _wrap_user_method_call( ) status_code = None - def _write_status_code(s: str): + + def _status_code_callback(s: str): nonlocal status_code status_code = s @@ -410,7 +423,7 @@ def _write_status_code(s: str): user_exception = None try: self._metrics_manager.inc_num_ongoing_requests() - yield _write_status_code + yield _status_code_callback except asyncio.CancelledError as e: user_exception = e @@ -463,7 +476,7 @@ async def _call_user_generator( request_metadata: RequestMetadata, request_args: Tuple[Any], request_kwargs: Dict[str, Any], - write_status_code, + status_code_callback: StatusCodeCallback, ) -> AsyncGenerator[Any, None]: """Calls a user method for a streaming call and yields its results. @@ -512,8 +525,8 @@ def _enqueue_thread_safe(item: Any): if first_message["type"] == "http.response.start": # HTTP responses begin with exactly one # "http.response.start" message containing the "status" - # field. Other response types (e.g., WebSockets) may not. - write_status_code(str(first_message["status"])) + # field. Other response types like WebSockets may not. + status_code_callback(str(first_message["status"])) yield pickle.dumps(messages) else: for msg in messages: @@ -545,7 +558,7 @@ async def handle_request( ) -> Tuple[bytes, Any]: """Entrypoint for `stream=False` calls.""" request_metadata = pickle.loads(pickled_request_metadata) - with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: + with self._wrap_user_method_call(request_metadata, request_args): return await asyncio.wrap_future( self._user_callable_wrapper.call_user_method( request_metadata, request_args, request_kwargs @@ -560,11 +573,14 @@ async def handle_request_streaming( ) -> AsyncGenerator[Any, None]: """Generator that is the entrypoint for all `stream=True` handle calls.""" request_metadata = pickle.loads(pickled_request_metadata) - with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: + with self._wrap_user_method_call( + request_metadata, request_args + ) as status_code_callback: async for result in self._call_user_generator( request_metadata, request_args, request_kwargs, + status_code_callback=status_code_callback, ): yield result @@ -602,7 +618,9 @@ async def handle_request_with_rejection( ) return - with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: + with self._wrap_user_method_call( + request_metadata, request_args + ) as status_code_callback: yield pickle.dumps( ReplicaQueueLengthInfo( accepted=True, @@ -617,7 +635,7 @@ async def handle_request_with_rejection( request_metadata, request_args, request_kwargs, - write_status_code=write_status_code, + status_code_callback=status_code_callback, ): yield result else: @@ -645,7 +663,7 @@ async def handle_request_from_java( multiplexed_model_id=proto.multiplexed_model_id, route=proto.route, ) - with self._wrap_user_method_call(request_metadata, request_args) as write_status_code: + with self._wrap_user_method_call(request_metadata, request_args): return await asyncio.wrap_future( self._user_callable_wrapper.call_user_method( request_metadata, request_args, request_kwargs From 657e170668241a5683b9d3ab5bb2dc369bfa812b Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 11:13:40 -0600 Subject: [PATCH 04/10] fix some tests Signed-off-by: Edward Oakes --- python/ray/serve/tests/test_logging.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/ray/serve/tests/test_logging.py b/python/ray/serve/tests/test_logging.py index b5b723a6d9871..749d5e665ecac 100644 --- a/python/ray/serve/tests/test_logging.py +++ b/python/ray/serve/tests/test_logging.py @@ -122,7 +122,7 @@ def check_log(replica_id: ReplicaID, method_name: str, fail: bool = False): [ name in s, _get_expected_replica_log_content(replica_id) in s, - method_name.upper() in s, + method_name in s, ("ERROR" if fail else "OK") in s, "ms" in s, ("blah blah blah" in s and "RuntimeError" in s) @@ -288,14 +288,14 @@ def __call__(self, req: starlette.requests.Request): # Check the component log expected_log_infos = [ - f"{resp['request_id']} {resp['route']} replica.py", - f"{resp2['request_id']} {resp2['route']} replica.py", + f"{resp['request_id']} -- ", + f"{resp2['request_id']} -- ", ] # Check User log user_log_regexes = [ - f".*{resp['request_id']} {resp['route']}.* user func.*", - f".*{resp2['request_id']} {resp2['route']}.* user log " + f".*{resp['request_id']} -- user func.*", + f".*{resp2['request_id']} -- user log.*" "message from class method.*", ] @@ -344,10 +344,10 @@ def check_log(): ) else: user_method_log_regex = ( - f".*{resp['request_id']} {resp['route']}.* user func.*" + f".*{resp['request_id']} -- user func.*" ) user_class_method_log_regex = ( - f".*{resp2['request_id']} {resp2['route']}.* " + f".*{resp2['request_id']} -- .*" "user log message from class method.*" ) From 82b4183aebc801da1e75b432673e3e443d630aa3 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 11:13:51 -0600 Subject: [PATCH 05/10] fix some tests Signed-off-by: Edward Oakes --- python/ray/serve/tests/test_logging.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/python/ray/serve/tests/test_logging.py b/python/ray/serve/tests/test_logging.py index 749d5e665ecac..8aa20781594e2 100644 --- a/python/ray/serve/tests/test_logging.py +++ b/python/ray/serve/tests/test_logging.py @@ -295,8 +295,7 @@ def __call__(self, req: starlette.requests.Request): # Check User log user_log_regexes = [ f".*{resp['request_id']} -- user func.*", - f".*{resp2['request_id']} -- user log.*" - "message from class method.*", + f".*{resp2['request_id']} -- user log.*" "message from class method.*", ] def check_log(): @@ -343,12 +342,9 @@ def check_log(): f'"component_name": "replica".*' ) else: - user_method_log_regex = ( - f".*{resp['request_id']} -- user func.*" - ) + user_method_log_regex = f".*{resp['request_id']} -- user func.*" user_class_method_log_regex = ( - f".*{resp2['request_id']} -- .*" - "user log message from class method.*" + f".*{resp2['request_id']} -- .*" "user log message from class method.*" ) def check_log_file(log_file: str, expected_regex: list): From ae8a67bf230441f832fc3b1550bec59887fbbdee Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 11:45:59 -0600 Subject: [PATCH 06/10] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/proxy.py | 15 +++--- python/ray/serve/tests/test_metrics.py | 73 ++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index 69e55939baefd..415cbbed69588 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -990,8 +990,7 @@ async def send_request_to_replica( status_code = str(asgi_message["status"]) status = ResponseStatus( code=status_code, - # TODO(edoakes): we need a more nuanced check than this. - is_error=status_code != "200", + is_error=not status_code.startswith("2"), ) expecting_trailers = asgi_message.get("trailers", False) elif asgi_message["type"] == "websocket.accept": @@ -1013,11 +1012,15 @@ async def send_request_to_replica( if not asgi_message.get("more_trailers", False): response_generator.stop_checking_for_disconnect() elif asgi_message["type"] == "websocket.disconnect": - status = ResponseStatus( - code=str(asgi_message["code"]), - # TODO(edoakes): we need a more nuanced check than this. - is_error=False, + status_code = str(asgi_message["code"]) + is_error = ( + # 1xxx status codes are considered errors aside from: + # 1000 (CLOSE_NORMAL), 1001 (CLOSE_GOING_AWAY). + status_code.startswith("1") + and status_code not in ["1000", "1001"] ) + + status = ResponseStatus(code=status_code, is_error=is_error) response_generator.stop_checking_for_disconnect() yield asgi_message diff --git a/python/ray/serve/tests/test_metrics.py b/python/ray/serve/tests/test_metrics.py index 6b3e674e79077..9263d303d1435 100644 --- a/python/ray/serve/tests/test_metrics.py +++ b/python/ray/serve/tests/test_metrics.py @@ -7,6 +7,8 @@ import pytest import requests from fastapi import FastAPI +from starlette.responses import PlainTextResponse +from starlette.requests import Request import ray import ray.util.state as state_api @@ -583,6 +585,77 @@ def f(*args): print("serve_grpc_request_latency_ms_sum working as expected.") +def test_proxy_metrics_http_status_code_is_error(serve_start_shutdown): + """Verify that 2xx status codes aren't errors, others are.""" + def check_request_count_metrics( + expected_error_count: int, + expected_success_count: int, + ): + resp = requests.get("http://127.0.0.1:9999").text + error_count = 0 + success_count = 0 + for line in resp.split("\n"): + if line.startswith("ray_serve_num_http_error_requests_total"): + error_count += int(float(line.split(" ")[-1])) + if line.startswith("ray_serve_num_http_requests_total"): + success_count += int(float(line.split(" ")[-1])) + + assert error_count == expected_error_count + assert success_count == expected_success_count + return True + + @serve.deployment + async def return_status_code(request: Request): + code = int((await request.body()).decode("utf-8")) + return PlainTextResponse("", status_code=code) + + serve.run(return_status_code.bind()) + + # 200 is not an error. + r = requests.get("http://127.0.0.1:8000/", data=b"200") + assert r.status_code == 200 + wait_for_condition( + check_request_count_metrics, + expected_error_count=0, + expected_success_count=1, + ) + + # 2xx is not an error. + r = requests.get("http://127.0.0.1:8000/", data=b"250") + assert r.status_code == 250 + wait_for_condition( + check_request_count_metrics, + expected_error_count=0, + expected_success_count=2, + ) + + # 3xx is an error. + r = requests.get("http://127.0.0.1:8000/", data=b"300") + assert r.status_code == 300 + wait_for_condition( + check_request_count_metrics, + expected_error_count=1, + expected_success_count=3, + ) + + # 4xx is an error. + r = requests.get("http://127.0.0.1:8000/", data=b"400") + assert r.status_code == 400 + wait_for_condition( + check_request_count_metrics, + expected_error_count=2, + expected_success_count=4, + ) + + # 5xx is an error. + r = requests.get("http://127.0.0.1:8000/", data=b"500") + assert r.status_code == 500 + wait_for_condition( + check_request_count_metrics, + expected_error_count=3, + expected_success_count=5, + ) + def test_replica_metrics_fields(serve_start_shutdown): """Test replica metrics fields""" From aff8fe95aa5f3330fafd0ea191b177020dffb3bf Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 11:47:48 -0600 Subject: [PATCH 07/10] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/constants.py | 3 +- python/ray/serve/_private/logging_utils.py | 6 +- python/ray/serve/_private/proxy.py | 2 - .../serve/_private/proxy_request_response.py | 3 +- python/ray/serve/_private/replica.py | 76 ++++--------------- python/ray/serve/tests/test_logging.py | 18 +++-- 6 files changed, 33 insertions(+), 75 deletions(-) diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index f4c2adac37d1e..8fae61c953299 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -194,8 +194,9 @@ # Logging format with record key to format string dict SERVE_LOG_RECORD_FORMAT = { SERVE_LOG_REQUEST_ID: "%(request_id)s", + SERVE_LOG_ROUTE: "%(route)s", SERVE_LOG_APPLICATION: "%(application)s", - SERVE_LOG_MESSAGE: "-- %(message)s", + SERVE_LOG_MESSAGE: "%(filename)s:%(lineno)d - %(message)s", SERVE_LOG_LEVEL_NAME: "%(levelname)s", SERVE_LOG_TIME: "%(asctime)s", } diff --git a/python/ray/serve/_private/logging_utils.py b/python/ray/serve/_private/logging_utils.py index 5081829670bce..207f0574c317d 100644 --- a/python/ray/serve/_private/logging_utils.py +++ b/python/ray/serve/_private/logging_utils.py @@ -141,6 +141,8 @@ def format(self, record: logging.LogRecord) -> str: record_formats_attrs = [] if SERVE_LOG_REQUEST_ID in record.__dict__: record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID]) + if SERVE_LOG_ROUTE in record.__dict__: + record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_ROUTE]) record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE]) record_format += " ".join(record_formats_attrs) @@ -151,9 +153,9 @@ def format(self, record: logging.LogRecord) -> str: return formatter.format(record) -def access_log_msg(*, method: str, route: str, status: str, latency_ms: float): +def access_log_msg(*, method: str, status: str, latency_ms: float): """Returns a formatted message for an HTTP or ServeHandle access log.""" - return f"{method} {route} {status} {latency_ms:.1f}ms" + return f"{method.upper()} {status.upper()} {latency_ms:.1f}ms" def log_to_stderr_filter(record: logging.LogRecord) -> bool: diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index 415cbbed69588..94bea67b55d64 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -454,11 +454,9 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator: latency_ms = (time.time() - start_time) * 1000.0 if response_handler_info.should_record_access_log: - request_context = ray.serve.context._serve_request_context.get() logger.info( access_log_msg( method=proxy_request.method, - route=request_context.route, status=str(status.code), latency_ms=latency_ms, ), diff --git a/python/ray/serve/_private/proxy_request_response.py b/python/ray/serve/_private/proxy_request_response.py index 0ca2235fd3dd5..8050c4be215d3 100644 --- a/python/ray/serve/_private/proxy_request_response.py +++ b/python/ray/serve/_private/proxy_request_response.py @@ -58,8 +58,7 @@ def request_type(self) -> str: @property def method(self) -> str: - # WebSocket messages don't have a 'method' field. - return self.scope.get("method", "WS").upper() + return self.scope.get("method", "websocket").upper() @property def route_path(self) -> str: diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index c373b86da7a95..b90c837b6cc0a 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -10,16 +10,7 @@ from contextlib import contextmanager from functools import wraps from importlib import import_module -from typing import ( - Any, - AsyncGenerator, - Callable, - Dict, - Generator, - Optional, - Tuple, - Union, -) +from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union import starlette.responses from starlette.types import ASGIApp, Message @@ -240,9 +231,6 @@ def _add_autoscaling_metrics_point(self) -> None: ) -StatusCodeCallback = Callable[[str], None] - - class ReplicaActor: """Actor definition for replicas of Ray Serve deployments. @@ -353,23 +341,20 @@ def get_num_ongoing_requests(self) -> int: """ return self._metrics_manager.get_num_ongoing_requests() - def _maybe_get_http_route_and_method( + def _maybe_get_asgi_route( self, request_metadata: RequestMetadata, request_args: Tuple[Any] - ) -> Tuple[Optional[str], Optional[str]]: + ) -> Optional[str]: """Get the matched route string for ASGI apps to be used in logs & metrics. If this replica does not wrap an ASGI app or there is no matching for the request, returns the existing route from the request metadata. """ route = request_metadata.route - method = None if ( request_metadata.is_http_request and self._user_callable_asgi_app is not None ): req: StreamingHTTPRequest = request_args[0] - # WebSocket messages don't have a 'method' field. - method = req.asgi_scope.get("method", "WS") try: matched_route = get_asgi_route_name( self._user_callable_asgi_app, req.asgi_scope @@ -387,24 +372,22 @@ def _maybe_get_http_route_and_method( if matched_route is not None: route = matched_route - return route, method + return route @contextmanager def _wrap_user_method_call( self, request_metadata: RequestMetadata, request_args: Tuple[Any] - ) -> Generator[StatusCodeCallback, None, None]: + ): """Context manager that wraps user method calls. 1) Sets the request context var with appropriate metadata. 2) Records the access log message (if not disabled). 3) Records per-request metrics via the metrics manager. """ - http_route, http_method = self._maybe_get_http_route_and_method( - request_metadata, request_args - ) + route = self._maybe_get_asgi_route(request_metadata, request_args) ray.serve.context._serve_request_context.set( ray.serve.context._RequestContext( - route=http_route, + route=route, request_id=request_metadata.request_id, _internal_request_id=request_metadata.internal_request_id, app_name=self._deployment_id.app_name, @@ -413,17 +396,11 @@ def _wrap_user_method_call( ) ) - status_code = None - - def _status_code_callback(s: str): - nonlocal status_code - status_code = s - start_time = time.time() user_exception = None try: self._metrics_manager.inc_num_ongoing_requests() - yield _status_code_callback + yield except asyncio.CancelledError as e: user_exception = e @@ -453,16 +430,14 @@ def _status_code_callback(s: str): logger.info( access_log_msg( - method=http_method or "CALL", - route=http_route or request_metadata.call_method, - # Prefer the HTTP status code if it was populated. - status=status_code or status_str, + method=request_metadata.call_method, + status=status_str, latency_ms=latency_ms, ), extra={"serve_access_log": True}, ) self._metrics_manager.record_request_metrics( - route=http_route, + route=route, status_str=status_str, latency_ms=latency_ms, was_error=user_exception is not None, @@ -476,7 +451,6 @@ async def _call_user_generator( request_metadata: RequestMetadata, request_args: Tuple[Any], request_kwargs: Dict[str, Any], - status_code_callback: StatusCodeCallback, ) -> AsyncGenerator[Any, None]: """Calls a user method for a streaming call and yields its results. @@ -485,7 +459,6 @@ async def _call_user_generator( """ call_user_method_future = None wait_for_message_task = None - first_message_peeked = False try: result_queue = MessageQueue() @@ -519,15 +492,7 @@ def _enqueue_thread_safe(item: Any): # and use vanilla pickle (we know it's safe because these messages # only contain primitive Python types). if request_metadata.is_http_request: - if not first_message_peeked: - first_message = messages[0] - first_message_peeked = True - if first_message["type"] == "http.response.start": - # HTTP responses begin with exactly one - # "http.response.start" message containing the "status" - # field. Other response types like WebSockets may not. - status_code_callback(str(first_message["status"])) - yield pickle.dumps(messages) + yield pickle.dumps(messages) else: for msg in messages: yield msg @@ -573,14 +538,11 @@ async def handle_request_streaming( ) -> AsyncGenerator[Any, None]: """Generator that is the entrypoint for all `stream=True` handle calls.""" request_metadata = pickle.loads(pickled_request_metadata) - with self._wrap_user_method_call( - request_metadata, request_args - ) as status_code_callback: + with self._wrap_user_method_call(request_metadata, request_args): async for result in self._call_user_generator( request_metadata, request_args, request_kwargs, - status_code_callback=status_code_callback, ): yield result @@ -618,9 +580,7 @@ async def handle_request_with_rejection( ) return - with self._wrap_user_method_call( - request_metadata, request_args - ) as status_code_callback: + with self._wrap_user_method_call(request_metadata, request_args): yield pickle.dumps( ReplicaQueueLengthInfo( accepted=True, @@ -635,7 +595,6 @@ async def handle_request_with_rejection( request_metadata, request_args, request_kwargs, - status_code_callback=status_code_callback, ): yield result else: @@ -1280,12 +1239,7 @@ async def call_user_method( ) except Exception: - if ( - request_metadata.is_http_request - and asgi_args is not None - # If the callable is an ASGI app, it already sent a 500 status response. - and not is_asgi_app - ): + if request_metadata.is_http_request and asgi_args is not None: await self._send_user_result_over_asgi( starlette.responses.Response( "Internal Server Error", status_code=500 diff --git a/python/ray/serve/tests/test_logging.py b/python/ray/serve/tests/test_logging.py index 8aa20781594e2..b5b723a6d9871 100644 --- a/python/ray/serve/tests/test_logging.py +++ b/python/ray/serve/tests/test_logging.py @@ -122,7 +122,7 @@ def check_log(replica_id: ReplicaID, method_name: str, fail: bool = False): [ name in s, _get_expected_replica_log_content(replica_id) in s, - method_name in s, + method_name.upper() in s, ("ERROR" if fail else "OK") in s, "ms" in s, ("blah blah blah" in s and "RuntimeError" in s) @@ -288,14 +288,15 @@ def __call__(self, req: starlette.requests.Request): # Check the component log expected_log_infos = [ - f"{resp['request_id']} -- ", - f"{resp2['request_id']} -- ", + f"{resp['request_id']} {resp['route']} replica.py", + f"{resp2['request_id']} {resp2['route']} replica.py", ] # Check User log user_log_regexes = [ - f".*{resp['request_id']} -- user func.*", - f".*{resp2['request_id']} -- user log.*" "message from class method.*", + f".*{resp['request_id']} {resp['route']}.* user func.*", + f".*{resp2['request_id']} {resp2['route']}.* user log " + "message from class method.*", ] def check_log(): @@ -342,9 +343,12 @@ def check_log(): f'"component_name": "replica".*' ) else: - user_method_log_regex = f".*{resp['request_id']} -- user func.*" + user_method_log_regex = ( + f".*{resp['request_id']} {resp['route']}.* user func.*" + ) user_class_method_log_regex = ( - f".*{resp2['request_id']} -- .*" "user log message from class method.*" + f".*{resp2['request_id']} {resp2['route']}.* " + "user log message from class method.*" ) def check_log_file(log_file: str, expected_regex: list): From 613e474510295f20d1a31c00c852f94e2cf6092c Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 11:47:54 -0600 Subject: [PATCH 08/10] fix Signed-off-by: Edward Oakes --- python/ray/serve/tests/test_metrics.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/tests/test_metrics.py b/python/ray/serve/tests/test_metrics.py index 9263d303d1435..6094f39ab80ea 100644 --- a/python/ray/serve/tests/test_metrics.py +++ b/python/ray/serve/tests/test_metrics.py @@ -7,8 +7,8 @@ import pytest import requests from fastapi import FastAPI -from starlette.responses import PlainTextResponse from starlette.requests import Request +from starlette.responses import PlainTextResponse import ray import ray.util.state as state_api @@ -587,6 +587,7 @@ def f(*args): def test_proxy_metrics_http_status_code_is_error(serve_start_shutdown): """Verify that 2xx status codes aren't errors, others are.""" + def check_request_count_metrics( expected_error_count: int, expected_success_count: int, @@ -656,6 +657,7 @@ async def return_status_code(request: Request): expected_success_count=5, ) + def test_replica_metrics_fields(serve_start_shutdown): """Test replica metrics fields""" From ad3634735972b7430a2b41907248233cac295e1d Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 11:48:50 -0600 Subject: [PATCH 09/10] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/proxy.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index 94bea67b55d64..fed1ec9489cd0 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -1011,14 +1011,15 @@ async def send_request_to_replica( response_generator.stop_checking_for_disconnect() elif asgi_message["type"] == "websocket.disconnect": status_code = str(asgi_message["code"]) - is_error = ( + status = ResponseStatus( + code=status_code, # 1xxx status codes are considered errors aside from: # 1000 (CLOSE_NORMAL), 1001 (CLOSE_GOING_AWAY). - status_code.startswith("1") - and status_code not in ["1000", "1001"] + is_error=( + status_code.startswith("1") + and status_code not in ["1000", "1001"] + ), ) - - status = ResponseStatus(code=status_code, is_error=is_error) response_generator.stop_checking_for_disconnect() yield asgi_message From dacde33e895f48a7b247d825a2dae6b4ce1f0409 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 20 Nov 2024 13:34:49 -0600 Subject: [PATCH 10/10] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/proxy.py | 12 ++-- python/ray/serve/tests/test_metrics.py | 86 +++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index fed1ec9489cd0..54f6908072c68 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -1009,16 +1009,16 @@ async def send_request_to_replica( # the trailers message has been sent. if not asgi_message.get("more_trailers", False): response_generator.stop_checking_for_disconnect() - elif asgi_message["type"] == "websocket.disconnect": + elif asgi_message["type"] in [ + "websocket.close", + "websocket.disconnect", + ]: status_code = str(asgi_message["code"]) status = ResponseStatus( code=status_code, - # 1xxx status codes are considered errors aside from: + # All status codes are considered errors aside from: # 1000 (CLOSE_NORMAL), 1001 (CLOSE_GOING_AWAY). - is_error=( - status_code.startswith("1") - and status_code not in ["1000", "1001"] - ), + is_error=status_code not in ["1000", "1001"], ) response_generator.stop_checking_for_disconnect() diff --git a/python/ray/serve/tests/test_metrics.py b/python/ray/serve/tests/test_metrics.py index 6094f39ab80ea..6f64666a96ba7 100644 --- a/python/ray/serve/tests/test_metrics.py +++ b/python/ray/serve/tests/test_metrics.py @@ -6,9 +6,11 @@ import grpc import pytest import requests -from fastapi import FastAPI +from fastapi import FastAPI, WebSocket from starlette.requests import Request from starlette.responses import PlainTextResponse +from websockets.exceptions import ConnectionClosed +from websockets.sync.client import connect import ray import ray.util.state as state_api @@ -658,6 +660,88 @@ async def return_status_code(request: Request): ) +def test_proxy_metrics_websocket_status_code_is_error(serve_start_shutdown): + """Verify that status codes aisde from 1000 or 1001 are errors.""" + + def check_request_count_metrics( + expected_error_count: int, + expected_success_count: int, + ): + resp = requests.get("http://127.0.0.1:9999").text + error_count = 0 + success_count = 0 + for line in resp.split("\n"): + if line.startswith("ray_serve_num_http_error_requests_total"): + error_count += int(float(line.split(" ")[-1])) + if line.startswith("ray_serve_num_http_requests_total"): + success_count += int(float(line.split(" ")[-1])) + + assert error_count == expected_error_count + assert success_count == expected_success_count + return True + + fastapi_app = FastAPI() + + @serve.deployment + @serve.ingress(fastapi_app) + class WebSocketServer: + @fastapi_app.websocket("/") + async def accept_then_close(self, ws: WebSocket): + await ws.accept() + code = int(await ws.receive_text()) + await ws.close(code=code) + + serve.run(WebSocketServer.bind()) + + # Regular disconnect (1000) is not an error. + with connect("ws://localhost:8000/") as ws: + with pytest.raises(ConnectionClosed): + ws.send("1000") + ws.recv() + + wait_for_condition( + check_request_count_metrics, + expected_error_count=0, + expected_success_count=1, + ) + + # Goaway disconnect (1001) is not an error. + with connect("ws://localhost:8000/") as ws: + with pytest.raises(ConnectionClosed): + ws.send("1001") + ws.recv() + + wait_for_condition( + check_request_count_metrics, + expected_error_count=0, + expected_success_count=2, + ) + + # Other codes are errors. + with connect("ws://localhost:8000/") as ws: + with pytest.raises(ConnectionClosed): + ws.send("1011") + ws.recv() + + wait_for_condition( + check_request_count_metrics, + expected_error_count=1, + expected_success_count=3, + ) + + # Other codes are errors. + with connect("ws://localhost:8000/") as ws: + with pytest.raises(ConnectionClosed): + ws.send("3000") + ws.recv() + + wait_for_condition( + check_request_count_metrics, + expected_error_count=2, + expected_success_count=4, + ) + + def test_replica_metrics_fields(serve_start_shutdown): """Test replica metrics fields"""