Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix custom messenger interface #206

Merged
merged 17 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions dff/messengers/common/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def connect(self, pipeline_runner: PipelineRunnerFunction):
May be used for sending an introduction message or displaying general bot information.

:param pipeline_runner: A function that should return pipeline response to user request;
usually it's a :py:meth:`~Pipeline._run_pipeline(request, ctx_id)` function.
usually it's a :py:meth:`~dff.pipeline.pipeline.pipeline.Pipeline._run_pipeline` function.
:type pipeline_runner: PipelineRunnerFunction
"""
raise NotImplementedError
Expand Down Expand Up @@ -97,7 +97,7 @@ async def connect(
for most cases the loop itself shouldn't be overridden.

:param pipeline_runner: A function that should return pipeline response to user request;
usually it's a :py:meth:`~Pipeline._run_pipeline(request, ctx_id)` function.
usually it's a :py:meth:`~dff.pipeline.pipeline.pipeline.Pipeline._run_pipeline` function.
:type pipeline_runner: PipelineRunnerFunction
:param loop: a function that determines whether polling should be continued;
called in each cycle, should return `True` to continue polling or `False` to stop.
Expand All @@ -124,18 +124,33 @@ def __init__(self):
async def connect(self, pipeline_runner: PipelineRunnerFunction):
self._pipeline_runner = pipeline_runner

async def on_request_async(self, request: Any, ctx_id: Hashable) -> Context:
"""
Method invoked on user input. This method works just like
:py:meth:`~dff.pipeline.pipeline.pipeline.Pipeline._run_pipeline`,
however callback message interface may contain additional functionality (e.g. for external API accessing).
Return context that represents dialog with the user;
`last_response`, `id` and some dialog info can be extracted from there.

:param request: User input.
:param ctx_id: Any unique id that will be associated with dialog between this user and pipeline.
:return: Context that represents dialog with the user.
"""
return await self._pipeline_runner(request, ctx_id)

def on_request(self, request: Any, ctx_id: Hashable) -> Context:
"""
Method invoked on user input. This method works just like :py:meth:`.__call__(request, ctx_id)`,
Method invoked on user input. This method works just like
:py:meth:`~dff.pipeline.pipeline.pipeline.Pipeline._run_pipeline`,
however callback message interface may contain additional functionality (e.g. for external API accessing).
Returns context that represents dialog with the user;
Return context that represents dialog with the user;
`last_response`, `id` and some dialog info can be extracted from there.

:param request: User input.
:param ctx_id: Any unique id that will be associated with dialog between this user and pipeline.
:return: Context that represents dialog with the user.
"""
return asyncio.run(self._pipeline_runner(request, ctx_id))
return asyncio.run(self.on_request_async(request, ctx_id))


class CLIMessengerInterface(PollingMessengerInterface):
Expand Down Expand Up @@ -169,7 +184,7 @@ async def connect(self, pipeline_runner: PipelineRunnerFunction, **kwargs):
The CLIProvider generates new dialog id used to user identification on each `connect` call.

:param pipeline_runner: A function that should return pipeline response to user request;
usually it's a :py:meth:`~Pipeline._run_pipeline(request, ctx_id)` function.
usually it's a :py:meth:`~dff.pipeline.pipeline.pipeline.Pipeline._run_pipeline` function.
:type pipeline_runner: PipelineRunnerFunction
:param \\**kwargs: argument, added for compatibility with super class, it shouldn't be used normally.
"""
Expand Down
2 changes: 1 addition & 1 deletion dff/messengers/telegram/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ async def endpoint():

json_string = request.get_data().decode("utf-8")
update = types.Update.de_json(json_string)
resp = self.on_request(*extract_telegram_request_and_id(update, self.messenger))
resp = await self.on_request_async(*extract_telegram_request_and_id(update, self.messenger))
self.messenger.send_response(resp.id, resp.last_response)
return ""

Expand Down
17 changes: 4 additions & 13 deletions tests/pipeline/test_tutorials.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from tests.test_utils import get_path_from_tests_to_current_dir
from dff.utils.testing import check_happy_path, HAPPY_PATH
from dff.script import Message

dot_path_to_addon = get_path_from_tests_to_current_dir(__file__, separator=".")

Expand All @@ -19,22 +18,14 @@
"4_groups_and_conditions_full",
"5_asynchronous_groups_and_services_basic",
"5_asynchronous_groups_and_services_full",
"6_custom_messenger_interface",
"7_extra_handlers_basic",
"7_extra_handlers_full",
"8_extra_handlers_and_extensions",
"6_extra_handlers_basic",
"6_extra_handlers_full",
"7_extra_handlers_and_extensions",
],
)
def test_tutorials(tutorial_module_name: str):
try:
tutorial_module = importlib.import_module(f"tutorials.{dot_path_to_addon}.{tutorial_module_name}")
except ModuleNotFoundError as e:
pytest.skip(f"dependencies unavailable: {e.msg}")
if tutorial_module_name == "6_custom_messenger_interface":
happy_path = tuple(
(req, Message(misc={"webpage": tutorial_module.construct_webpage_by_response(res.text)}))
for req, res in HAPPY_PATH
)
check_happy_path(tutorial_module.pipeline, happy_path)
else:
check_happy_path(tutorial_module.pipeline, HAPPY_PATH)
check_happy_path(tutorial_module.pipeline, HAPPY_PATH)
67 changes: 62 additions & 5 deletions tutorials/messengers/web_api_interface/1_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
"""
# Web API: 1. FastAPI

This tutorial shows how to create an API for DFF using FastAPI.
This tutorial shows how to create an API for DFF using FastAPI and
introduces messenger interfaces.

You can see the result at http://127.0.0.1:8000/docs.

Here, `_run_pipeline` (same as %mddoclink(api,pipeline.pipeline.pipeline,Pipeline.run))
method is used to execute pipeline once.
Here, %mddoclink(api,messengers.common.interface,CallbackMessengerInterface)
is used to process requests.

%mddoclink(api,script.core.message,Message) is used in creating a JSON Schema for the endpoint.
"""

