Skip to content

Commit

Permalink
[Serve][Doc] Direct Ingress
Browse files Browse the repository at this point in the history
Signed-off-by: Sihan Wang <sihanwang41@gmail.com>
  • Loading branch information
sihanwang41 committed Oct 7, 2022
1 parent e142be0 commit a7d9c6d
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/source/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ parts:
- file: serve/handling-dependencies
- file: serve/managing-java-deployments
- file: serve/migration
- file: serve/direct-ingress
- file: serve/architecture
- file: serve/tutorials/index
sections:
Expand Down
101 changes: 101 additions & 0 deletions doc/source/serve/direct-ingress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Direct Ingress

In the 2.1, Serve provides alpha version gRPC ingress. In this section, you will learn how to

* use Serve internal gRPC schema to send traffic
* bring your own gRPC schema into serve application

## Use Serve Schema

Internally, serve provides a simple gRPC schema.
```
message PredictRequest {
map<string, bytes> input = 2;
}
message PredictResponse {
bytes prediction = 1;
}
service PredictAPIsService {
rpc Predict(PredictRequest) returns (PredictResponse);
}
```

Code example:

Server
```{literalinclude} ../serve/doc_code/direct_ingress.py
:start-after: __begin_server__
:end-before: __end_server__
:language: python
```

Client
```{literalinclude} ../serve/doc_code/direct_ingress.py
:start-after: __begin_client__
:end-before: __end_client__
:language: python
```

:::{note}
* `input` is a dictionary of `map<string, bytes> ` following the schema described
* User input data needs to be serialized to bytes type and feed into the `input`.
* Response will be under bytes type, which means user code is responsible for to serialize the output into bytes.
* By default, the gRPC port is 9000.
:::

### Client Schema code generation
There are lots of ways to generate client schema code. Here is a simple step to generate the code.
* Insintall the gRPC code generation tools
```
pip install grpcio-tools
```

* Generate gRPC code based on the schema
```
python -m grpc_tools.protoc --proto_path=src/ray/protobuf/ --python_out=. --grpc_python_out=. src/ray/protobuf/serve.proto
```
After the two steps above, you should have `serve_pb2.py` and `serve_pb2_grpc.py` files generated.(The steps that show above work for all the schemas files generations.)

## Bring your own Schema

If you have customized schema to use, serve also support it!

Assume you have the schema and generated the corresponding code,

```
message PingRequest {
bool no_reply = 1;
}
message PingReply {
}
message PingTimeoutRequest {}
message PingTimeoutReply {}
service TestService {
rpc Ping(PingRequest) returns (PingReply);
rpc PingTimeout(PingTimeoutRequest) returns (PingTimeoutReply);
}
```

Server:
```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py
:start-after: __begin_server__
:end-before: __end_server__
:language: python
```

Client
```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py
:start-after: __begin_client__
:end-before: __end_client__
:language: python
```

:::{note}
* To use your own schema, you need to write your driver class `MyDriver` to deploy.
* `is_driver_deployment` is needed to mark the class as driver, serve will make sure the driver class deployment gets deployed one replica per node.
* `gRPCIngress` is used for starting a gRPC server. Your driver class needs to inherit from it.
:::
28 changes: 28 additions & 0 deletions doc/source/serve/doc_code/direct_ingress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# flake8: noqa

# __begin_server__
from ray import serve
from ray.serve.drivers import DefaultgRPCDriver


@serve.deployment
class D1:
def __call__(self, input):
# Do something with input
return input["a"]


my_deployment = DefaultgRPCDriver.bind(D1.bind())

serve.run(my_deployment)
# __end_server__

from ray.serve.generated import serve_pb2, serve_pb2_grpc

# __begin_client__
import grpc

channel = grpc.aio.insecure_channel("localhost:9000")
stub = serve_pb2_grpc.PredictAPIsServiceStub(channel)
response = stub.Predict(serve_pb2.PredictRequest(input={"a": bytes("123", "utf-8")}))
# __end_client__
32 changes: 32 additions & 0 deletions doc/source/serve/doc_code/direct_ingress_with_customized_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# flake8: noqa

# __begin_server__
from ray import serve
from ray.serve.drivers import gRPCIngress
import test_service_pb2_grpc, test_service_pb2


@serve.deployment(is_driver_deployment=True)
class MyDriver(test_service_pb2_grpc.TestServiceServicer, gRPCIngress):
def __init__(self):
super().__init__()

async def Ping(self, request, context):
# play with your dag and then reply
return test_service_pb2.PingReply()


my_deployment = MyDriver.bind()

serve.run(my_deployment)
# __end_server__


# __begin_client__
import grpc
import test_service_pb2_grpc, test_service_pb2

channel = grpc.aio.insecure_channel("localhost:9000")
stub = test_service_pb2_grpc.TestServiceStub(channel)
response = stub.Ping(test_service_pb2.PingRequest())
# __end_client__
85 changes: 85 additions & 0 deletions doc/source/serve/doc_code/test_service_pb2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# flake8: noqa

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/ray/protobuf/test_service.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database

# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n#src/ray/protobuf/test_service.proto\x12\x07ray.rpc"(\n\x0bPingRequest\x12\x19\n\x08no_reply\x18\x01 \x01(\x08R\x07noReply"\x0b\n\tPingReply"\x14\n\x12PingTimeoutRequest"\x12\n\x10PingTimeoutReply2\x86\x01\n\x0bTestService\x12\x30\n\x04Ping\x12\x14.ray.rpc.PingRequest\x1a\x12.ray.rpc.PingReply\x12\x45\n\x0bPingTimeout\x12\x1b.ray.rpc.PingTimeoutRequest\x1a\x19.ray.rpc.PingTimeoutReplyb\x06proto3'
)


_PINGREQUEST = DESCRIPTOR.message_types_by_name["PingRequest"]
_PINGREPLY = DESCRIPTOR.message_types_by_name["PingReply"]
_PINGTIMEOUTREQUEST = DESCRIPTOR.message_types_by_name["PingTimeoutRequest"]
_PINGTIMEOUTREPLY = DESCRIPTOR.message_types_by_name["PingTimeoutReply"]
PingRequest = _reflection.GeneratedProtocolMessageType(
"PingRequest",
(_message.Message,),
{
"DESCRIPTOR": _PINGREQUEST,
"__module__": "src.ray.protobuf.test_service_pb2"
# @@protoc_insertion_point(class_scope:ray.rpc.PingRequest)
},
)
_sym_db.RegisterMessage(PingRequest)

PingReply = _reflection.GeneratedProtocolMessageType(
"PingReply",
(_message.Message,),
{
"DESCRIPTOR": _PINGREPLY,
"__module__": "src.ray.protobuf.test_service_pb2"
# @@protoc_insertion_point(class_scope:ray.rpc.PingReply)
},
)
_sym_db.RegisterMessage(PingReply)

PingTimeoutRequest = _reflection.GeneratedProtocolMessageType(
"PingTimeoutRequest",
(_message.Message,),
{
"DESCRIPTOR": _PINGTIMEOUTREQUEST,
"__module__": "src.ray.protobuf.test_service_pb2"
# @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutRequest)
},
)
_sym_db.RegisterMessage(PingTimeoutRequest)

PingTimeoutReply = _reflection.GeneratedProtocolMessageType(
"PingTimeoutReply",
(_message.Message,),
{
"DESCRIPTOR": _PINGTIMEOUTREPLY,
"__module__": "src.ray.protobuf.test_service_pb2"
# @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutReply)
},
)
_sym_db.RegisterMessage(PingTimeoutReply)

_TESTSERVICE = DESCRIPTOR.services_by_name["TestService"]
if _descriptor._USE_C_DESCRIPTORS == False:

DESCRIPTOR._options = None
_PINGREQUEST._serialized_start = 48
_PINGREQUEST._serialized_end = 88
_PINGREPLY._serialized_start = 90
_PINGREPLY._serialized_end = 101
_PINGTIMEOUTREQUEST._serialized_start = 103
_PINGTIMEOUTREQUEST._serialized_end = 123
_PINGTIMEOUTREPLY._serialized_start = 125
_PINGTIMEOUTREPLY._serialized_end = 143
_TESTSERVICE._serialized_start = 146
_TESTSERVICE._serialized_end = 280
# @@protoc_insertion_point(module_scope)
126 changes: 126 additions & 0 deletions doc/source/serve/doc_code/test_service_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# flake8: noqa

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import test_service_pb2 as src_dot_ray_dot_protobuf_dot_test__service__pb2


class TestServiceStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Ping = channel.unary_unary(
"/ray.rpc.TestService/Ping",
request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString,
response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString,
)
self.PingTimeout = channel.unary_unary(
"/ray.rpc.TestService/PingTimeout",
request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString,
response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString,
)


class TestServiceServicer(object):
"""Missing associated documentation comment in .proto file."""

def Ping(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")

def PingTimeout(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")


def add_TestServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
"Ping": grpc.unary_unary_rpc_method_handler(
servicer.Ping,
request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.FromString,
response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.SerializeToString,
),
"PingTimeout": grpc.unary_unary_rpc_method_handler(
servicer.PingTimeout,
request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.FromString,
response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
"ray.rpc.TestService", rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class TestService(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def Ping(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/ray.rpc.TestService/Ping",
src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString,
src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)

@staticmethod
def PingTimeout(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/ray.rpc.TestService/PingTimeout",
src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString,
src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
1 change: 1 addition & 0 deletions doc/source/serve/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ This user guide will help you navigate the Ray Serve project and show you how to
- [Handling Dependencies](handling-dependencies)
- [Experimental Java API](managing-java-deployments)
- [1.x to 2.x API Migration Guide](migration)
- [Direct Ingress](direct-ingress)

0 comments on commit a7d9c6d

Please sign in to comment.