forked from permitio/fastapi_websocket_rpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbidirectional_server_example.py
33 lines (27 loc) · 1.04 KB
/
bidirectional_server_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi_websocket_rpc import RpcMethodsBase, WebsocketRPCEndpoint,logger
import random
#Set debug logging
logger.logging_config.set_mode(logger.LoggingModes.UVICORN, logger.logging.DEBUG)
# Methods to expose to the clients
class ConcatServer(RpcMethodsBase):
async def concat(self, a="", b=""):
# allow client to exit after some time after
asyncio.create_task(self.channel.other.allow_exit(delay=random.randint(1,4)))
# return calculated response
return a + b
async def on_connect(channel):
# Wait a bit
await asyncio.sleep(1)
# now tell the client it can start sending us queries
asyncio.create_task(channel.other.allow_queries())
# Init the FAST-API app
app = FastAPI()
# Create an endpoint and load it with the methods to expose
endpoint = WebsocketRPCEndpoint(ConcatServer(), on_connect=[on_connect])
# add the endpoint to the app
endpoint.register_route(app)
# Start the server itself
uvicorn.run(app, host="0.0.0.0", port=9000)