From 746b1c21fee720e59fb50a0597600bbe08675e42 Mon Sep 17 00:00:00 2001 From: Andrew Stepanov Date: Fri, 1 Feb 2019 23:49:11 +0300 Subject: [PATCH] feat: migrate to anyio BREAKING CHANGE: Server and test now use asyncio event loop by default, this behaviour can be changed with PURERPC_BACKEND environment variable BREAKING CHANGE: purerpc.Channel is removed, migrate to purerpc.insecure_channel async context manager (now supports correct shutdown) --- commitlint.config.js | 34 ++++++++ misc/greeter/baseline_main.py | 4 +- misc/greeter/client.py | 22 +++-- misc/greeter/client_grpcio.py | 4 +- misc/greeter/generated/__init__.py | 0 misc/greeter/{ => generated}/greeter.proto | 0 misc/greeter/{ => generated}/greeter_grpc.py | 34 ++++---- misc/greeter/{ => generated}/greeter_pb2.py | 79 ++++++++++++++---- .../{ => generated}/greeter_pb2_grpc.py | 34 ++++---- misc/greeter/main.py | 4 +- misc/greeter/main_pingpong.py | 4 +- misc/greeter/main_pingpong_servicer.py | 11 +-- misc/greeter/test_perf.py | 49 ++++++----- misc/{greeter => h2load}/latency_h2load.sh | 0 .../old_logs/pypy6.curio.log.txt | 0 .../old_logs/pypy6.trio.0.3.log.txt | 0 .../old_logs/pypy6.trio.0.4.log.txt | 0 .../old_logs/python.log.txt | 0 misc/{greeter => h2load}/request.bin | Bin misc/{greeter => h2load}/run_h2load.sh | 0 ...st_bytearray.py => bytearray_perf_test.py} | 4 +- misc/pypy_tests/test_dict.py | 7 -- misc/pypy_tests/test_set.py | 7 -- setup.py | 3 +- src/purerpc/__init__.py | 7 +- src/purerpc/anyio_monkeypatch.py | 42 ++++++++++ src/purerpc/client.py | 58 ++++++++----- src/purerpc/grpc_proto.py | 1 - src/purerpc/grpc_socket.py | 71 +++++++++------- src/purerpc/grpclib/events.py | 17 ++++ src/purerpc/server.py | 65 +++++++++----- src/purerpc/test_utils.py | 4 +- src/purerpc/utils.py | 4 +- src/purerpc/wrappers.py | 13 +-- tests/test_client_server_codegen.py | 78 ++++++++--------- tests/test_client_server_errors.py | 27 +++--- tests/test_client_server_metadata.py | 19 ++--- tests/test_client_server_simple.py | 34 ++++---- tests/test_protoc_plugin.py | 3 +- tests/test_status_codes.py | 24 +++--- 40 files changed, 463 insertions(+), 304 deletions(-) create mode 100644 commitlint.config.js create mode 100644 misc/greeter/generated/__init__.py rename misc/greeter/{ => generated}/greeter.proto (100%) rename misc/greeter/{ => generated}/greeter_grpc.py (73%) rename misc/greeter/{ => generated}/greeter_pb2.py (52%) rename misc/greeter/{ => generated}/greeter_pb2_grpc.py (64%) rename misc/{greeter => h2load}/latency_h2load.sh (100%) rename misc/{greeter => h2load}/old_logs/pypy6.curio.log.txt (100%) rename misc/{greeter => h2load}/old_logs/pypy6.trio.0.3.log.txt (100%) rename misc/{greeter => h2load}/old_logs/pypy6.trio.0.4.log.txt (100%) rename misc/{greeter => h2load}/old_logs/python.log.txt (100%) rename misc/{greeter => h2load}/request.bin (100%) rename misc/{greeter => h2load}/run_h2load.sh (100%) rename misc/pypy_tests/{test_bytearray.py => bytearray_perf_test.py} (96%) delete mode 100644 misc/pypy_tests/test_dict.py delete mode 100644 misc/pypy_tests/test_set.py create mode 100644 src/purerpc/anyio_monkeypatch.py diff --git a/commitlint.config.js b/commitlint.config.js new file mode 100644 index 0000000..1ed9f16 --- /dev/null +++ b/commitlint.config.js @@ -0,0 +1,34 @@ +module.exports = { + rules: { + 'body-leading-blank': [1, 'always'], + 'footer-leading-blank': [1, 'always'], + 'header-max-length': [2, 'always', 72], + 'scope-case': [2, 'always', 'lower-case'], + 'subject-case': [ + 2, + 'never', + ['sentence-case', 'start-case', 'pascal-case', 'upper-case'] + ], + 'subject-empty': [2, 'never'], + 'subject-full-stop': [2, 'never', '.'], + 'type-case': [2, 'always', 'lower-case'], + 'type-empty': [2, 'never'], + 'type-enum': [ + 2, + 'always', + [ + 'build', + 'chore', + 'ci', + 'docs', + 'feat', + 'fix', + 'perf', + 'refactor', + 'revert', + 'style', + 'test' + ] + ] + } +}; diff --git a/misc/greeter/baseline_main.py b/misc/greeter/baseline_main.py index ae8a0aa..6837db5 100644 --- a/misc/greeter/baseline_main.py +++ b/misc/greeter/baseline_main.py @@ -5,8 +5,8 @@ import grpc -from greeter_pb2 import HelloRequest, HelloReply -from greeter_pb2_grpc import GreeterServicer, add_GreeterServicer_to_server +from generated.greeter_pb2 import HelloReply +from generated.greeter_pb2_grpc import GreeterServicer, add_GreeterServicer_to_server _ONE_DAY_IN_SECONDS = 60 * 60 * 24 diff --git a/misc/greeter/client.py b/misc/greeter/client.py index 5521752..22d2386 100644 --- a/misc/greeter/client.py +++ b/misc/greeter/client.py @@ -1,9 +1,8 @@ import curio import time -from purerpc.client import Channel, Client -from greeter_pb2 import HelloRequest, HelloReply -from greeter_grpc import GreeterStub -from purerpc.utils import print_memory_growth_statistics +import purerpc +from generated.greeter_pb2 import HelloRequest +from generated.greeter_grpc import GreeterStub async def worker(channel): @@ -16,14 +15,13 @@ async def worker(channel): async def main_coro(): # await curio.spawn(print_memory_growth_statistics(), daemon=True) - channel = Channel("localhost", 50055) - await channel.connect() - for i in range(100): - start = time.time() - async with curio.TaskGroup() as task_group: - for i in range(100): - await task_group.spawn(worker(channel)) - print("RPS: {}".format(10000 / (time.time() - start))) + async with purerpc.insecure_channel("localhost", 50055) as channel: + for i in range(100): + start = time.time() + async with curio.TaskGroup() as task_group: + for i in range(100): + await task_group.spawn(worker(channel)) + print("RPS: {}".format(10000 / (time.time() - start))) def main(): diff --git a/misc/greeter/client_grpcio.py b/misc/greeter/client_grpcio.py index 419a261..58e4621 100644 --- a/misc/greeter/client_grpcio.py +++ b/misc/greeter/client_grpcio.py @@ -1,6 +1,6 @@ import grpc -from greeter_pb2 import HelloRequest, HelloReply -from greeter_pb2_grpc import GreeterStub +from generated.greeter_pb2 import HelloRequest +from generated.greeter_pb2_grpc import GreeterStub def main(): diff --git a/misc/greeter/generated/__init__.py b/misc/greeter/generated/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/misc/greeter/greeter.proto b/misc/greeter/generated/greeter.proto similarity index 100% rename from misc/greeter/greeter.proto rename to misc/greeter/generated/greeter.proto diff --git a/misc/greeter/greeter_grpc.py b/misc/greeter/generated/greeter_grpc.py similarity index 73% rename from misc/greeter/greeter_grpc.py rename to misc/greeter/generated/greeter_grpc.py index d0021f5..b29329f 100644 --- a/misc/greeter/greeter_grpc.py +++ b/misc/greeter/generated/greeter_grpc.py @@ -1,5 +1,5 @@ import purerpc -import greeter_pb2 +import generated.greeter_pb2 class GreeterServicer(purerpc.Servicer): @@ -25,8 +25,8 @@ def service(self) -> purerpc.Service: self.SayHello, purerpc.RPCSignature( purerpc.Cardinality.UNARY_UNARY, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) service_obj.add_method( @@ -34,8 +34,8 @@ def service(self) -> purerpc.Service: self.SayHelloGoodbye, purerpc.RPCSignature( purerpc.Cardinality.UNARY_STREAM, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) service_obj.add_method( @@ -43,8 +43,8 @@ def service(self) -> purerpc.Service: self.SayHelloToMany, purerpc.RPCSignature( purerpc.Cardinality.STREAM_STREAM, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) service_obj.add_method( @@ -52,8 +52,8 @@ def service(self) -> purerpc.Service: self.SayHelloToManyAtOnce, purerpc.RPCSignature( purerpc.Cardinality.STREAM_UNARY, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) return service_obj @@ -69,31 +69,31 @@ def __init__(self, channel): "SayHello", purerpc.RPCSignature( purerpc.Cardinality.UNARY_UNARY, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) self.SayHelloGoodbye = self._client.get_method_stub( "SayHelloGoodbye", purerpc.RPCSignature( purerpc.Cardinality.UNARY_STREAM, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) self.SayHelloToMany = self._client.get_method_stub( "SayHelloToMany", purerpc.RPCSignature( purerpc.Cardinality.STREAM_STREAM, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) self.SayHelloToManyAtOnce = self._client.get_method_stub( "SayHelloToManyAtOnce", purerpc.RPCSignature( purerpc.Cardinality.STREAM_UNARY, - greeter_pb2.HelloRequest, - greeter_pb2.HelloReply, + generated.greeter_pb2.HelloRequest, + generated.greeter_pb2.HelloReply, ) ) \ No newline at end of file diff --git a/misc/greeter/greeter_pb2.py b/misc/greeter/generated/greeter_pb2.py similarity index 52% rename from misc/greeter/greeter_pb2.py rename to misc/greeter/generated/greeter_pb2.py index 19319ac..fbcc590 100644 --- a/misc/greeter/greeter_pb2.py +++ b/misc/greeter/generated/greeter_pb2.py @@ -1,5 +1,5 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: greeter.proto +# source: generated/greeter.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -7,7 +7,6 @@ from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -16,10 +15,11 @@ DESCRIPTOR = _descriptor.FileDescriptor( - name='greeter.proto', + name='generated/greeter.proto', package='', syntax='proto3', - serialized_pb=_b('\n\rgreeter.proto\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xd2\x01\n\x07Greeter\x12(\n\x08SayHello\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00\x12\x31\n\x0fSayHelloGoodbye\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00\x30\x01\x12\x32\n\x0eSayHelloToMany\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00(\x01\x30\x01\x12\x36\n\x14SayHelloToManyAtOnce\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00(\x01\x62\x06proto3') + serialized_options=None, + serialized_pb=_b('\n\x17generated/greeter.proto\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xd2\x01\n\x07Greeter\x12(\n\x08SayHello\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00\x12\x31\n\x0fSayHelloGoodbye\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00\x30\x01\x12\x32\n\x0eSayHelloToMany\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00(\x01\x30\x01\x12\x36\n\x14SayHelloToManyAtOnce\x12\r.HelloRequest\x1a\x0b.HelloReply\"\x00(\x01\x62\x06proto3') ) @@ -38,21 +38,21 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=17, - serialized_end=45, + serialized_start=27, + serialized_end=55, ) @@ -69,21 +69,21 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=47, - serialized_end=76, + serialized_start=57, + serialized_end=86, ) DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST @@ -92,17 +92,68 @@ HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict( DESCRIPTOR = _HELLOREQUEST, - __module__ = 'greeter_pb2' + __module__ = 'generated.greeter_pb2' # @@protoc_insertion_point(class_scope:HelloRequest) )) _sym_db.RegisterMessage(HelloRequest) HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict( DESCRIPTOR = _HELLOREPLY, - __module__ = 'greeter_pb2' + __module__ = 'generated.greeter_pb2' # @@protoc_insertion_point(class_scope:HelloReply) )) _sym_db.RegisterMessage(HelloReply) + +_GREETER = _descriptor.ServiceDescriptor( + name='Greeter', + full_name='Greeter', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=89, + serialized_end=299, + methods=[ + _descriptor.MethodDescriptor( + name='SayHello', + full_name='Greeter.SayHello', + index=0, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='SayHelloGoodbye', + full_name='Greeter.SayHelloGoodbye', + index=1, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='SayHelloToMany', + full_name='Greeter.SayHelloToMany', + index=2, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='SayHelloToManyAtOnce', + full_name='Greeter.SayHelloToManyAtOnce', + index=3, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_GREETER) + +DESCRIPTOR.services_by_name['Greeter'] = _GREETER + # @@protoc_insertion_point(module_scope) diff --git a/misc/greeter/greeter_pb2_grpc.py b/misc/greeter/generated/greeter_pb2_grpc.py similarity index 64% rename from misc/greeter/greeter_pb2_grpc.py rename to misc/greeter/generated/greeter_pb2_grpc.py index 9fe16b7..d821efc 100644 --- a/misc/greeter/greeter_pb2_grpc.py +++ b/misc/greeter/generated/greeter_pb2_grpc.py @@ -1,7 +1,7 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! import grpc -import greeter_pb2 as greeter__pb2 +from generated import greeter_pb2 as generated_dot_greeter__pb2 class GreeterStub(object): @@ -16,23 +16,23 @@ def __init__(self, channel): """ self.SayHello = channel.unary_unary( '/Greeter/SayHello', - request_serializer=greeter__pb2.HelloRequest.SerializeToString, - response_deserializer=greeter__pb2.HelloReply.FromString, + request_serializer=generated_dot_greeter__pb2.HelloRequest.SerializeToString, + response_deserializer=generated_dot_greeter__pb2.HelloReply.FromString, ) self.SayHelloGoodbye = channel.unary_stream( '/Greeter/SayHelloGoodbye', - request_serializer=greeter__pb2.HelloRequest.SerializeToString, - response_deserializer=greeter__pb2.HelloReply.FromString, + request_serializer=generated_dot_greeter__pb2.HelloRequest.SerializeToString, + response_deserializer=generated_dot_greeter__pb2.HelloReply.FromString, ) self.SayHelloToMany = channel.stream_stream( '/Greeter/SayHelloToMany', - request_serializer=greeter__pb2.HelloRequest.SerializeToString, - response_deserializer=greeter__pb2.HelloReply.FromString, + request_serializer=generated_dot_greeter__pb2.HelloRequest.SerializeToString, + response_deserializer=generated_dot_greeter__pb2.HelloReply.FromString, ) self.SayHelloToManyAtOnce = channel.stream_unary( '/Greeter/SayHelloToManyAtOnce', - request_serializer=greeter__pb2.HelloRequest.SerializeToString, - response_deserializer=greeter__pb2.HelloReply.FromString, + request_serializer=generated_dot_greeter__pb2.HelloRequest.SerializeToString, + response_deserializer=generated_dot_greeter__pb2.HelloReply.FromString, ) @@ -73,23 +73,23 @@ def add_GreeterServicer_to_server(servicer, server): rpc_method_handlers = { 'SayHello': grpc.unary_unary_rpc_method_handler( servicer.SayHello, - request_deserializer=greeter__pb2.HelloRequest.FromString, - response_serializer=greeter__pb2.HelloReply.SerializeToString, + request_deserializer=generated_dot_greeter__pb2.HelloRequest.FromString, + response_serializer=generated_dot_greeter__pb2.HelloReply.SerializeToString, ), 'SayHelloGoodbye': grpc.unary_stream_rpc_method_handler( servicer.SayHelloGoodbye, - request_deserializer=greeter__pb2.HelloRequest.FromString, - response_serializer=greeter__pb2.HelloReply.SerializeToString, + request_deserializer=generated_dot_greeter__pb2.HelloRequest.FromString, + response_serializer=generated_dot_greeter__pb2.HelloReply.SerializeToString, ), 'SayHelloToMany': grpc.stream_stream_rpc_method_handler( servicer.SayHelloToMany, - request_deserializer=greeter__pb2.HelloRequest.FromString, - response_serializer=greeter__pb2.HelloReply.SerializeToString, + request_deserializer=generated_dot_greeter__pb2.HelloRequest.FromString, + response_serializer=generated_dot_greeter__pb2.HelloReply.SerializeToString, ), 'SayHelloToManyAtOnce': grpc.stream_unary_rpc_method_handler( servicer.SayHelloToManyAtOnce, - request_deserializer=greeter__pb2.HelloRequest.FromString, - response_serializer=greeter__pb2.HelloReply.SerializeToString, + request_deserializer=generated_dot_greeter__pb2.HelloRequest.FromString, + response_serializer=generated_dot_greeter__pb2.HelloReply.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( diff --git a/misc/greeter/main.py b/misc/greeter/main.py index d30fdab..7e3f86b 100644 --- a/misc/greeter/main.py +++ b/misc/greeter/main.py @@ -1,10 +1,8 @@ import logging import logging.config -import curio - from purerpc import Server, Service, Stream -from greeter_pb2 import HelloRequest, HelloReply +from generated.greeter_pb2 import HelloRequest, HelloReply def configure_logs(log_file=None): diff --git a/misc/greeter/main_pingpong.py b/misc/greeter/main_pingpong.py index 2be6e40..5c209a2 100644 --- a/misc/greeter/main_pingpong.py +++ b/misc/greeter/main_pingpong.py @@ -1,11 +1,9 @@ import logging import logging.config -import curio - from purerpc.server import Service, Server from purerpc.rpc import Stream -from greeter_pb2 import HelloRequest, HelloReply +from generated.greeter_pb2 import HelloRequest, HelloReply def configure_logs(log_file=None): diff --git a/misc/greeter/main_pingpong_servicer.py b/misc/greeter/main_pingpong_servicer.py index 9636ef5..4ca837f 100644 --- a/misc/greeter/main_pingpong_servicer.py +++ b/misc/greeter/main_pingpong_servicer.py @@ -1,14 +1,9 @@ import argparse import multiprocessing -import logging -import logging.config -import curio - -from purerpc.server import Service, Server -from purerpc.rpc import Stream -from greeter_pb2 import HelloRequest, HelloReply -from greeter_grpc import GreeterServicer +from purerpc.server import Server +from generated.greeter_pb2 import HelloReply +from generated.greeter_grpc import GreeterServicer """ def configure_logs(log_file=None): diff --git a/misc/greeter/test_perf.py b/misc/greeter/test_perf.py index 41b4bed..471d55b 100644 --- a/misc/greeter/test_perf.py +++ b/misc/greeter/test_perf.py @@ -5,8 +5,8 @@ import multiprocessing import purerpc -from greeter_pb2 import HelloRequest, HelloReply -from greeter_grpc import GreeterServicer, GreeterStub +from generated.greeter_pb2 import HelloRequest, HelloReply +from generated.greeter_grpc import GreeterServicer, GreeterStub @contextlib.contextmanager @@ -68,28 +68,27 @@ async def do_load_stream(stub, num_requests, message_size): async def worker(port, queue, num_concurrent_streams, num_requests_per_stream, num_rounds, message_size, load_type): - channel = purerpc.Channel("localhost", port) - await channel.connect() - stub = GreeterStub(channel) - if load_type == "unary": - load_fn = do_load_unary - elif load_type == "stream": - load_fn = do_load_stream - else: - raise ValueError(f"Unknown load type: {load_type}") - for _ in range(num_rounds): - tasks = [] - start = time.time() - async with curio.TaskGroup() as task_group: - for _ in range(num_concurrent_streams): - task = await task_group.spawn(load_fn(stub, num_requests_per_stream, message_size)) - tasks.append(task) - end = time.time() - rps = num_concurrent_streams * num_requests_per_stream / (end - start) - queue.put(rps) - queue.put([task.result for task in tasks]) - queue.close() - queue.join_thread() + async with purerpc.insecure_channel("localhost", port) as channel: + stub = GreeterStub(channel) + if load_type == "unary": + load_fn = do_load_unary + elif load_type == "stream": + load_fn = do_load_stream + else: + raise ValueError(f"Unknown load type: {load_type}") + for _ in range(num_rounds): + tasks = [] + start = time.time() + async with curio.TaskGroup() as task_group: + for _ in range(num_concurrent_streams): + task = await task_group.spawn(load_fn(stub, num_requests_per_stream, message_size)) + tasks.append(task) + end = time.time() + rps = num_concurrent_streams * num_requests_per_stream / (end - start) + queue.put(rps) + queue.put([task.result for task in tasks]) + queue.close() + queue.join_thread() if __name__ == "__main__": @@ -97,7 +96,7 @@ async def worker(port, queue, num_concurrent_streams, num_requests_per_stream, parser.add_argument("--message_size", type=int, default=1000) parser.add_argument("--num_workers", type=int, default=3) parser.add_argument("--num_concurrent_streams", type=int, default=100) - parser.add_argument("--num_requests_per_stream", type=int, default=100) + parser.add_argument("--num_requests_per_stream", type=int, default=50) parser.add_argument("--num_rounds", type=int, default=10) parser.add_argument("--load_type", choices=["unary", "stream"], required=True) diff --git a/misc/greeter/latency_h2load.sh b/misc/h2load/latency_h2load.sh similarity index 100% rename from misc/greeter/latency_h2load.sh rename to misc/h2load/latency_h2load.sh diff --git a/misc/greeter/old_logs/pypy6.curio.log.txt b/misc/h2load/old_logs/pypy6.curio.log.txt similarity index 100% rename from misc/greeter/old_logs/pypy6.curio.log.txt rename to misc/h2load/old_logs/pypy6.curio.log.txt diff --git a/misc/greeter/old_logs/pypy6.trio.0.3.log.txt b/misc/h2load/old_logs/pypy6.trio.0.3.log.txt similarity index 100% rename from misc/greeter/old_logs/pypy6.trio.0.3.log.txt rename to misc/h2load/old_logs/pypy6.trio.0.3.log.txt diff --git a/misc/greeter/old_logs/pypy6.trio.0.4.log.txt b/misc/h2load/old_logs/pypy6.trio.0.4.log.txt similarity index 100% rename from misc/greeter/old_logs/pypy6.trio.0.4.log.txt rename to misc/h2load/old_logs/pypy6.trio.0.4.log.txt diff --git a/misc/greeter/old_logs/python.log.txt b/misc/h2load/old_logs/python.log.txt similarity index 100% rename from misc/greeter/old_logs/python.log.txt rename to misc/h2load/old_logs/python.log.txt diff --git a/misc/greeter/request.bin b/misc/h2load/request.bin similarity index 100% rename from misc/greeter/request.bin rename to misc/h2load/request.bin diff --git a/misc/greeter/run_h2load.sh b/misc/h2load/run_h2load.sh similarity index 100% rename from misc/greeter/run_h2load.sh rename to misc/h2load/run_h2load.sh diff --git a/misc/pypy_tests/test_bytearray.py b/misc/pypy_tests/bytearray_perf_test.py similarity index 96% rename from misc/pypy_tests/test_bytearray.py rename to misc/pypy_tests/bytearray_perf_test.py index 0e14a31..c9a2bdd 100644 --- a/misc/pypy_tests/test_bytearray.py +++ b/misc/pypy_tests/bytearray_perf_test.py @@ -1,6 +1,4 @@ -import random import collections -from purerpc.grpclib.message_buffer import ByteBuffer class ByteBuffer: @@ -89,7 +87,7 @@ def __len__(self): def main(): - from purerpc.grpclib.message_buffer import ByteBuffer + from purerpc.grpclib.buffers import ByteBuffer b = b"\x00" * 50 x = ByteBuffer() for i in range(5000000): diff --git a/misc/pypy_tests/test_dict.py b/misc/pypy_tests/test_dict.py deleted file mode 100644 index fb40669..0000000 --- a/misc/pypy_tests/test_dict.py +++ /dev/null @@ -1,7 +0,0 @@ -import random - -s = {} -for i in range(100000000): - s[i % 1000000] = i - -print(len(s)) diff --git a/misc/pypy_tests/test_set.py b/misc/pypy_tests/test_set.py deleted file mode 100644 index b261a28..0000000 --- a/misc/pypy_tests/test_set.py +++ /dev/null @@ -1,7 +0,0 @@ -import random - -s = set() -for i in range(100000000): - s.add(i % 1000000) - -print(len(s)) diff --git a/setup.py b/setup.py index 772f481..eb17f3c 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,8 @@ def main(): install_requires=[ "h2", "protobuf", - "curio", + "anyio", + "async_exit_stack" ], entry_points={'console_scripts': console_scripts}, setup_requires=["pytest-runner"], diff --git a/src/purerpc/__init__.py b/src/purerpc/__init__.py index 7e79b7f..a54af60 100644 --- a/src/purerpc/__init__.py +++ b/src/purerpc/__init__.py @@ -1,4 +1,9 @@ -from purerpc.client import Channel, Client +from .anyio_monkeypatch import apply_monkeypatch as _apply_anyio_monkeypatch + +_apply_anyio_monkeypatch() + + +from purerpc.client import insecure_channel, Client from purerpc.server import Service, Servicer, Server from purerpc.rpc import Cardinality, RPCSignature, Stream from purerpc.grpclib.status import Status, StatusCode diff --git a/src/purerpc/anyio_monkeypatch.py b/src/purerpc/anyio_monkeypatch.py new file mode 100644 index 0000000..219314c --- /dev/null +++ b/src/purerpc/anyio_monkeypatch.py @@ -0,0 +1,42 @@ +import os +import ssl +import anyio +import logging + +from anyio import run as _anyio_run + +log = logging.getLogger(__name__) + +WantRead = (BlockingIOError, InterruptedError, ssl.SSLWantReadError) +WantWrite = (BlockingIOError, InterruptedError, ssl.SSLWantWriteError) + + +def _new_run(func, *args, backend=None, backend_options=None): + if backend is None: + backend = os.getenv("PURERPC_BACKEND", "asyncio") + log.info(f"Selected {backend} backend") + _anyio_run(func, *args, backend=backend, backend_options=backend_options) + + +async def _new_sendall(self, data, *, flags=0): + with memoryview(data).cast('B') as buffer: + total_sent = 0 + while buffer: + await self._check_cancelled() + try: + num_sent = self._raw_socket.send(buffer, flags) + total_sent += num_sent + buffer = buffer[num_sent:] + except WantWrite: + await self._wait_writable() + except WantRead: # pragma: no cover + await self._wait_readable() + except ssl.SSLEOFError: + self._raw_socket.close() + raise + + +def apply_monkeypatch(): + """Apply AnyIO monkeypatches (should merge upstream)""" + anyio.run = _new_run + anyio._networking.BaseSocket.sendall = _new_sendall \ No newline at end of file diff --git a/src/purerpc/client.py b/src/purerpc/client.py index 3dc78d4..87dbbef 100644 --- a/src/purerpc/client.py +++ b/src/purerpc/client.py @@ -1,41 +1,55 @@ -import curio -from purerpc.grpc_proto import GRPCProtoSocket import functools + +import anyio +import async_exit_stack + +from purerpc.grpc_proto import GRPCProtoSocket from purerpc.grpclib.config import GRPCConfiguration from purerpc.rpc import RPCSignature, Cardinality from purerpc.wrappers import ClientStubUnaryUnary, ClientStubStreamStream, ClientStubUnaryStream, \ ClientStubStreamUnary -class Channel: +class _Channel(async_exit_stack.AsyncExitStack): def __init__(self, host, port): - self.host = host - self.port = port - self.grpc_socket = None + super().__init__() + self._host = host + self._port = port + self._grpc_socket = None + self._task_group = None - async def connect(self): - socket = await curio.open_connection(self.host, self.port) + async def __aenter__(self): + await super().__aenter__() # Does nothing + + background_task_group = await self.enter_async_context(anyio.create_task_group()) + self.push_async_callback(background_task_group.cancel_scope.cancel) + self._task_group = await self.enter_async_context(anyio.create_task_group()) + + socket = await anyio.connect_tcp(self._host, self._port, autostart_tls=False, tls_standard_compatible=False) config = GRPCConfiguration(client_side=True) - self.grpc_socket = GRPCProtoSocket(config, socket) - await self.grpc_socket.initiate_connection() + self._grpc_socket = GRPCProtoSocket(config, socket) + await self._grpc_socket.initiate_connection(background_task_group) + return self + + +def insecure_channel(host, port): + return _Channel(host, port) class Client: - def __init__(self, service_name: str, channel: Channel): + def __init__(self, service_name: str, channel: _Channel): self.service_name = service_name self.channel = channel async def rpc(self, method_name: str, request_type, response_type, metadata=None): - if self.channel.grpc_socket is None: - await self.channel.connect() message_type = request_type.DESCRIPTOR.full_name if metadata is None: metadata = () - stream = await self.channel.grpc_socket.start_request("http", self.service_name, - method_name, message_type, - "{}:{}".format(self.channel.host, - self.channel.port), - custom_metadata=metadata) + stream = await self.channel._grpc_socket.start_request("http", self.service_name, + method_name, message_type, + "{}:{}".format(self.channel._host, + self.channel._port), + custom_metadata=metadata) stream.expect_message_type(response_type) return stream @@ -43,10 +57,10 @@ def get_method_stub(self, method_name: str, signature: RPCSignature): stream_fn = functools.partial(self.rpc, method_name, signature.request_type, signature.response_type) if signature.cardinality == Cardinality.STREAM_STREAM: - return ClientStubStreamStream(stream_fn) + return ClientStubStreamStream(stream_fn, self.channel._task_group) elif signature.cardinality == Cardinality.UNARY_STREAM: - return ClientStubUnaryStream(stream_fn) + return ClientStubUnaryStream(stream_fn, self.channel._task_group) elif signature.cardinality == Cardinality.STREAM_UNARY: - return ClientStubStreamUnary(stream_fn) + return ClientStubStreamUnary(stream_fn, self.channel._task_group) else: - return ClientStubUnaryUnary(stream_fn) + return ClientStubUnaryUnary(stream_fn, self.channel._task_group) diff --git a/src/purerpc/grpc_proto.py b/src/purerpc/grpc_proto.py index ba1bb41..0b54bfd 100644 --- a/src/purerpc/grpc_proto.py +++ b/src/purerpc/grpc_proto.py @@ -1,6 +1,5 @@ import datetime -import curio.io from purerpc.grpclib.config import GRPCConfiguration from purerpc.grpclib.events import MessageReceived, RequestEnded, ResponseEnded diff --git a/src/purerpc/grpc_socket.py b/src/purerpc/grpc_socket.py index 386257c..e8d6aab 100644 --- a/src/purerpc/grpc_socket.py +++ b/src/purerpc/grpc_socket.py @@ -1,10 +1,11 @@ -import curio +import sys import socket -import curio.io import datetime -import collections + +import anyio import h2 import h2.events +import h2.exceptions from purerpc.utils import is_darwin from purerpc.grpclib.exceptions import ProtocolError @@ -14,21 +15,25 @@ class SocketWrapper: - def __init__(self, grpc_connection: GRPCConnection, sock: curio.io.Socket): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + def __init__(self, grpc_connection: GRPCConnection, sock: anyio._networking.SocketStream): + self._set_raw_socket_options(sock._socket._raw_socket) + self._socket = sock._socket + self._grpc_connection = grpc_connection + self._write_fifo_lock = anyio.create_lock() + self._retry = False + + @staticmethod + def _set_raw_socket_options(raw_socket): + raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) if hasattr(socket, "TCP_KEEPIDLE"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300) + raw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 300) elif is_darwin(): # Darwin specific option TCP_KEEPALIVE = 16 - sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, 300) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self._socket = sock - self._grpc_connection = grpc_connection - self._write_fifo_lock = curio.Lock() - self._retry = False + raw_socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, 300) + raw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) + raw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) + raw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) async def flush(self): """This maybe called from different threads.""" @@ -63,8 +68,8 @@ def __init__(self, grpc_connection: GRPCConnection, stream_id: int, socket: Sock self._grpc_connection = grpc_connection self._grpc_socket = grpc_socket self._socket = socket - self._flow_control_update_event = curio.Event() - self._incoming_events = curio.Queue() + self._flow_control_update_event = anyio.create_event() + self._incoming_events = anyio.create_queue(sys.maxsize) self._response_started = False self._closed = False @@ -80,14 +85,6 @@ def start_stream_event(self): def end_stream_event(self): return self._end_stream_event - @property - def flow_control_update_event(self): - return self._flow_control_update_event - - @property - def incoming_events(self): - return self._incoming_events - @property def stream_id(self): return self._stream_id @@ -100,6 +97,13 @@ def client_side(self): def debug_prefix(self): return "[CLIENT] " if self.client_side else "[SERVER] " + async def _set_flow_control_update(self): + await self._flow_control_update_event.set() + + async def _wait_flow_control_update(self): + await self._flow_control_update_event.wait() + self._flow_control_update_event.clear() + async def _send(self, message: bytes, compress=False): message_write_buffer = MessageWriteBuffer(self._grpc_connection.config.message_encoding, self._grpc_connection.config.max_message_length) @@ -107,8 +111,7 @@ async def _send(self, message: bytes, compress=False): while message_write_buffer: window_size = self._grpc_connection.flow_control_window(self._stream_id) if window_size <= 0: - await self._flow_control_update_event.wait() - self._flow_control_update_event.clear() + await self._wait_flow_control_update() continue num_data_to_send = min(window_size, len(message_write_buffer)) data = message_write_buffer.data_to_send(num_data_to_send) @@ -136,7 +139,11 @@ async def close(self, status=None, content_type_suffix="", custom_metadata=()): raise TypeError("Closing already closed stream") self._closed = True if self.client_side: - self._grpc_connection.end_request(self._stream_id) + try: + self._grpc_connection.end_request(self._stream_id) + except h2.exceptions.StreamClosedError: + # Remote end already closed connection, do nothing here + pass elif self._response_started: self._grpc_connection.end_response(self._stream_id, status, custom_metadata) else: @@ -158,7 +165,7 @@ async def start_response(self, content_type_suffix="", custom_metadata=()): class GRPCSocket: StreamClass = GRPCStream - def __init__(self, config: GRPCConfiguration, sock: curio.io.Socket, + def __init__(self, config: GRPCConfiguration, sock, receive_buffer_size=16384): self._grpc_connection = GRPCConnection(config=config) self._socket = SocketWrapper(self._grpc_connection, sock) @@ -197,9 +204,9 @@ async def _listen(self): if isinstance(event, h2.events.WindowUpdated): if event.stream_id == 0: for stream in self._streams.values(): - await stream.flow_control_update_event.set() + await stream._set_flow_control_update() elif event.stream_id in self._streams: - await self._streams[event.stream_id].flow_control_update_event.set() + await self._streams[event.stream_id]._set_flow_control_update() continue elif isinstance(event, RequestReceived): self._allocate_stream(event.stream_id) @@ -216,11 +223,11 @@ async def _listener_thread(self): async for _ in self._listen(): raise ProtocolError("Received request on client end") - async def initiate_connection(self): + async def initiate_connection(self, parent_task_group: anyio.abc.TaskGroup): self._grpc_connection.initiate_connection() await self._socket.flush() if self.client_side: - await curio.spawn(self._listener_thread(), daemon=True) + await parent_task_group.spawn(self._listener_thread) async def listen(self): if self.client_side: diff --git a/src/purerpc/grpclib/events.py b/src/purerpc/grpclib/events.py index cc4cb2c..040569c 100644 --- a/src/purerpc/grpclib/events.py +++ b/src/purerpc/grpclib/events.py @@ -88,6 +88,10 @@ def parse_from_stream_id_and_headers_destructive(stream_id: int, headers: Header for header in headers.extract_headers(header_name)) return event + def __repr__(self): + return (f"") + class MessageReceived(Event): def __init__(self, stream_id: int, data: bytes, flow_controlled_length: int): @@ -95,11 +99,18 @@ def __init__(self, stream_id: int, data: bytes, flow_controlled_length: int): self.data = data self.flow_controlled_length = flow_controlled_length + def __repr__(self): + return (f"") + class RequestEnded(Event): def __init__(self, stream_id: int): self.stream_id = stream_id + def __repr__(self): + return f"" + class ResponseReceived(Event): def __init__(self, stream_id: int, content_type: str): @@ -130,6 +141,9 @@ def parse_from_stream_id_and_headers_destructive(stream_id: int, headers: Header for header in headers.extract_headers(header_name)) return event + def __repr__(self): + return f"" + class ResponseEnded(Event): def __init__(self, stream_id: int, status: Status): @@ -154,3 +168,6 @@ def parse_from_stream_id_and_headers_destructive(stream_id: int, headers: Header event.custom_metadata = tuple(header for header_name in list(headers.keys()) for header in headers.extract_headers(header_name)) return event + + def __repr__(self): + return f"" diff --git a/src/purerpc/server.py b/src/purerpc/server.py index 160dc18..64f6230 100644 --- a/src/purerpc/server.py +++ b/src/purerpc/server.py @@ -1,12 +1,12 @@ import sys import inspect import warnings +import socket import collections import functools from multiprocessing import Process -import curio -import curio.meta +import anyio import typing import logging @@ -22,6 +22,7 @@ from .grpclib.connection import GRPCConfiguration +log = logging.getLogger(__name__) BoundRPCMethod = collections.namedtuple("BoundRPCMethod", ["method_fn", "signature"]) @@ -72,6 +73,29 @@ def service(self) -> Service: raise NotImplementedError() +def tcp_server_socket(host, port, family=socket.AF_INET, backlog=100, + reuse_address=True, reuse_port=False): + + raw_socket = socket.socket(family, socket.SOCK_STREAM) + try: + if reuse_address: + raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + + if reuse_port: + try: + raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, True) + except (AttributeError, OSError) as e: + log.warning('reuse_port=True option failed', exc_info=True) + + raw_socket.bind((host, port)) + raw_socket.listen(backlog) + except Exception: + raw_socket.close() + raise + + return raw_socket + + class Server: def __init__(self, port=50055, num_processes=1): self.port = port @@ -84,14 +108,20 @@ def add_service(self, service): self.services[service.name] = service def _create_socket_and_listen(self): - return curio.tcp_server_socket('', self.port, reuse_address=True, reuse_port=True) + return tcp_server_socket('', self.port, reuse_address=True, reuse_port=True) + + async def _run_async_server(self, raw_socket): + socket = anyio._get_asynclib().Socket(raw_socket) - async def _run_async_server(self, socket): - await curio.network.run_server(socket, lambda c, a: ConnectionHandler(self)(c, a)) + # TODO: resource usage warning + async with anyio._networking.SocketStreamServer(socket, None, False, False) as tcp_server, \ + anyio.create_task_group() as task_group: + async for socket in tcp_server.accept_connections(): + await task_group.spawn(ConnectionHandler(self), socket) def _target_fn(self): socket = self._create_socket_and_listen() - curio.run(self._run_async_server, socket) + anyio.run(self._run_async_server, socket) def serve(self): if self.num_processes == 1: @@ -161,20 +191,17 @@ async def request_received(self, stream: GRPCProtoStream): logging.exception("Got exception while writing response stream") await stream.close(Status(StatusCode.CANCELLED, status_message=repr(sys.exc_info()))) - async def __call__(self, socket, addr): + async def __call__(self, socket): self.grpc_socket = GRPCProtoSocket(self.config, socket) - await self.grpc_socket.initiate_connection() + await self.grpc_socket.initiate_connection(None) + # TODO: resource usage warning # TODO: TaskGroup() uses a lot of memory if the connection is kept for a long time # TODO: do we really need it here? - # task_group = curio.TaskGroup() - # TODO: Should at least pass through GeneratorExit - try: - async for stream in self.grpc_socket.listen(): - await curio.spawn(self.request_received(stream), daemon=True) - except: - logging.exception("Got exception in main dispatch loop") - finally: - # await task_group.join() - # await self.grpc_socket.shutdown() - pass + async with anyio.create_task_group() as task_group: + # TODO: Should at least pass through GeneratorExit + try: + async for stream in self.grpc_socket.listen(): + await task_group.spawn(self.request_received, stream) + except: + logging.exception("Got exception in main dispatch loop") diff --git a/src/purerpc/test_utils.py b/src/purerpc/test_utils.py index 714cc82..0b1fe8f 100644 --- a/src/purerpc/test_utils.py +++ b/src/purerpc/test_utils.py @@ -1,7 +1,7 @@ import unittest import threading import multiprocessing -import curio +import anyio import subprocess import purerpc import tempfile @@ -34,7 +34,7 @@ def target_fn(): queue.put(socket.getsockname()[1]) queue.close() queue.join_thread() - curio.run(server._run_async_server, socket) + anyio.run(server._run_async_server, socket) process = multiprocessing.Process(target=target_fn) process.start() diff --git a/src/purerpc/utils.py b/src/purerpc/utils.py index 655455b..5e60473 100644 --- a/src/purerpc/utils.py +++ b/src/purerpc/utils.py @@ -1,5 +1,5 @@ import platform -import curio +import anyio import pdb import math @@ -26,7 +26,7 @@ async def print_memory_growth_statistics(interval_sec=10.0, set_pdb_trace_every= import objgraph while True: num_iters += 1 - await curio.sleep(interval_sec) + await anyio.sleep(interval_sec) objgraph.show_growth() if num_iters == set_pdb_trace_every: pdb.set_trace() diff --git a/src/purerpc/wrappers.py b/src/purerpc/wrappers.py index 5ef95c7..222a5de 100644 --- a/src/purerpc/wrappers.py +++ b/src/purerpc/wrappers.py @@ -1,4 +1,4 @@ -import curio.meta +import anyio from .grpclib.exceptions import ProtocolError, raise_status from .grpclib.status import Status, StatusCode from purerpc.grpc_proto import GRPCProtoStream @@ -29,7 +29,7 @@ async def stream_to_async_iterator(stream: GRPCProtoStream): async def send_multiple_messages_server(stream, agen): - async with curio.meta.finalize(agen) as tmp: + async with anyio.finalize(agen) as tmp: async for message in tmp: await stream.send_message(message) await stream.close(Status(StatusCode.OK)) @@ -42,7 +42,7 @@ async def send_single_message_server(stream, message): async def send_multiple_messages_client(stream, agen): try: - async with curio.meta.finalize(agen) as tmp: + async with anyio.finalize(agen) as tmp: async for message in tmp: await stream.send_message(message) finally: @@ -75,8 +75,9 @@ async def call_server_stream_stream(func, stream): class ClientStub: - def __init__(self, stream_fn): + def __init__(self, stream_fn, parent_task_group): self._stream_fn = stream_fn + self._parent_task_group = parent_task_group class ClientStubUnaryUnary(ClientStub): @@ -97,7 +98,7 @@ async def __call__(self, message, *, metadata=None): class ClientStubStreamUnary(ClientStub): async def __call__(self, message_aiter, *, metadata=None): stream = await self._stream_fn(metadata=metadata) - await curio.spawn(send_multiple_messages_client, stream, message_aiter, daemon=True) + await self._parent_task_group.spawn(send_multiple_messages_client, stream, message_aiter) return await extract_message_from_singleton_stream(stream) @@ -105,7 +106,7 @@ class ClientStubStreamStream(ClientStub): async def call_aiter(self, message_aiter, metadata): stream = await self._stream_fn(metadata=metadata) if message_aiter is not None: - await curio.spawn(send_multiple_messages_client, stream, message_aiter, daemon=True) + await self._parent_task_group.spawn(send_multiple_messages_client, stream, message_aiter) async for message in stream_to_async_iterator(stream): yield message diff --git a/tests/test_client_server_codegen.py b/tests/test_client_server_codegen.py index 59418be..f6d697d 100644 --- a/tests/test_client_server_codegen.py +++ b/tests/test_client_server_codegen.py @@ -1,12 +1,12 @@ import unittest -import curio +import anyio import grpc import typing import time from .greeter_pb2 import HelloReply, HelloRequest from .greeter_pb2_grpc import GreeterStub, GreeterServicer, add_GreeterServicer_to_server -from purerpc import Service, Stream, Channel, Client from purerpc.test_utils import PureRPCTestCase +import purerpc class TestClientServerCodegen(PureRPCTestCase): @@ -20,7 +20,7 @@ async def SayHello(self, message): async def SayHelloGoodbye(self, message): yield HelloReply(message=f"Hello, {message.name}") - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message=f"Goodbye, {message.name}") async def SayHelloToManyAtOnce(self, messages): @@ -31,7 +31,7 @@ async def SayHelloToManyAtOnce(self, messages): async def SayHelloToMany(self, messages): async for message in messages: - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message="Hello, " + message.name) with self.run_purerpc_service_in_process(Servicer().service) as port: @@ -116,12 +116,11 @@ async def worker(channel): ) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(50): - await task_group.spawn(worker(channel)) - curio.run(main) + async with purerpc.insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(50): + await task_group.spawn(worker, channel) + anyio.run(main) def test_purerpc_server_purerpc_client(self): with self.compile_temp_proto("data/greeter.proto") as (_, grpc_module): @@ -134,7 +133,7 @@ async def SayHello(self, message): async def SayHelloGoodbye(self, message): yield HelloReply(message=f"Hello, {message.name}") - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message=f"Goodbye, {message.name}") async def SayHelloToManyAtOnce(self, messages): @@ -145,7 +144,7 @@ async def SayHelloToManyAtOnce(self, messages): async def SayHelloToMany(self, messages): async for message in messages: - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message="Hello, " + message.name) with self.run_purerpc_service_in_process(Servicer().service) as port: @@ -175,13 +174,12 @@ async def worker(channel): ) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(50): - await task_group.spawn(worker(channel)) + async with purerpc.insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(50): + await task_group.spawn(worker, channel) - curio.run(main) + anyio.run(main) def test_purerpc_server_purerpc_client_large_payload_many_streams(self): with self.compile_temp_proto("data/greeter.proto") as (_, grpc_module): @@ -202,13 +200,12 @@ async def worker(channel): ) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(50): - await task_group.spawn(worker(channel)) + async with purerpc.insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(50): + await task_group.spawn(worker, channel) - curio.run(main) + anyio.run(main) def test_purerpc_server_purerpc_client_large_payload_one_stream(self): with self.compile_temp_proto("data/greeter.proto") as (_, grpc_module): @@ -229,13 +226,12 @@ async def worker(channel): ) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(1): - await task_group.spawn(worker(channel)) + async with purerpc.insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(1): + await task_group.spawn(worker, channel) - curio.run(main) + anyio.run(main) def test_purerpc_server_grpc_client_large_payload(self): with self.compile_temp_proto("data/greeter.proto") as (_, grpc_module): @@ -305,13 +301,12 @@ async def gen(): ) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(20): - await task_group.spawn(worker(channel)) + async with purerpc.insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(20): + await task_group.spawn(worker, channel) - curio.run(main) + anyio.run(main) def test_purerpc_server_purerpc_client_deadlock(self): @@ -339,10 +334,9 @@ async def gen(): ) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(10): - await task_group.spawn(worker(channel)) + async with purerpc.insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(10): + await task_group.spawn(worker, channel) - curio.run(main) + anyio.run(main) diff --git a/tests/test_client_server_errors.py b/tests/test_client_server_errors.py index 846a9da..76e98fc 100644 --- a/tests/test_client_server_errors.py +++ b/tests/test_client_server_errors.py @@ -1,11 +1,12 @@ import unittest -import curio +import anyio import grpc import typing import time from .greeter_pb2 import HelloReply, HelloRequest from .greeter_pb2_grpc import GreeterStub, GreeterServicer, add_GreeterServicer_to_server -from purerpc import Service, Stream, Channel, Client, RpcFailedError + +import purerpc from purerpc.test_utils import PureRPCTestCase @@ -60,20 +61,19 @@ async def generator(): GreeterStub = grpc_module.GreeterStub async def worker(channel): stub = GreeterStub(channel) - with self.assertRaisesRegex(RpcFailedError, r"oops my bad"): + with self.assertRaisesRegex(purerpc.RpcFailedError, r"oops my bad"): await stub.SayHello(HelloRequest(name="World")) aiter = stub.SayHelloToMany(generator()) for _ in range(7): await aiter.__anext__() - with self.assertRaisesRegex(RpcFailedError, r"Lucky 7"): + with self.assertRaisesRegex(purerpc.RpcFailedError, r"Lucky 7"): await aiter.__anext__() async def main(): - channel = Channel("localhost", port) - await channel.connect() - await worker(channel) - curio.run(main) + async with purerpc.insecure_channel("localhost", port) as channel: + await worker(channel) + anyio.run(main) def test_errors_purerpc_server_purerpc_client(self): with self.compile_temp_proto("data/greeter.proto") as (_, grpc_module): @@ -99,18 +99,17 @@ async def generator(): async def worker(channel): stub = GreeterStub(channel) - with self.assertRaisesRegex(RpcFailedError, "oops my bad"): + with self.assertRaisesRegex(purerpc.RpcFailedError, "oops my bad"): await stub.SayHello(HelloRequest(name="World")) aiter = stub.SayHelloToMany(generator()) for _ in range(7): await aiter.__anext__() - with self.assertRaisesRegex(RpcFailedError, r"Lucky 7"): + with self.assertRaisesRegex(purerpc.RpcFailedError, r"Lucky 7"): await aiter.__anext__() async def main(): - channel = Channel("localhost", port) - await channel.connect() - await worker(channel) + async with purerpc.insecure_channel("localhost", port) as channel: + await worker(channel) - curio.run(main) + anyio.run(main) diff --git a/tests/test_client_server_metadata.py b/tests/test_client_server_metadata.py index 379fccd..d30cc30 100644 --- a/tests/test_client_server_metadata.py +++ b/tests/test_client_server_metadata.py @@ -1,5 +1,5 @@ import unittest -import curio +import anyio import grpc import pickle import base64 @@ -7,9 +7,10 @@ import time from .greeter_pb2 import HelloReply, HelloRequest from .greeter_pb2_grpc import GreeterStub, GreeterServicer, add_GreeterServicer_to_server -from purerpc import Service, Stream, Channel, Client, RpcFailedError from purerpc.test_utils import PureRPCTestCase +import purerpc + class TestClientServerMetadata(PureRPCTestCase): def test_metadata_purerpc_server_grpc_client(self): @@ -74,10 +75,9 @@ async def worker(channel): async def main(): - channel = Channel("localhost", port) - await channel.connect() - await worker(channel) - curio.run(main) + async with purerpc.insecure_channel("localhost", port) as channel: + await worker(channel) + anyio.run(main) def test_metadata_purerpc_server_purerpc_client(self): with self.compile_temp_proto("data/greeter.proto") as (_, grpc_module): @@ -109,8 +109,7 @@ async def worker(channel): self.assertEqual(metadata, received_metadata) async def main(): - channel = Channel("localhost", port) - await channel.connect() - await worker(channel) + async with purerpc.insecure_channel("localhost", port) as channel: + await worker(channel) - curio.run(main) + anyio.run(main) diff --git a/tests/test_client_server_simple.py b/tests/test_client_server_simple.py index 023add2..6f094c2 100644 --- a/tests/test_client_server_simple.py +++ b/tests/test_client_server_simple.py @@ -1,11 +1,11 @@ import unittest -import curio +import anyio import grpc import typing import time from .greeter_pb2 import HelloReply, HelloRequest from .greeter_pb2_grpc import GreeterStub, GreeterServicer, add_GreeterServicer_to_server -from purerpc import Service, Stream, Channel, Client +from purerpc import Service, Stream, Client, insecure_channel from purerpc.test_utils import PureRPCTestCase @@ -20,7 +20,7 @@ async def say_hello(message: HelloRequest) -> HelloReply: @service.rpc("SayHelloGoodbye") async def say_hello_goodbye(message: HelloRequest) -> Stream[HelloReply]: yield HelloReply(message=f"Hello, {message.name}") - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message=f"Goodbye, {message.name}") @service.rpc("SayHelloToManyAtOnce") @@ -33,7 +33,7 @@ async def say_hello_to_many_at_once(messages: Stream[HelloRequest]) -> HelloRepl @service.rpc("SayHelloToMany") async def say_hello_to_many(messages: Stream[HelloRequest]) -> Stream[HelloReply]: async for message in messages: - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message="Hello, " + message.name) with self.run_purerpc_service_in_process(service) as port: @@ -134,12 +134,11 @@ async def worker(channel): await test_say_hello_to_many_at_once(client) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(50): - await task_group.spawn(worker(channel)) - curio.run(main) + async with insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(25): + await task_group.spawn(worker, channel) + anyio.run(main) def test_purerpc_server_purerpc_client(self): service = Service("Greeter") @@ -151,7 +150,7 @@ async def say_hello(message: HelloRequest) -> HelloReply: @service.rpc("SayHelloGoodbye") async def say_hello_goodbye(message: HelloRequest) -> Stream[HelloReply]: yield HelloReply(message=f"Hello, {message.name}") - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message=f"Goodbye, {message.name}") @service.rpc("SayHelloToManyAtOnce") @@ -164,7 +163,7 @@ async def say_hello_to_many_at_once(messages: Stream[HelloRequest]) -> HelloRepl @service.rpc("SayHelloToMany") async def say_hello_to_many(messages: Stream[HelloRequest]) -> Stream[HelloReply]: async for message in messages: - await curio.sleep(0.05) + await anyio.sleep(0.05) yield HelloReply(message="Hello, " + message.name) with self.run_purerpc_service_in_process(service) as port: @@ -215,9 +214,8 @@ async def worker(channel): await test_say_hello_to_many_at_once(client) async def main(): - channel = Channel("localhost", port) - await channel.connect() - async with curio.TaskGroup() as task_group: - for _ in range(50): - await task_group.spawn(worker(channel)) - curio.run(main) + async with insecure_channel("localhost", port) as channel: + async with anyio.create_task_group() as task_group: + for _ in range(50): + await task_group.spawn(worker, channel) + anyio.run(main) diff --git a/tests/test_protoc_plugin.py b/tests/test_protoc_plugin.py index fff1d29..aa18aef 100644 --- a/tests/test_protoc_plugin.py +++ b/tests/test_protoc_plugin.py @@ -1,4 +1,5 @@ import purerpc +import unittest.mock import purerpc.server from purerpc.test_utils import PureRPCTestCase @@ -22,7 +23,7 @@ def test_plugin(self): self.assertTrue(isinstance(GreeterServicer().service, purerpc.Service)) GreeterStub = grpc_module.GreeterStub - channel = purerpc.Channel("localhost", 0) + channel = unittest.mock.MagicMock() greeter_stub = GreeterStub(channel) self.assertIn("SayHello", dir(greeter_stub)) self.assertTrue(callable(greeter_stub.SayHello)) diff --git a/tests/test_status_codes.py b/tests/test_status_codes.py index 6ec1734..b3983fb 100644 --- a/tests/test_status_codes.py +++ b/tests/test_status_codes.py @@ -1,5 +1,5 @@ import unittest -import curio +import anyio import grpc import typing import time @@ -49,12 +49,11 @@ def SayHelloGoodbye(self, message, context): GreeterStub = grpc_module.GreeterStub async def main(): - channel = Channel("localhost", port) - await channel.connect() - stub = GreeterStub(channel) - with self.assertRaises(UnimplementedError): - await stub.SayHello(HelloRequest(name="World")) - curio.run(main) + async with insecure_channel("localhost", port) as channel: + stub = GreeterStub(channel) + with self.assertRaises(UnimplementedError): + await stub.SayHello(HelloRequest(name="World")) + anyio.run(main) def test_purerpc_server_grpc_client_status_codes(self): def test(error_to_raise, regex_to_check): @@ -101,12 +100,11 @@ async def say_hello(message: HelloRequest) -> HelloReply: GreeterStub = grpc_module.GreeterStub async def main(): - channel = Channel("localhost", port) - await channel.connect() - stub = GreeterStub(channel) - with self.assertRaises(error_to_raise): - await stub.SayHello(HelloRequest(name="World")) - curio.run(main) + async with insecure_channel("localhost", port) as channel: + stub = GreeterStub(channel) + with self.assertRaises(error_to_raise): + await stub.SayHello(HelloRequest(name="World")) + anyio.run(main) test(CancelledError, "CANCELLED") test(UnknownError, "UNKNOWN")