From a7d9c6dd0b217047265003397b1202f4c52011a0 Mon Sep 17 00:00:00 2001 From: Sihan Wang Date: Thu, 6 Oct 2022 12:09:59 -0700 Subject: [PATCH] [Serve][Doc] Direct Ingress Signed-off-by: Sihan Wang --- doc/source/_toc.yml | 1 + doc/source/serve/direct-ingress.md | 101 ++++++++++++++ doc/source/serve/doc_code/direct_ingress.py | 28 ++++ .../direct_ingress_with_customized_schema.py | 32 +++++ doc/source/serve/doc_code/test_service_pb2.py | 85 ++++++++++++ .../serve/doc_code/test_service_pb2_grpc.py | 126 ++++++++++++++++++ doc/source/serve/user-guide.md | 1 + 7 files changed, 374 insertions(+) create mode 100644 doc/source/serve/direct-ingress.md create mode 100644 doc/source/serve/doc_code/direct_ingress.py create mode 100644 doc/source/serve/doc_code/direct_ingress_with_customized_schema.py create mode 100755 doc/source/serve/doc_code/test_service_pb2.py create mode 100755 doc/source/serve/doc_code/test_service_pb2_grpc.py diff --git a/doc/source/_toc.yml b/doc/source/_toc.yml index fdfd73e934b7..0d925aa21b07 100644 --- a/doc/source/_toc.yml +++ b/doc/source/_toc.yml @@ -194,6 +194,7 @@ parts: - file: serve/handling-dependencies - file: serve/managing-java-deployments - file: serve/migration + - file: serve/direct-ingress - file: serve/architecture - file: serve/tutorials/index sections: diff --git a/doc/source/serve/direct-ingress.md b/doc/source/serve/direct-ingress.md new file mode 100644 index 000000000000..c9071a58e937 --- /dev/null +++ b/doc/source/serve/direct-ingress.md @@ -0,0 +1,101 @@ +# Direct Ingress + +In the 2.1, Serve provides alpha version gRPC ingress. In this section, you will learn how to + +* use Serve internal gRPC schema to send traffic +* bring your own gRPC schema into serve application + +## Use Serve Schema + +Internally, serve provides a simple gRPC schema. +``` +message PredictRequest { + map input = 2; +} + +message PredictResponse { + bytes prediction = 1; +} + +service PredictAPIsService { + rpc Predict(PredictRequest) returns (PredictResponse); +} +``` + +Code example: + +Server +```{literalinclude} ../serve/doc_code/direct_ingress.py +:start-after: __begin_server__ +:end-before: __end_server__ +:language: python +``` + +Client +```{literalinclude} ../serve/doc_code/direct_ingress.py +:start-after: __begin_client__ +:end-before: __end_client__ +:language: python +``` + +:::{note} +* `input` is a dictionary of `map ` following the schema described +* User input data needs to be serialized to bytes type and feed into the `input`. +* Response will be under bytes type, which means user code is responsible for to serialize the output into bytes. +* By default, the gRPC port is 9000. +::: + +### Client Schema code generation +There are lots of ways to generate client schema code. Here is a simple step to generate the code. +* Insintall the gRPC code generation tools +``` +pip install grpcio-tools +``` + +* Generate gRPC code based on the schema +``` +python -m grpc_tools.protoc --proto_path=src/ray/protobuf/ --python_out=. --grpc_python_out=. src/ray/protobuf/serve.proto +``` +After the two steps above, you should have `serve_pb2.py` and `serve_pb2_grpc.py` files generated.(The steps that show above work for all the schemas files generations.) + +## Bring your own Schema + +If you have customized schema to use, serve also support it! + +Assume you have the schema and generated the corresponding code, + +``` +message PingRequest { + bool no_reply = 1; +} +message PingReply { +} + +message PingTimeoutRequest {} +message PingTimeoutReply {} + +service TestService { + rpc Ping(PingRequest) returns (PingReply); + rpc PingTimeout(PingTimeoutRequest) returns (PingTimeoutReply); +} +``` + +Server: +```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py +:start-after: __begin_server__ +:end-before: __end_server__ +:language: python +``` + +Client +```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py +:start-after: __begin_client__ +:end-before: __end_client__ +:language: python +``` + +:::{note} +* To use your own schema, you need to write your driver class `MyDriver` to deploy. +* `is_driver_deployment` is needed to mark the class as driver, serve will make sure the driver class deployment gets deployed one replica per node. +* `gRPCIngress` is used for starting a gRPC server. Your driver class needs to inherit from it. +::: diff --git a/doc/source/serve/doc_code/direct_ingress.py b/doc/source/serve/doc_code/direct_ingress.py new file mode 100644 index 000000000000..98e2407f555d --- /dev/null +++ b/doc/source/serve/doc_code/direct_ingress.py @@ -0,0 +1,28 @@ +# flake8: noqa + +# __begin_server__ +from ray import serve +from ray.serve.drivers import DefaultgRPCDriver + + +@serve.deployment +class D1: + def __call__(self, input): + # Do something with input + return input["a"] + + +my_deployment = DefaultgRPCDriver.bind(D1.bind()) + +serve.run(my_deployment) +# __end_server__ + +from ray.serve.generated import serve_pb2, serve_pb2_grpc + +# __begin_client__ +import grpc + +channel = grpc.aio.insecure_channel("localhost:9000") +stub = serve_pb2_grpc.PredictAPIsServiceStub(channel) +response = stub.Predict(serve_pb2.PredictRequest(input={"a": bytes("123", "utf-8")})) +# __end_client__ diff --git a/doc/source/serve/doc_code/direct_ingress_with_customized_schema.py b/doc/source/serve/doc_code/direct_ingress_with_customized_schema.py new file mode 100644 index 000000000000..ec2e88615794 --- /dev/null +++ b/doc/source/serve/doc_code/direct_ingress_with_customized_schema.py @@ -0,0 +1,32 @@ +# flake8: noqa + +# __begin_server__ +from ray import serve +from ray.serve.drivers import gRPCIngress +import test_service_pb2_grpc, test_service_pb2 + + +@serve.deployment(is_driver_deployment=True) +class MyDriver(test_service_pb2_grpc.TestServiceServicer, gRPCIngress): + def __init__(self): + super().__init__() + + async def Ping(self, request, context): + # play with your dag and then reply + return test_service_pb2.PingReply() + + +my_deployment = MyDriver.bind() + +serve.run(my_deployment) +# __end_server__ + + +# __begin_client__ +import grpc +import test_service_pb2_grpc, test_service_pb2 + +channel = grpc.aio.insecure_channel("localhost:9000") +stub = test_service_pb2_grpc.TestServiceStub(channel) +response = stub.Ping(test_service_pb2.PingRequest()) +# __end_client__ diff --git a/doc/source/serve/doc_code/test_service_pb2.py b/doc/source/serve/doc_code/test_service_pb2.py new file mode 100755 index 000000000000..bb6e98e81dc9 --- /dev/null +++ b/doc/source/serve/doc_code/test_service_pb2.py @@ -0,0 +1,85 @@ +# flake8: noqa + +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: src/ray/protobuf/test_service.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n#src/ray/protobuf/test_service.proto\x12\x07ray.rpc"(\n\x0bPingRequest\x12\x19\n\x08no_reply\x18\x01 \x01(\x08R\x07noReply"\x0b\n\tPingReply"\x14\n\x12PingTimeoutRequest"\x12\n\x10PingTimeoutReply2\x86\x01\n\x0bTestService\x12\x30\n\x04Ping\x12\x14.ray.rpc.PingRequest\x1a\x12.ray.rpc.PingReply\x12\x45\n\x0bPingTimeout\x12\x1b.ray.rpc.PingTimeoutRequest\x1a\x19.ray.rpc.PingTimeoutReplyb\x06proto3' +) + + +_PINGREQUEST = DESCRIPTOR.message_types_by_name["PingRequest"] +_PINGREPLY = DESCRIPTOR.message_types_by_name["PingReply"] +_PINGTIMEOUTREQUEST = DESCRIPTOR.message_types_by_name["PingTimeoutRequest"] +_PINGTIMEOUTREPLY = DESCRIPTOR.message_types_by_name["PingTimeoutReply"] +PingRequest = _reflection.GeneratedProtocolMessageType( + "PingRequest", + (_message.Message,), + { + "DESCRIPTOR": _PINGREQUEST, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingRequest) + }, +) +_sym_db.RegisterMessage(PingRequest) + +PingReply = _reflection.GeneratedProtocolMessageType( + "PingReply", + (_message.Message,), + { + "DESCRIPTOR": _PINGREPLY, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingReply) + }, +) +_sym_db.RegisterMessage(PingReply) + +PingTimeoutRequest = _reflection.GeneratedProtocolMessageType( + "PingTimeoutRequest", + (_message.Message,), + { + "DESCRIPTOR": _PINGTIMEOUTREQUEST, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutRequest) + }, +) +_sym_db.RegisterMessage(PingTimeoutRequest) + +PingTimeoutReply = _reflection.GeneratedProtocolMessageType( + "PingTimeoutReply", + (_message.Message,), + { + "DESCRIPTOR": _PINGTIMEOUTREPLY, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutReply) + }, +) +_sym_db.RegisterMessage(PingTimeoutReply) + +_TESTSERVICE = DESCRIPTOR.services_by_name["TestService"] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _PINGREQUEST._serialized_start = 48 + _PINGREQUEST._serialized_end = 88 + _PINGREPLY._serialized_start = 90 + _PINGREPLY._serialized_end = 101 + _PINGTIMEOUTREQUEST._serialized_start = 103 + _PINGTIMEOUTREQUEST._serialized_end = 123 + _PINGTIMEOUTREPLY._serialized_start = 125 + _PINGTIMEOUTREPLY._serialized_end = 143 + _TESTSERVICE._serialized_start = 146 + _TESTSERVICE._serialized_end = 280 +# @@protoc_insertion_point(module_scope) diff --git a/doc/source/serve/doc_code/test_service_pb2_grpc.py b/doc/source/serve/doc_code/test_service_pb2_grpc.py new file mode 100755 index 000000000000..73d0d3e7edd8 --- /dev/null +++ b/doc/source/serve/doc_code/test_service_pb2_grpc.py @@ -0,0 +1,126 @@ +# flake8: noqa + +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import test_service_pb2 as src_dot_ray_dot_protobuf_dot_test__service__pb2 + + +class TestServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Ping = channel.unary_unary( + "/ray.rpc.TestService/Ping", + request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString, + response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString, + ) + self.PingTimeout = channel.unary_unary( + "/ray.rpc.TestService/PingTimeout", + request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString, + response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString, + ) + + +class TestServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Ping(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def PingTimeout(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_TestServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + "Ping": grpc.unary_unary_rpc_method_handler( + servicer.Ping, + request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.FromString, + response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.SerializeToString, + ), + "PingTimeout": grpc.unary_unary_rpc_method_handler( + servicer.PingTimeout, + request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.FromString, + response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "ray.rpc.TestService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class TestService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Ping( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/ray.rpc.TestService/Ping", + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString, + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def PingTimeout( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/ray.rpc.TestService/PingTimeout", + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString, + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/doc/source/serve/user-guide.md b/doc/source/serve/user-guide.md index 71264b6b677f..599b437a4f4b 100644 --- a/doc/source/serve/user-guide.md +++ b/doc/source/serve/user-guide.md @@ -13,3 +13,4 @@ This user guide will help you navigate the Ray Serve project and show you how to - [Handling Dependencies](handling-dependencies) - [Experimental Java API](managing-java-deployments) - [1.x to 2.x API Migration Guide](migration) +- [Direct Ingress](direct-ingress)