diff --git a/ansible_ai_connect/ai/api/eventbus/__init__.py b/ansible_ai_connect/ai/api/eventbus/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ansible_ai_connect/ai/api/eventbus/sinks.py b/ansible_ai_connect/ai/api/eventbus/sinks.py new file mode 100644 index 000000000..cbbfdbe20 --- /dev/null +++ b/ansible_ai_connect/ai/api/eventbus/sinks.py @@ -0,0 +1,40 @@ +# Copyright Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from django.apps import apps +from django.dispatch import Signal, receiver + +from ansible_ai_connect.ai.api.model_pipelines.pipelines import ( + ChatBotParameters, + ModelPipelineChatBot, +) + +chat_service = Signal() + + +@receiver(chat_service) +def chat_service_receiver( + sender, conversation_id, req_query, req_system_prompt, req_model_id, req_provider, **kwargs +): + llm: ModelPipelineChatBot = apps.get_app_config("ai").get_model_pipeline(ModelPipelineChatBot) + data = llm.invoke( + ChatBotParameters.init( + query=req_query, + system_prompt=req_system_prompt, + model_id=req_model_id or llm.config.model_id, + provider=req_provider, + conversation_id=conversation_id, + ) + ) + return data diff --git a/ansible_ai_connect/ai/api/eventbus/source.py b/ansible_ai_connect/ai/api/eventbus/source.py new file mode 100644 index 000000000..9a1b573a0 --- /dev/null +++ b/ansible_ai_connect/ai/api/eventbus/source.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations + +from enum import StrEnum + +from ansible_ai_connect.ai.api.eventbus.sinks import chat_service + + +class EventType(StrEnum): + CHAT = "chat" + + +class EventBus: + + def send(self, event_type: EventType, **kwargs) -> any: + data = None + match event_type: + case "chat": + response = chat_service.send( + event_type, + conversation_id=kwargs["conversation_id"], + req_query=kwargs["query"], + req_system_prompt=kwargs["system_prompt"], + req_model_id=kwargs["model_id"], + req_provider=kwargs["provider"], + ) + data = response[0][1] + return data diff --git a/ansible_ai_connect/ai/api/views.py b/ansible_ai_connect/ai/api/views.py index d6dc976ec..4554473b6 100644 --- a/ansible_ai_connect/ai/api/views.py +++ b/ansible_ai_connect/ai/api/views.py @@ -32,6 +32,7 @@ from rest_framework.views import APIView from ansible_ai_connect.ai.api.aws.exceptions import WcaSecretManagerError +from ansible_ai_connect.ai.api.eventbus.source import EventBus, EventType from ansible_ai_connect.ai.api.exceptions import ( BaseWisdomAPIException, ChatbotInvalidResponseException, @@ -68,7 +69,6 @@ WcaUserTrialExpired, ) from ansible_ai_connect.ai.api.model_pipelines.pipelines import ( - ChatBotParameters, ContentMatchParameters, MetaData, ModelPipelineChatBot, @@ -1007,14 +1007,14 @@ def post(self, request) -> Response: self.event.conversation_id = conversation_id self.event.modelName = self.req_model_id or self.llm.config.model_id - data = self.llm.invoke( - ChatBotParameters.init( - query=req_query, - system_prompt=req_system_prompt, - model_id=self.req_model_id or self.llm.config.model_id, - provider=req_provider, - conversation_id=conversation_id, - ) + event_bus: EventBus = apps.get_app_config("ai").get_event_bus() + data = event_bus.send( + EventType.CHAT, + conversation_id=conversation_id, + query=req_query, + system_prompt=req_system_prompt, + model_id=self.req_model_id, + provider=req_provider, ) response_serializer = ChatResponseSerializer(data=data) diff --git a/ansible_ai_connect/ai/apps.py b/ansible_ai_connect/ai/apps.py index b66ca3ca8..40ef27ce0 100644 --- a/ansible_ai_connect/ai/apps.py +++ b/ansible_ai_connect/ai/apps.py @@ -34,6 +34,7 @@ ) from .api.aws.wca_secret_manager import AWSSecretManager, DummySecretManager +from .api.eventbus.source import EventBus logger = logging.getLogger(__name__) @@ -50,6 +51,7 @@ class AiConfig(AppConfig): _ansible_lint_caller = UNINITIALIZED _reports_postman = UNINITIALIZED _pipeline_factory = UNINITIALIZED + _event_bus = UNINITIALIZED def ready(self) -> None: self._pipeline_factory = ModelPipelineFactory() @@ -58,6 +60,11 @@ def ready(self) -> None: def get_model_pipeline(self, feature: Type[PIPELINE_TYPE]) -> PIPELINE_TYPE: return self._pipeline_factory.get_pipeline(feature) + def get_event_bus(self): + if self._event_bus is UNINITIALIZED: + self._event_bus = EventBus() + return self._event_bus + def get_ari_caller(self): # Django calls apps.ready() when registering INSTALLED_APPS # We can therefore guarantee self.model_mesh_client is not None