[Serve][Doc] Direct Ingress #29149
Changes from 2 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
# Direct Ingress | ||
|
||
In Ray 2.1, Serve provides an alpha version of gRPC ingress. In this section, you will learn how to: | ||
|
||
|
||
* use Serve's internal gRPC schema to send traffic | ||
|
||
* bring your own gRPC schema into your Serve application | ||
|
||
Review comment: Add a note saying this documentation assumes some basic understanding of gRPC.
Review comment: Or have a link to the gRPC documentation.
Review comment: Added gRPC documentation directly above. |
||
## Use Serve's Schema | ||
|
||
Internally, Serve provides a simple gRPC schema. | ||
|
||
``` | ||
message PredictRequest { | ||
  map<string, bytes> input = 2; | ||
} | ||
|
||
message PredictResponse { | ||
  bytes prediction = 1; | ||
} | ||
|
||
service PredictAPIsService { | ||
  rpc Predict(PredictRequest) returns (PredictResponse); | ||
} | ||
``` | ||
|
||
Code example: | ||
|
||
|
||
Server: | ||
|
||
```{literalinclude} ../serve/doc_code/direct_ingress.py | ||
:start-after: __begin_server__ | ||
:end-before: __end_server__ | ||
:language: python | ||
``` | ||
|
||
Client: | ||
|
||
```{literalinclude} ../serve/doc_code/direct_ingress.py | ||
:start-after: __begin_client__ | ||
:end-before: __end_client__ | ||
:language: python | ||
``` | ||
|
||
:::{note} | ||
* `input` is a dictionary of type `map<string, bytes>`, following the schema described above. | ||
* User input data must be serialized to `bytes` before it is placed in `input`. | ||
* The response is of type `bytes`, so user code is responsible for serializing the output into bytes. | ||
* By default, the gRPC port is 9000. You can change it by passing a port number when calling the `DefaultgRPCDriver` `bind` function. | ||
::: | ||
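
The serialization notes above can be illustrated with a minimal sketch. It assumes JSON as the byte encoding, which is only one possible choice, and `encode_inputs` and `decode_prediction` are hypothetical helper names that are not part of Serve.

```python
import json
from typing import Any, Dict


def encode_inputs(inputs: Dict[str, Any]) -> Dict[str, bytes]:
    """Serialize each value to bytes so the dict fits `map<string, bytes>`."""
    return {key: json.dumps(value).encode("utf-8") for key, value in inputs.items()}


def decode_prediction(prediction: bytes) -> Any:
    """Deserialize the `bytes` payload of a response back into a Python object."""
    return json.loads(prediction.decode("utf-8"))


# Hypothetical payload; it could then be passed as PredictRequest(input=payload).
payload = encode_inputs({"a": [1, 2, 3]})
```

The deployment on the server side would need to apply the matching decoding to each value it receives and encode its return value the same way.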
|
||
### Client Schema code generation | ||
|
||
There are many ways to generate client schema code. Here is one simple approach: | ||
|
||
* Install the gRPC code generation tools | ||
``` | ||
pip install grpcio-tools | ||
``` | ||
|
||
* Generate gRPC code based on the schema | ||
``` | ||
python -m grpc_tools.protoc --proto_path=src/ray/protobuf/ --python_out=. --grpc_python_out=. src/ray/protobuf/serve.proto | ||
``` | ||
After the two steps above, you should have the generated files `serve_pb2.py` and `serve_pb2_grpc.py`. (The same steps work for any schema file, including the "bring your own schema" case described below.) | ||
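
Because the same command works for any schema file, it can be handy to assemble it from a script. The sketch below only builds the argument list using the flags shown above; `build_protoc_command` is a hypothetical helper name, and actually running the command requires `grpcio-tools` to be installed.

```python
import sys
from pathlib import Path
from typing import List


def build_protoc_command(proto_file: str, out_dir: str = ".") -> List[str]:
    """Assemble the grpc_tools.protoc invocation for a single .proto file."""
    proto = Path(proto_file)
    return [
        sys.executable,
        "-m",
        "grpc_tools.protoc",
        f"--proto_path={proto.parent.as_posix()}",
        f"--python_out={out_dir}",
        f"--grpc_python_out={out_dir}",
        proto.as_posix(),
    ]


command = build_protoc_command("src/ray/protobuf/serve.proto")
# To run it (requires grpcio-tools): subprocess.run(command, check=True)
```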
|
||
|
||
## Bring your own Schema | ||
|
||
|
||
If you have a customized schema to use, Serve also supports it! | ||
|
||
Assume you have the following customized schema and have generated the corresponding gRPC code: | ||
|
||
|
||
``` | ||
message PingRequest { | ||
  bool no_reply = 1; | ||
} | ||
message PingReply { | ||
} | ||
|
||
message PingTimeoutRequest {} | ||
message PingTimeoutReply {} | ||
|
||
service TestService { | ||
  rpc Ping(PingRequest) returns (PingReply); | ||
  rpc PingTimeout(PingTimeoutRequest) returns (PingTimeoutReply); | ||
} | ||
``` | ||
|
||
Server: | ||
|
||
```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py | ||
:start-after: __begin_server__ | ||
:end-before: __end_server__ | ||
:language: python | ||
``` | ||
|
||
Client: | ||
|
||
```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py | ||
:start-after: __begin_client__ | ||
:end-before: __end_client__ | ||
:language: python | ||
``` | ||
|
||
:::{note} | ||
* To use your own schema, you need to write your own driver class (`MyDriver` in this example) and deploy it. | ||
* `is_driver_deployment` is needed to mark the class as a driver. Serve makes sure the driver deployment is deployed with one replica per node. | ||
* `gRPCIngress` is used for starting a gRPC server. Your driver class needs to inherit from it. | ||
::: |
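
If the client uses a `grpc.aio` channel, each stub method returns an awaitable call object rather than the reply itself, so the call has to be awaited inside a coroutine. The sketch below shows only that calling pattern. `FakePingStub` is a hypothetical stand-in so the example runs without a server; with real generated code the stub would be `test_service_pb2_grpc.TestServiceStub(channel)`.

```python
import asyncio


class FakePingStub:
    """Hypothetical stand-in for an asyncio TestServiceStub; no server needed."""

    async def Ping(self, request):
        # A real stub would send `request` over the channel here.
        return {"echo": request}


async def send_ping(stub, request):
    # The call must be awaited to obtain the reply.
    return await stub.Ping(request)


response = asyncio.run(send_ping(FakePingStub(), "ping"))
```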
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# flake8: noqa | ||
|
||
# __begin_server__ | ||
from ray import serve | ||
from ray.serve.drivers import DefaultgRPCDriver | ||
|
||
|
||
@serve.deployment | ||
class D1: | ||
    def __call__(self, input): | ||
Review comment: add type hint |
||
        # Do something with input | ||
        return input["a"] | ||
|
||
|
||
my_deployment = DefaultgRPCDriver.bind(D1.bind()) | ||
|
||
serve.run(my_deployment) | ||
# __end_server__ | ||
|
||
from ray.serve.generated import serve_pb2, serve_pb2_grpc | ||
|
||
|
||
# __begin_client__ | ||
import grpc | ||
|
||
# Use a synchronous channel so that `stub.Predict` returns the response directly. | ||
channel = grpc.insecure_channel("localhost:9000") | ||
stub = serve_pb2_grpc.PredictAPIsServiceStub(channel) | ||
response = stub.Predict(serve_pb2.PredictRequest(input={"a": bytes("123", "utf-8")})) | ||
|
||
# __end_client__ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# flake8: noqa | ||
|
||
# __begin_server__ | ||
from ray import serve | ||
from ray.serve.drivers import gRPCIngress | ||
import test_service_pb2_grpc, test_service_pb2 | ||
|
||
|
||
@serve.deployment(is_driver_deployment=True) | ||
class MyDriver(test_service_pb2_grpc.TestServiceServicer, gRPCIngress): | ||
    def __init__(self): | ||
        super().__init__() | ||
|
||
    async def Ping(self, request, context): | ||
        # play with your dag and then reply | ||
        return test_service_pb2.PingReply() | ||
|
||
|
||
my_deployment = MyDriver.bind() | ||
|
||
serve.run(my_deployment) | ||
# __end_server__ | ||
|
||
|
||
# __begin_client__ | ||
import grpc | ||
import test_service_pb2_grpc, test_service_pb2 | ||
|
||
# Use a synchronous channel so that `stub.Ping` returns the reply directly. | ||
channel = grpc.insecure_channel("localhost:9000") | ||
stub = test_service_pb2_grpc.TestServiceStub(channel) | ||
response = stub.Ping(test_service_pb2.PingRequest()) | ||
# __end_client__ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
# flake8: noqa | ||
|
||
|
||
# -*- coding: utf-8 -*- | ||
# Generated by the protocol buffer compiler. DO NOT EDIT! | ||
# source: src/ray/protobuf/test_service.proto | ||
"""Generated protocol buffer code.""" | ||
from google.protobuf import descriptor as _descriptor | ||
from google.protobuf import descriptor_pool as _descriptor_pool | ||
from google.protobuf import message as _message | ||
from google.protobuf import reflection as _reflection | ||
from google.protobuf import symbol_database as _symbol_database | ||
|
||
# @@protoc_insertion_point(imports) | ||
|
||
_sym_db = _symbol_database.Default() | ||
|
||
|
||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( | ||
b'\n#src/ray/protobuf/test_service.proto\x12\x07ray.rpc"(\n\x0bPingRequest\x12\x19\n\x08no_reply\x18\x01 \x01(\x08R\x07noReply"\x0b\n\tPingReply"\x14\n\x12PingTimeoutRequest"\x12\n\x10PingTimeoutReply2\x86\x01\n\x0bTestService\x12\x30\n\x04Ping\x12\x14.ray.rpc.PingRequest\x1a\x12.ray.rpc.PingReply\x12\x45\n\x0bPingTimeout\x12\x1b.ray.rpc.PingTimeoutRequest\x1a\x19.ray.rpc.PingTimeoutReplyb\x06proto3' | ||
) | ||
|
||
|
||
_PINGREQUEST = DESCRIPTOR.message_types_by_name["PingRequest"] | ||
_PINGREPLY = DESCRIPTOR.message_types_by_name["PingReply"] | ||
_PINGTIMEOUTREQUEST = DESCRIPTOR.message_types_by_name["PingTimeoutRequest"] | ||
_PINGTIMEOUTREPLY = DESCRIPTOR.message_types_by_name["PingTimeoutReply"] | ||
PingRequest = _reflection.GeneratedProtocolMessageType( | ||
    "PingRequest", | ||
    (_message.Message,), | ||
    { | ||
        "DESCRIPTOR": _PINGREQUEST, | ||
        "__module__": "src.ray.protobuf.test_service_pb2" | ||
        # @@protoc_insertion_point(class_scope:ray.rpc.PingRequest) | ||
    }, | ||
) | ||
_sym_db.RegisterMessage(PingRequest) | ||
|
||
PingReply = _reflection.GeneratedProtocolMessageType( | ||
    "PingReply", | ||
    (_message.Message,), | ||
    { | ||
        "DESCRIPTOR": _PINGREPLY, | ||
        "__module__": "src.ray.protobuf.test_service_pb2" | ||
        # @@protoc_insertion_point(class_scope:ray.rpc.PingReply) | ||
    }, | ||
) | ||
_sym_db.RegisterMessage(PingReply) | ||
|
||
PingTimeoutRequest = _reflection.GeneratedProtocolMessageType( | ||
    "PingTimeoutRequest", | ||
    (_message.Message,), | ||
    { | ||
        "DESCRIPTOR": _PINGTIMEOUTREQUEST, | ||
        "__module__": "src.ray.protobuf.test_service_pb2" | ||
        # @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutRequest) | ||
    }, | ||
) | ||
_sym_db.RegisterMessage(PingTimeoutRequest) | ||
|
||
PingTimeoutReply = _reflection.GeneratedProtocolMessageType( | ||
    "PingTimeoutReply", | ||
    (_message.Message,), | ||
    { | ||
        "DESCRIPTOR": _PINGTIMEOUTREPLY, | ||
        "__module__": "src.ray.protobuf.test_service_pb2" | ||
        # @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutReply) | ||
    }, | ||
) | ||
_sym_db.RegisterMessage(PingTimeoutReply) | ||
|
||
_TESTSERVICE = DESCRIPTOR.services_by_name["TestService"] | ||
if _descriptor._USE_C_DESCRIPTORS == False: | ||
|
||
    DESCRIPTOR._options = None | ||
    _PINGREQUEST._serialized_start = 48 | ||
    _PINGREQUEST._serialized_end = 88 | ||
    _PINGREPLY._serialized_start = 90 | ||
    _PINGREPLY._serialized_end = 101 | ||
    _PINGTIMEOUTREQUEST._serialized_start = 103 | ||
    _PINGTIMEOUTREQUEST._serialized_end = 123 | ||
    _PINGTIMEOUTREPLY._serialized_start = 125 | ||
    _PINGTIMEOUTREPLY._serialized_end = 143 | ||
    _TESTSERVICE._serialized_start = 146 | ||
    _TESTSERVICE._serialized_end = 280 | ||
# @@protoc_insertion_point(module_scope) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# flake8: noqa | ||
|
||
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! | ||
"""Client and server classes corresponding to protobuf-defined services.""" | ||
import grpc | ||
|
||
import test_service_pb2 as src_dot_ray_dot_protobuf_dot_test__service__pb2 | ||
|
||
|
||
class TestServiceStub(object): | ||
    """Missing associated documentation comment in .proto file.""" | ||
|
||
    def __init__(self, channel): | ||
        """Constructor. | ||
|
||
        Args: | ||
            channel: A grpc.Channel. | ||
        """ | ||
        self.Ping = channel.unary_unary( | ||
            "/ray.rpc.TestService/Ping", | ||
            request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString, | ||
            response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString, | ||
        ) | ||
        self.PingTimeout = channel.unary_unary( | ||
            "/ray.rpc.TestService/PingTimeout", | ||
            request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString, | ||
            response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString, | ||
        ) | ||
|
||
|
||
class TestServiceServicer(object): | ||
    """Missing associated documentation comment in .proto file.""" | ||
|
||
    def Ping(self, request, context): | ||
        """Missing associated documentation comment in .proto file.""" | ||
        context.set_code(grpc.StatusCode.UNIMPLEMENTED) | ||
        context.set_details("Method not implemented!") | ||
        raise NotImplementedError("Method not implemented!") | ||
|
||
    def PingTimeout(self, request, context): | ||
        """Missing associated documentation comment in .proto file.""" | ||
        context.set_code(grpc.StatusCode.UNIMPLEMENTED) | ||
        context.set_details("Method not implemented!") | ||
        raise NotImplementedError("Method not implemented!") | ||
|
||
|
||
def add_TestServiceServicer_to_server(servicer, server): | ||
    rpc_method_handlers = { | ||
        "Ping": grpc.unary_unary_rpc_method_handler( | ||
            servicer.Ping, | ||
            request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.FromString, | ||
            response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.SerializeToString, | ||
        ), | ||
        "PingTimeout": grpc.unary_unary_rpc_method_handler( | ||
            servicer.PingTimeout, | ||
            request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.FromString, | ||
            response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.SerializeToString, | ||
        ), | ||
    } | ||
    generic_handler = grpc.method_handlers_generic_handler( | ||
        "ray.rpc.TestService", rpc_method_handlers | ||
    ) | ||
    server.add_generic_rpc_handlers((generic_handler,)) | ||
|
||
|
||
# This class is part of an EXPERIMENTAL API. | ||
class TestService(object): | ||
    """Missing associated documentation comment in .proto file.""" | ||
|
||
    @staticmethod | ||
    def Ping( | ||
        request, | ||
        target, | ||
        options=(), | ||
        channel_credentials=None, | ||
        call_credentials=None, | ||
        insecure=False, | ||
        compression=None, | ||
        wait_for_ready=None, | ||
        timeout=None, | ||
        metadata=None, | ||
    ): | ||
        return grpc.experimental.unary_unary( | ||
            request, | ||
            target, | ||
            "/ray.rpc.TestService/Ping", | ||
            src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString, | ||
            src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString, | ||
            options, | ||
            channel_credentials, | ||
            insecure, | ||
            call_credentials, | ||
            compression, | ||
            wait_for_ready, | ||
            timeout, | ||
            metadata, | ||
        ) | ||
|
||
    @staticmethod | ||
    def PingTimeout( | ||
        request, | ||
        target, | ||
        options=(), | ||
        channel_credentials=None, | ||
        call_credentials=None, | ||
        insecure=False, | ||
        compression=None, | ||
        wait_for_ready=None, | ||
        timeout=None, | ||
        metadata=None, | ||
    ): | ||
        return grpc.experimental.unary_unary( | ||
            request, | ||
            target, | ||
            "/ray.rpc.TestService/PingTimeout", | ||
            src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString, | ||
            src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString, | ||
            options, | ||
            channel_credentials, | ||
            insecure, | ||
            call_credentials, | ||
            compression, | ||
            wait_for_ready, | ||
            timeout, | ||
            metadata, | ||
        ) |