# %pip install dff uvicorn fastapi

# %%
from dff.messengers.common.interface import CallbackMessengerInterface
from dff.script import Message
from dff.pipeline import Pipeline
from dff.utils.testing import TOY_SCRIPT_ARGS, is_interactive_mode
Expand All @@ -23,9 +25,63 @@
from pydantic import BaseModel
from fastapi import FastAPI

# %% [markdown]
"""
Messenger interfaces establish communication between users and the pipeline.
They manage message channel initialization and termination
as well as pipeline execution on every user request.
There are two built-in messenger interface types that can be extended
through inheritance:

* `PollingMessengerInterface` - Starts polling for user requests
in a loop upon initialization,
it has following methods:

* `_request()` - Method that is used to retrieve user requests from a messenger,
should return list of tuples: (user request, unique dialog id).
* `_respond(responses)` - Method that sends responses generated by pipeline
to users through a messenger,
accepts list of dialog `Contexts`.
* `_on_exception(e)` - Method that is called when a critical exception occurs
i.e. exception from context storage or messenger interface, not a service exception.

Such exceptions lead to the termination of the loop.
* `connect(pipeline_runner, loop, timeout)` -
Method that starts the polling loop.

This method is called inside `pipeline.run` method.

It accepts 3 arguments:

* a callback that runs pipeline,
* a function that should return True to continue polling,
* and time to wait between loop executions.

* `CallbackMessengerInterface` - Creates message channel
and provides a callback for pipeline execution,
it has following methods:

* `on_request(request, ctx_id)` or `on_request_async(request, ctx_id)` -
Method that should be called each time
user provides new input to pipeline,
returns dialog Context.
* `connect(pipeline_runner)` - Method that sets `pipeline_runner` as
a function to be called inside `on_request`.

This method is called inside `pipeline.run` method.

You can find API reference for these classes [here](%doclink(api,messengers.common.interface)).

Here the default `CallbackMessengerInterface` is used to setup
communication between the pipeline on the server side and the messenger client.
"""

# %%
pipeline = Pipeline.from_script(*TOY_SCRIPT_ARGS)
messenger_interface = CallbackMessengerInterface()
# CallbackMessengerInterface instantiating the dedicated messenger interface
pipeline = Pipeline.from_script(
*TOY_SCRIPT_ARGS, messenger_interface=messenger_interface
)


# %%
Expand All @@ -42,13 +98,14 @@ async def respond(
user_id: str,
user_message: Message,
):
context = await pipeline._run_pipeline(user_message, user_id)
context = await messenger_interface.on_request_async(user_message, user_id)
return {"user_id": user_id, "response": context.last_response}


# %%
if __name__ == "__main__":
if is_interactive_mode(): # do not run this during doc building
pipeline.run() # runs the messenger_interface.connect method
uvicorn.run(
app,
host="127.0.0.1",
Expand Down
17 changes: 10 additions & 7 deletions tutorials/messengers/web_api_interface/2_websocket_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@
> all inside a long string.
> This, of course, is not optimal and you wouldn't use it for production.

Here, `_run_pipeline` (same as %mddoclink(api,pipeline.pipeline.pipeline,Pipeline.run))
method is used to execute pipeline once.
Here, %mddoclink(api,messengers.common.interface,CallbackMessengerInterface)
is used to process requests.

%mddoclink(api,script.core.message,Message) is used to represent text messages.
"""

# %pip install dff uvicorn fastapi

# %%
from dff.messengers.common.interface import CallbackMessengerInterface
from dff.script import Message
from dff.pipeline import Pipeline
from dff.utils.testing import TOY_SCRIPT, is_interactive_mode
from dff.utils.testing import TOY_SCRIPT_ARGS, is_interactive_mode

import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse


# %%
messenger_interface = CallbackMessengerInterface()
pipeline = Pipeline.from_script(
TOY_SCRIPT,
("greeting_flow", "start_node"),
("greeting_flow", "fallback_node"),
*TOY_SCRIPT_ARGS, messenger_interface=messenger_interface
)


Expand Down Expand Up @@ -93,7 +93,9 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
data = await websocket.receive_text()
await websocket.send_text(f"User: {data}")
request = Message(text=data)
context = await pipeline._run_pipeline(request, client_id)
context = await messenger_interface.on_request_async(
request, client_id
)
response = context.last_response.text
if response is not None:
await websocket.send_text(f"Bot: {response}")
Expand All @@ -106,6 +108,7 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
# %%
if __name__ == "__main__":
if is_interactive_mode(): # do not run this during doc building
pipeline.run()
uvicorn.run(
app,
host="127.0.0.1",
Expand Down
Loading