Skip to content

Commit

Permalink
matrix-sender: Rewrite but without reactions
Browse files Browse the repository at this point in the history
  • Loading branch information
gbenson committed Dec 2, 2024
1 parent fdf3874 commit 710fb7d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 108 deletions.
1 change: 0 additions & 1 deletion services/matrix-connector/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ services:
- matrix_sender:/var/lib/matrix
command:
- hive-matrix-sender
- --consume=matrix.message.send.requests
secrets:
- rabbitmq.env

Expand Down
160 changes: 53 additions & 107 deletions services/matrix-connector/hive/matrix_connector/sender.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,57 @@
import json
import logging
import os
import re
import shutil
import subprocess

from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from shutil import which

from pika import BasicProperties
from pika.spec import Basic
from valkey import Valkey

from hive.common import ArgumentParser
from hive.chat import ChatMessage
from hive.common.units import SECONDS, MINUTES
from hive.messaging import Channel, blocking_connection
from hive.service import RestartMonitor, ServiceCondition
from hive.messaging import Channel, Message
from hive.service import HiveService

logger = logging.getLogger(__name__)
d = logger.debug

MessageFormat = Enum("MessageFormat", "TEXT HTML MARKDOWN CODE EMOJIZE")

DEFAULT_INPUT_QUEUE = "test.matrix.message.send.requests"


class Sender:
def __init__(self, command: str = "matrix-commander"):
filename = os.path.realpath(command)
if filename is None:
command = shutil.which(command)
self._command = command

def on_send_message_request(
self,
channel: Channel,
method: Basic.Deliver,
properties: BasicProperties,
body: bytes,
):
content_type = properties.content_type
if content_type != "application/json":
raise ValueError(content_type)

payload = json.loads(body)

self.send_messages(
*payload["messages"],
_format=MessageFormat.__members__[
payload["format"].upper()],
)
@dataclass
class Sender(HiveService):
command: str = "matrix-commander"
valkey_url: str = "valkey://matrix-valkey"

def __post_init__(self):
super().__post_init__()
if not os.path.dirname(self.command):
self.command = which(self.command)

@cached_property
def _valkey(self) -> Valkey:
return Valkey.from_url(self.valkey_url)

def _should_forward(self, message: ChatMessage) -> bool:
if message.sender != "hive":
return False
orig_message_id = message.in_reply_to
if not orig_message_id:
return True
event_id = self._valkey.get(f"message:{orig_message_id}:event_id")
return bool(event_id)

def on_chat_message(self, channel: Channel, message: Message):
message = ChatMessage.from_json(message.json())
if not self._should_forward(message):
return
if message.html:
self.send_messages(message.html, "html")
else:
self.send_messages(message.text)

def send_messages(
self,
Expand All @@ -61,7 +65,7 @@ def send_messages(
logger.warning("Nothing to send")
return

command = [self._command]
command = [self.command]
if _format is not MessageFormat.TEXT:
command.append(f"--{_format.name.lower()}")
command.append("--message")
Expand Down Expand Up @@ -92,23 +96,6 @@ def send_messages(
timeout = min(max_timeout, timeout * 2)
d("Timeout is now {timeout} seconds")

def on_send_reaction_request(
self,
channel: Channel,
method: Basic.Deliver,
properties: BasicProperties,
body: bytes,
):
content_type = properties.content_type
if content_type != "application/json":
raise ValueError(content_type)

payload = json.loads(body)
self.send_reaction(
reaction=payload["reaction"],
receiving_event_id=payload["receiver"]["event_id"],
)

def send_reaction(
self,
reaction: str,
Expand Down Expand Up @@ -156,60 +143,19 @@ def send_reaction(
timeout = min(max_timeout, timeout * 2)
d("Timeout is now {timeout} seconds")

def run(self):
with self.blocking_connection(on_channel_open=None) as conn:
channel = conn.channel()
try:
channel.consume_events(
queue="chat.messages",
on_message_callback=self.on_chat_message,
)
finally:
if self.on_channel_open:
# deferred so we receive our own restart monitor message
self.on_channel_open(channel)
channel.start_consuming()

class ReportingRestartMonitor(RestartMonitor):
@property
def status_emoji(self):
return {
ServiceCondition.HEALTHY: "",
ServiceCondition.DUBIOUS: ":white_question_mark:",
}.get(self.status.condition, ":fire:")

def report(self, sender: Sender):
if self.multiple_restarts_logged:
return
messages = self.status.messages
if not messages:
return
replacer = re.compile(r"^Service\b")
messages = [replacer.sub(self.name, msg) for msg in messages]
prefix = self.status_emoji
if prefix:
messages = [f"{prefix} {msg}" for msg in messages]
#try?
sender.send_messages(*messages, _format=MessageFormat.EMOJIZE)
#except?


def main():
parser = ArgumentParser(
description="Publish messages to Hive's Matrix room.",
)
parser.add_argument(
"--consume", dest="queue", default=DEFAULT_INPUT_QUEUE,
help=f"queue to consume [default: {DEFAULT_INPUT_QUEUE}]",
)
args = parser.parse_args()

rsm = ReportingRestartMonitor()
sender = Sender()
rsm.report(sender)

message_queue = args.queue
reaction_queue = message_queue.replace("message", "reaction")
assert reaction_queue != message_queue

with blocking_connection() as conn:
channel = conn.channel()
rsm.report_via_channel(channel)

channel.consume_requests(
queue=message_queue,
on_message_callback=sender.on_send_message_request,
)
channel.consume_requests(
queue=reaction_queue,
on_message_callback=sender.on_send_reaction_request,
)

channel.start_consuming()
main = Sender.main

0 comments on commit 710fb7d

Please sign in to comment.