
[Serve] Document WebSocket support in Serve #36735

Merged
10 changes: 7 additions & 3 deletions doc/BUILD
@@ -133,16 +133,20 @@ py_test_run_all_subdirectory(
        "source/serve/doc_code/distilbert.py",
        "source/serve/doc_code/stable_diffusion.py",
        "source/serve/doc_code/object_detection.py",
-        "source/serve/doc_code/streaming_example.py",
-        "source/serve/doc_code/vllm_example.py", # Requires GPU and CUDA
+        "source/serve/doc_code/http_guide/streaming_example.py",
+        "source/serve/doc_code/http_guide/websockets_example.py",
+        "source/serve/doc_code/vllm_example.py",
    ],
    extra_srcs = [],
    tags = ["exclusive", "team:serve"],
)

py_test_run_all_subdirectory(
    size = "medium",
-    include = ["source/serve/doc_code/streaming_example.py"],
+    include = [
+        "source/serve/doc_code/http_guide/streaming_example.py",
+        "source/serve/doc_code/http_guide/websockets_example.py",
+    ],
    exclude = [],
    extra_srcs = [],
    tags = ["exclusive", "team:serve"],
4 changes: 2 additions & 2 deletions doc/source/serve/doc_code/batching_guide.py
@@ -83,8 +83,8 @@ def __call__(self, request: Request) -> StreamingResponse:
class StreamingResponder:
    @serve.batch(max_batch_size=5, batch_wait_timeout_s=0.1)
    async def generate_numbers(
-        self, max_list: List[Union[str, StopIteration]]
-    ) -> AsyncGenerator[List[int], None]:
+        self, max_list: List[str]
+    ) -> AsyncGenerator[List[Union[int, StopIteration]], None]:
        for i in range(max(max_list)):
            next_numbers = []
            for requested_max in max_list:
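The signature fix above reads more clearly next to a complete example of the pattern it belongs to. The sketch below is not `batching_guide.py` itself; the class, method, and `max` query-parameter names are made up, but it illustrates why `StopIteration` belongs in the *output* type: the batched generator yields one entry per in-flight request each step and uses `StopIteration` to mark streams that have finished.

```python
# Hypothetical sketch (names are illustrative, not the doc_code file): a
# @serve.batch async generator receives one list entry per batched request and
# yields one output per request per step; StopIteration marks finished streams.
from typing import AsyncGenerator, List, Union

from starlette.requests import Request
from starlette.responses import StreamingResponse

from ray import serve


@serve.deployment
class BatchedCounter:
    @serve.batch(max_batch_size=5, batch_wait_timeout_s=0.1)
    async def count_up(
        self, limits: List[str]
    ) -> AsyncGenerator[List[Union[int, StopIteration]], None]:
        parsed = [int(limit) for limit in limits]
        for i in range(max(parsed)):
            # One entry per batched request, in input order; requests whose
            # streams have ended get the StopIteration sentinel.
            yield [i if i < limit else StopIteration for limit in parsed]

    async def __call__(self, request: Request) -> StreamingResponse:
        async def stream():
            # Each caller passes a single value and iterates only over its own
            # results; Serve groups concurrent calls into batches underneath.
            async for value in self.count_up(request.query_params["max"]):
                yield f"{value}\n"

        return StreamingResponse(stream(), media_type="text/plain")
```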
39 changes: 39 additions & 0 deletions doc/source/serve/doc_code/http_guide/websockets_example.py
@@ -0,0 +1,39 @@
# flake8: noqa

# __websocket_serve_app_start__
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from ray import serve


app = FastAPI()


@serve.deployment
@serve.ingress(app)
class EchoServer:
    @app.websocket("/")
    async def echo(self, ws: WebSocket):
        await ws.accept()

        try:
            while True:
                text = await ws.receive_text()
                await ws.send_text(text)
        except WebSocketDisconnect:
            print("Client disconnected.")


serve_app = serve.run(EchoServer.bind())
# __websocket_serve_app_end__

# __websocket_serve_client_start__
from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Eureka!")
    assert websocket.recv() == "Eureka!"

    websocket.send("I've found it!")
    assert websocket.recv() == "I've found it!"
# __websocket_serve_client_end__
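The client above uses the synchronous `websockets` API. For callers that already run inside an event loop, an asyncio variant along these lines should also work (a sketch, assuming the `EchoServer` app above is running and reachable at `ws://localhost:8000`):

```python
# Asyncio client sketch for the same echo endpoint (assumes the EchoServer app
# above is already running locally and the `websockets` package is installed).
import asyncio

import websockets


async def main():
    async with websockets.connect("ws://localhost:8000") as websocket:
        await websocket.send("Eureka!")
        assert await websocket.recv() == "Eureka!"


asyncio.run(main())
```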
38 changes: 29 additions & 9 deletions doc/source/serve/http-guide.md
@@ -19,9 +19,9 @@ Considering your use case, you can choose the right level of abstraction:

(serve-http)=
## Calling Deployments via HTTP
-When you deploy a Serve application, the [ingress deployment](serve-key-concepts-ingress-deployment) (the one passed to `serve.run`) will be exposed over HTTP.
+When you deploy a Serve application, the [ingress deployment](serve-key-concepts-ingress-deployment) (the one passed to `serve.run`) is exposed over HTTP.

-```{literalinclude} doc_code/http_guide.py
+```{literalinclude} doc_code/http_guide/http_guide.py
:start-after: __begin_starlette__
:end-before: __end_starlette__
:language: python
@@ -31,7 +31,7 @@ Requests to the Serve HTTP server at `/` are routed to the deployment's `__call_

Often for ML models, you just need the API to accept a `numpy` array. You can use Serve's `DAGDriver` to simplify the request parsing.

-```{literalinclude} doc_code/http_guide.py
+```{literalinclude} doc_code/http_guide/http_guide.py
:start-after: __begin_dagdriver__
:end-before: __end_dagdriver__
:language: python
@@ -46,23 +46,23 @@ Serve provides a library of HTTP adapters to help you avoid boilerplate code. Th

If you want to define more complex HTTP handling logic, Serve integrates with [FastAPI](https://fastapi.tiangolo.com/). This allows you to define a Serve deployment using the {mod}`@serve.ingress <ray.serve.api.ingress>` decorator that wraps a FastAPI app with its full range of features. The most basic example of this is shown below, but for more details on all that FastAPI has to offer such as variable routes, automatic type validation, dependency injection (e.g., for database connections), and more, please check out [their documentation](https://fastapi.tiangolo.com/).

-```{literalinclude} doc_code/http_guide.py
+```{literalinclude} doc_code/http_guide/http_guide.py
:start-after: __begin_fastapi__
:end-before: __end_fastapi__
:language: python
```

Now if you send a request to `/hello`, this will be routed to the `root` method of our deployment. We can also easily leverage FastAPI to define multiple routes with different HTTP methods:

-```{literalinclude} doc_code/http_guide.py
+```{literalinclude} doc_code/http_guide/http_guide.py
:start-after: __begin_fastapi_multi_routes__
:end-before: __end_fastapi_multi_routes__
:language: python
```

You can also pass in an existing FastAPI app to a deployment to serve it as-is:

-```{literalinclude} doc_code/http_guide.py
+```{literalinclude} doc_code/http_guide/http_guide.py
:start-after: __begin_byo_fastapi__
:end-before: __end_byo_fastapi__
:language: python
@@ -71,8 +71,28 @@ You can also pass in an existing FastAPI app to a deployment to serve it as-is:
This is useful for scaling out an existing FastAPI app with no modifications necessary.
Existing middlewares, **automatic OpenAPI documentation generation**, and other advanced FastAPI features should work as-is.

-```{note}
-Serve currently does not support WebSockets. If you have a use case that requires it, please [let us know](https://github.com/ray-project/ray/issues/new/choose)!
+### WebSockets
+
+```{warning}
+Support for WebSockets is experimental. To enable this feature, set `RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING=1` on the cluster before starting Ray. If you encounter any issues, [file an issue on GitHub](https://github.com/ray-project/ray/issues/new/choose).
```
+
+Serve supports WebSockets via FastAPI:
+
+```{literalinclude} doc_code/http_guide/websockets_example.py
+:start-after: __websocket_serve_app_start__
+:end-before: __websocket_serve_app_end__
+:language: python
+```
+
+Decorate the function that handles WebSocket requests with `@app.websocket`. Read more about FastAPI WebSockets in the [FastAPI documentation](https://fastapi.tiangolo.com/advanced/websockets/).
+
+Query the deployment using the `websockets` package (`pip install websockets`):
+
+```{literalinclude} doc_code/http_guide/websockets_example.py
+:start-after: __websocket_serve_client_start__
+:end-before: __websocket_serve_client_end__
+:language: python
+```

(serve-http-streaming-response)=
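Two short, hedged sketches to go with the new section above; neither is part of this change. First, one way to set the experimental flag for a quick local, single-node run. The variable has to be in the environment before Ray starts; on a multi-node cluster, export it on every node before `ray start` instead.

```python
# Local-run sketch only: set the flag before Ray starts so locally started
# Serve processes can inherit it. On real clusters, export the variable on
# each node before running `ray start`.
import os

os.environ["RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING"] = "1"

import ray  # noqa: E402  (imported after setting the flag on purpose)

ray.init()
```

Second, the handler is not limited to echoing text. A hypothetical deployment (illustrative class and method names, same `@app.websocket` decorator) could exchange JSON messages using the `receive_json`/`send_json` helpers on FastAPI's `WebSocket`:

```python
# Hypothetical JSON variant of the EchoServer example above.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class JsonEchoServer:
    @app.websocket("/")
    async def echo_json(self, ws: WebSocket):
        await ws.accept()
        try:
            while True:
                # receive_json/send_json are Starlette WebSocket helpers.
                message = await ws.receive_json()
                await ws.send_json({"echo": message})
        except WebSocketDisconnect:
            pass


serve_app = serve.run(JsonEchoServer.bind())
```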
@@ -93,7 +113,7 @@ The code below defines a Serve application that incrementally streams numbers up
The client-side code is also updated to handle the streaming outputs.
This code uses the `stream=True` option to the [requests](https://requests.readthedocs.io/en/latest/user/advanced/#streaming-requests) library.

-```{literalinclude} doc_code/streaming_example.py
+```{literalinclude} doc_code/http_guide/streaming_example.py
:start-after: __begin_example__
:end-before: __end_example__
:language: python
4 changes: 2 additions & 2 deletions python/ray/serve/_private/http_state.py
@@ -382,8 +382,8 @@ def _stop_proxies_if_needed(self) -> bool:
                to_stop.append(node_id)
            elif proxy_state.status == HTTPProxyStatus.UNHEALTHY:
                logger.info(
-                    f"HTTP proxy on node '{node_id}' UNHEALTHY. Shut down the unhealthy"
-                    " proxy and restart a new one."
+                    f"HTTP proxy on node '{node_id}' UNHEALTHY. Shutting down "
+                    "the unhealthy proxy and starting a new one."
                )
                to_stop.append(node_id)
