Skip to content

Commit

Permalink
hive.messaging: Factor Notifier out of hive-tell-user
Browse files Browse the repository at this point in the history
  • Loading branch information
gbenson committed Sep 20, 2024
1 parent 58b1e77 commit f576183
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 7 deletions.
3 changes: 2 additions & 1 deletion libs/messaging/hive/messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from .channel import Channel
from .channel import Channel, Notifier
from .connection import Connection
from .message_bus import MessageBus, Producer

DEFAULT_MESSAGE_BUS = MessageBus()

blocking_connection = DEFAULT_MESSAGE_BUS.blocking_connection
send_to_queue = DEFAULT_MESSAGE_BUS.send_to_queue
tell_user = DEFAULT_MESSAGE_BUS.tell_user

producer_connection = DEFAULT_MESSAGE_BUS.producer_connection
12 changes: 11 additions & 1 deletion libs/messaging/hive/messaging/channel.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import json

from typing import Optional
from functools import cached_property
from typing import Callable, Optional

from pika import BasicProperties, DeliveryMode

from .wrapper import WrappedPikaThing
from .channel_services import Notifier


class Channel(WrappedPikaThing):
@cached_property
def notifier(self) -> Notifier:
return Notifier(self)

@cached_property
def tell_user(self) -> Callable:
return self.notifier.tell_user

def send_to_queue(
self,
queue: str,
Expand Down
1 change: 1 addition & 0 deletions libs/messaging/hive/messaging/channel_services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .notifier import Notifier
21 changes: 21 additions & 0 deletions libs/messaging/hive/messaging/channel_services/channel_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import annotations

import weakref

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from ..channel import Channel


class ChannelService:
def __init__(self, channel: Channel):
self.channel = weakref.proxy(channel)

def _declared_queue(self, queue, **kwargs) -> str:
"""Declare a queue, returning its name.
Intended to be wrapped in a cached_property.
"""
self.channel.queue_declare(queue, **kwargs)
return queue
18 changes: 18 additions & 0 deletions libs/messaging/hive/messaging/channel_services/notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from functools import cached_property

from .channel_service import ChannelService


class Notifier(ChannelService):
@cached_property
def outbound_queue(self) -> str:
return self._declared_queue(
"matrix.messages.outgoing",
durable=True,
)

def tell_user(self, message: str, format: str = "text"):
self.channel.send_to_queue(self.outbound_queue, {
"format": format,
"messages": [message],
})
7 changes: 2 additions & 5 deletions libs/messaging/hive/messaging/commands/tell_user.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from hive.common import ArgumentParser

from .. import send_to_queue
from .. import tell_user


class TellUserArgumentParser(ArgumentParser):
Expand Down Expand Up @@ -28,7 +28,4 @@ def main():
parser.add_format_argument(format)
args = parser.parse_args()

send_to_queue("matrix.messages.outgoing", {
"format": args.format,
"messages": [args.message],
})
tell_user(args.message, format=args.format)
5 changes: 5 additions & 0 deletions libs/messaging/hive/messaging/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ def send_to_queue(self, queue: str, *args, **kwargs):
)
return channel.send_to_queue(queue, *args, **kwargs)

def tell_user(self, *args, **kwargs):
with self.blocking_connection(connection_attempts=1) as conn:
channel = conn.channel()
return channel.tell_user(*args, **kwargs)

# Streams

def stream_connection_parameters(self, *, port: int = 5552, **kwargs):
Expand Down

0 comments on commit f576183

Please sign in to comment.