-
Notifications
You must be signed in to change notification settings - Fork 351
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* aiohttp * swap req for httpx/aiohttp * (WIP) * add processing on dendrite side that calls stream synapse * add example that works, fix var name * remove print statements * integrate streaming into bittensor * run black and remove examples * drop support for python<3.9 * defer to default deserialize() instead of abstractmethod, make logging stmt debug for dendrite * add client pooling for efficiency * add annotations so we don't have to use string type-hints, run black * Aio merge master (#1507) * add automated submodule sync to setup * add aliases * add aliases * Remove set_adjustment_interval from sudo set command * subtensor config changes * fix wallet create * Fix set hyperparameter * fix transfer command * Revolution security fix jsons (#1506) * replace pickle with jsons to avoid pickle security issues * fix test_syapse.py * use our internal msgpack-numpy that disables pickle * skip git+ reqs in check_compat * graceful error for submodules * remove jsons in favor of json, no benefit as async client (httpx) attempts to internally json deserialize anyway. jsons would be of no benefit * fix tests * remove jsons req * run black * fix readme, remove recurse-submod as it's in setup.py now * remove pep517 note, causes issues --------- Co-authored-by: unconst <jake@bittensor.com> Co-authored-by: Ayden Brewer <dalegribble@riseup.net> * merge master and update reqs * remove trace calls --------- Co-authored-by: unconst <jake@bittensor.com> Co-authored-by: Ayden Brewer <dalegribble@riseup.net>
- Loading branch information
1 parent
4f3efb0
commit 240a438
Showing
7 changed files
with
418 additions
and
37 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
import bittensor | ||
|
||
from starlette.responses import StreamingResponse as _StreamingResponse | ||
from starlette.types import Send | ||
from typing import Callable, Awaitable, List | ||
from pydantic import BaseModel | ||
from abc import ABC, abstractmethod | ||
|
||
|
||
class BTStreamingResponseModel(BaseModel): | ||
""" | ||
BTStreamingResponseModel is a Pydantic model that encapsulates the token streamer callable for Pydantic validation. | ||
It is used within the StreamingSynapse class to create a BTStreamingResponse object, which is responsible for handling | ||
the streaming of tokens. | ||
The token streamer is a callable that takes a send function and returns an awaitable. It is responsible for generating | ||
the content of the streaming response, typically by processing tokens and sending them to the client. | ||
This model ensures that the token streamer conforms to the expected signature and provides a clear interface for | ||
passing the token streamer to the BTStreamingResponse class. | ||
Attributes: | ||
token_streamer: Callable[[Send], Awaitable[None]] | ||
The token streamer callable, which takes a send function (provided by the ASGI server) and returns an awaitable. | ||
It is responsible for generating the content of the streaming response. | ||
""" | ||
|
||
token_streamer: Callable[[Send], Awaitable[None]] | ||
|
||
|
||
class StreamingSynapse(bittensor.Synapse, ABC): | ||
""" | ||
The StreamingSynapse class is designed to be subclassed for handling streaming responses in the Bittensor network. | ||
It provides abstract methods that must be implemented by the subclass to deserialize, process streaming responses, | ||
and extract JSON data. It also includes a method to create a streaming response object. | ||
""" | ||
|
||
class Config: | ||
validate_assignment = True | ||
|
||
class BTStreamingResponse(_StreamingResponse): | ||
""" | ||
BTStreamingResponse is a specialized subclass of the Starlette StreamingResponse designed to handle the streaming | ||
of tokens within the Bittensor network. It is used internally by the StreamingSynapse class to manage the response | ||
streaming process, including sending headers and calling the token streamer provided by the subclass. | ||
This class is not intended to be directly instantiated or modified by developers subclassing StreamingSynapse. | ||
Instead, it is used by the create_streaming_response method to create a response object based on the token streamer | ||
provided by the subclass. | ||
""" | ||
|
||
def __init__(self, model: BTStreamingResponseModel, **kwargs) -> None: | ||
""" | ||
Initializes the BTStreamingResponse with the given token streamer model. | ||
Args: | ||
model: A BTStreamingResponseModel instance containing the token streamer callable, which is responsible | ||
for generating the content of the response. | ||
**kwargs: Additional keyword arguments passed to the parent StreamingResponse class. | ||
""" | ||
super().__init__(content=iter(()), **kwargs) | ||
self.token_streamer = model.token_streamer | ||
|
||
async def stream_response(self, send: Send) -> None: | ||
""" | ||
Asynchronously streams the response by sending headers and calling the token streamer. | ||
This method is responsible for initiating the response by sending the appropriate headers, including the | ||
content type for event-streaming. It then calls the token streamer to generate the content and sends the | ||
response body to the client. | ||
Args: | ||
send: A callable to send the response, provided by the ASGI server. | ||
""" | ||
bittensor.logging.trace("Streaming response.") | ||
|
||
headers = [(b"content-type", b"text/event-stream")] + self.raw_headers | ||
|
||
await send( | ||
{"type": "http.response.start", "status": 200, "headers": headers} | ||
) | ||
|
||
await self.token_streamer(send) | ||
|
||
await send({"type": "http.response.body", "body": b"", "more_body": False}) | ||
|
||
async def __call__(self, scope, receive, send): | ||
""" | ||
Asynchronously calls the stream_response method, allowing the BTStreamingResponse object to be used as an ASGI | ||
application. | ||
This method is part of the ASGI interface and is called by the ASGI server to handle the request and send the | ||
response. It delegates to the stream_response method to perform the actual streaming process. | ||
Args: | ||
scope: The scope of the request, containing information about the client, server, and request itself. | ||
receive: A callable to receive the request, provided by the ASGI server. | ||
send: A callable to send the response, provided by the ASGI server. | ||
""" | ||
await self.stream_response(send) | ||
|
||
@abstractmethod | ||
async def process_streaming_response(self, response): | ||
""" | ||
Abstract method that must be implemented by the subclass. | ||
This method should provide logic to handle the streaming response, such as parsing and accumulating data. | ||
It is called as the response is being streamed from the network, and should be implemented to handle the specific | ||
streaming data format and requirements of the subclass. | ||
Args: | ||
response: The response object to be processed, typically containing chunks of data. | ||
""" | ||
... | ||
|
||
@abstractmethod | ||
def extract_response_json(self, response): | ||
""" | ||
Abstract method that must be implemented by the subclass. | ||
This method should provide logic to extract JSON data from the response, including headers and content. | ||
It is called after the response has been processed and is responsible for retrieving structured data | ||
that can be used by the application. | ||
Args: | ||
response: The response object from which to extract JSON data. | ||
""" | ||
... | ||
|
||
def create_streaming_response( | ||
self, token_streamer: Callable[[Send], Awaitable[None]] | ||
): | ||
""" | ||
Creates a streaming response using the provided token streamer. | ||
This method can be used by the subclass to create a response object that can be sent back to the client. | ||
The token streamer should be implemented to generate the content of the response according to the specific | ||
requirements of the subclass. | ||
Args: | ||
token_streamer: A callable that takes a send function and returns an awaitable. It's responsible for generating the content of the response. | ||
Returns: | ||
BTStreamingResponse: The streaming response object, ready to be sent to the client. | ||
""" | ||
bittensor.logging.trace("Creating streaming response.") | ||
|
||
model_instance = BTStreamingResponseModel(token_streamer=token_streamer) | ||
|
||
return self.BTStreamingResponse(model_instance) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.