From faeee95d40a3803cb576bb69a5d6e6a12219c4f4 Mon Sep 17 00:00:00 2001 From: Gary Benson Date: Mon, 25 Nov 2024 00:34:45 +0000 Subject: [PATCH] hive-service: Add hive.service.HiveService --- libs/service/hive/service/__init__.py | 1 + libs/service/hive/service/restart_monitor.py | 3 +- libs/service/hive/service/service.py | 60 +++++++++++++++++++ libs/service/hive/service/status.py | 4 +- libs/service/pyproject.toml | 1 + libs/service/requirements.txt | 1 + .../hive/email_receiver/__init__.py | 4 +- .../hive/email_receiver/main.py | 13 ---- .../hive/email_receiver/service.py | 11 ++-- .../hive/matrix_router/__init__.py | 4 +- .../matrix-router/hive/matrix_router/main.py | 18 ------ .../hive/matrix_router/service.py | 19 ++---- .../hive/reading_list_updater/__init__.py | 4 +- .../hive/reading_list_updater/main.py | 13 ---- .../hive/reading_list_updater/service.py | 9 ++- .../hive/service_monitor/__init__.py | 4 +- .../hive/service_monitor/main.py | 16 ----- .../hive/service_monitor/service.py | 13 ++-- .../hive/vane_webui_api/__init__.py | 4 +- .../hive/vane_webui_api/main.py | 13 ---- .../hive/vane_webui_api/service.py | 10 ++-- 21 files changed, 106 insertions(+), 119 deletions(-) create mode 100644 libs/service/hive/service/service.py delete mode 100644 services/email-receiver/hive/email_receiver/main.py delete mode 100644 services/matrix-router/hive/matrix_router/main.py delete mode 100644 services/reading-list-updater/hive/reading_list_updater/main.py delete mode 100644 services/service-monitor/hive/service_monitor/main.py delete mode 100644 services/vane-webui-api/hive/vane_webui_api/main.py diff --git a/libs/service/hive/service/__init__.py b/libs/service/hive/service/__init__.py index dadc0c3..041f0b6 100644 --- a/libs/service/hive/service/__init__.py +++ b/libs/service/hive/service/__init__.py @@ -1,2 +1,3 @@ from .restart_monitor import RestartMonitor +from .service import Service as HiveService from .status import ServiceCondition, ServiceStatus diff --git a/libs/service/hive/service/restart_monitor.py b/libs/service/hive/service/restart_monitor.py index 7f37173..9fccb7d 100644 --- a/libs/service/hive/service/restart_monitor.py +++ b/libs/service/hive/service/restart_monitor.py @@ -8,6 +8,7 @@ from typing import Optional from hive.common.units import MINUTES +from hive.messaging import Channel from .status import ServiceCondition, ServiceStatus @@ -161,7 +162,7 @@ def touch(filename: str): except FileNotFoundError: open(filename, "wb").close() - def report_via_channel(self, channel, **kwargs): + def report_via_channel(self, channel: Channel, **kwargs): """Report this startup via a :class:`hive.messaging.Channel`. """ return self.status.report_via_channel(channel, **kwargs) diff --git a/libs/service/hive/service/service.py b/libs/service/hive/service/service.py new file mode 100644 index 0000000..fc22928 --- /dev/null +++ b/libs/service/hive/service/service.py @@ -0,0 +1,60 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Callable, Optional + +from hive.common import ArgumentParser +from hive.common.functools import once +from hive.messaging import ( + Channel, + Connection, + blocking_connection, + publisher_connection, +) + +from .restart_monitor import RestartMonitor + + +@dataclass +class Service(ABC): + argument_parser: Optional[ArgumentParser] = None + on_channel_open: Optional[Callable[[Channel], None]] = None + + def make_argument_parser(self) -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + "--no-monitor", + dest="with_restart_monitor", + action="store_false", + help="run without restart monitoring", + ) + return parser + + def __post_init__(self): + if not self.argument_parser: + self.argument_parser = self.make_argument_parser() + self.args = self.argument_parser.parse_args() + + if getattr(self.args, "with_restart_monitor", True): + rsm = RestartMonitor() + if self.on_channel_open: + raise NotImplementedError + self.on_channel_open = once(rsm.report_via_channel) + + @classmethod + def main(cls, **kwargs): + service = cls(**kwargs) + return service.run() + + @abstractmethod + def run(self): + raise NotImplementedError + + def blocking_connection(self, **kwargs) -> Connection: + return self._connect(blocking_connection, kwargs) + + def publisher_connection(self, **kwargs) -> Connection: + return self._connect(publisher_connection, kwargs) + + def _connect(self, connect, kwargs) -> Connection: + on_channel_open = kwargs.get("on_channel_open", self.on_channel_open) + return connect(on_channel_open=on_channel_open, **kwargs) diff --git a/libs/service/hive/service/status.py b/libs/service/hive/service/status.py index 4942b41..479c35d 100644 --- a/libs/service/hive/service/status.py +++ b/libs/service/hive/service/status.py @@ -6,6 +6,8 @@ from enum import Enum from uuid import UUID, uuid4 +from hive.messaging import Channel + ServiceCondition = Enum("ServiceCondition", "HEALTHY DUBIOUS IN_ERROR") @@ -40,7 +42,7 @@ def _as_dict(self) -> dict[str]: def report_via_channel( self, - channel, + channel: Channel, *, routing_key: str = "service.status", mandatory: bool = False, diff --git a/libs/service/pyproject.toml b/libs/service/pyproject.toml index efac6c9..fa1ee6c 100644 --- a/libs/service/pyproject.toml +++ b/libs/service/pyproject.toml @@ -9,6 +9,7 @@ classifiers = [ ] dependencies = [ "hive-common", + "hive-messaging", ] [project.urls] diff --git a/libs/service/requirements.txt b/libs/service/requirements.txt index caf9b56..e25c546 100644 --- a/libs/service/requirements.txt +++ b/libs/service/requirements.txt @@ -1,2 +1,3 @@ -r ../../requirements.txt -e ../common +-e ../messaging diff --git a/services/email-receiver/hive/email_receiver/__init__.py b/services/email-receiver/hive/email_receiver/__init__.py index c28a133..0661ffa 100644 --- a/services/email-receiver/hive/email_receiver/__init__.py +++ b/services/email-receiver/hive/email_receiver/__init__.py @@ -1 +1,3 @@ -from .main import main +from .service import Service + +main = Service.main diff --git a/services/email-receiver/hive/email_receiver/main.py b/services/email-receiver/hive/email_receiver/main.py deleted file mode 100644 index 789d081..0000000 --- a/services/email-receiver/hive/email_receiver/main.py +++ /dev/null @@ -1,13 +0,0 @@ -import logging - -from hive.common.functools import once -from hive.service import RestartMonitor - -from .service import Service - - -def main(): - logging.basicConfig(level=logging.INFO) - rsm = RestartMonitor() - service = Service(on_channel_open=once(rsm.report_via_channel)) - service.run() diff --git a/services/email-receiver/hive/email_receiver/service.py b/services/email-receiver/hive/email_receiver/service.py index 0f91296..2621ba2 100644 --- a/services/email-receiver/hive/email_receiver/service.py +++ b/services/email-receiver/hive/email_receiver/service.py @@ -4,11 +4,11 @@ from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime -from typing import Callable, Optional from hive.common.units import MINUTE from hive.config import read_config -from hive.messaging import publisher_connection, Channel +from hive.messaging import Channel +from hive.service import HiveService from . import imap from .processors import Processor, DEFAULT_PROCESSORS @@ -17,11 +17,10 @@ @dataclass -class Service: +class Service(HiveService): config_key: str = "email" processors: Sequence[Processor] = field(default_factory=list) cycle_time: float = 1 * MINUTE - on_channel_open: Optional[Callable[[Channel], None]] = None def __post_init__(self): config = read_config(self.config_key) @@ -43,9 +42,7 @@ def __post_init__(self): raise RuntimeError("Service not configured") from e def run(self): - with publisher_connection( - on_channel_open=self.on_channel_open - ) as conn: + with self.publisher_connection() as conn: self._run(conn.channel()) def _run(self, channel: Channel): diff --git a/services/matrix-router/hive/matrix_router/__init__.py b/services/matrix-router/hive/matrix_router/__init__.py index c28a133..0661ffa 100644 --- a/services/matrix-router/hive/matrix_router/__init__.py +++ b/services/matrix-router/hive/matrix_router/__init__.py @@ -1 +1,3 @@ -from .main import main +from .service import Service + +main = Service.main diff --git a/services/matrix-router/hive/matrix_router/main.py b/services/matrix-router/hive/matrix_router/main.py deleted file mode 100644 index 524b6d9..0000000 --- a/services/matrix-router/hive/matrix_router/main.py +++ /dev/null @@ -1,18 +0,0 @@ -import logging - -from hive.common.functools import once -from hive.service import RestartMonitor - -from .router import Router -from .service import Service - - -class RouterService(Router, Service): - pass - - -def main(): - logging.basicConfig(level=logging.INFO) - rsm = RestartMonitor() - service = RouterService(on_channel_open=once(rsm.report_via_channel)) - service.run() diff --git a/services/matrix-router/hive/matrix_router/service.py b/services/matrix-router/hive/matrix_router/service.py index 58ab1eb..5ebc64d 100644 --- a/services/matrix-router/hive/matrix_router/service.py +++ b/services/matrix-router/hive/matrix_router/service.py @@ -1,38 +1,29 @@ import json import logging -from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Callable, Optional from pika import BasicProperties from pika.spec import Basic from hive.chat import ChatMessage -from hive.messaging import Channel, blocking_connection +from hive.messaging import Channel +from hive.service import HiveService from . import smoke_test_corpus from .event import MatrixEvent from .reaction_manager import reaction_manager +from .router import Router logger = logging.getLogger(__name__) @dataclass -class Service(ABC): +class Service(Router, HiveService): input_queue: str = "matrix.events.received" event_queues: list[str] | tuple[str] = ( "readinglist.updates", ) - on_channel_open: Optional[Callable[[Channel], None]] = None - - @abstractmethod - def on_matrix_event( - self, - channel: Channel, - event: MatrixEvent, - ): - raise NotImplementedError def _on_matrix_event( self, @@ -61,7 +52,7 @@ def _maybe_forward_to_vane(channel: Channel, event: MatrixEvent): logger.exception("Forwarding %r failed:", event.json()) def run(self): - with blocking_connection(on_channel_open=self.on_channel_open) as conn: + with self.blocking_connection() as conn: channel = conn.channel() for queue in self.event_queues: channel.consume_events( diff --git a/services/reading-list-updater/hive/reading_list_updater/__init__.py b/services/reading-list-updater/hive/reading_list_updater/__init__.py index c28a133..0661ffa 100644 --- a/services/reading-list-updater/hive/reading_list_updater/__init__.py +++ b/services/reading-list-updater/hive/reading_list_updater/__init__.py @@ -1 +1,3 @@ -from .main import main +from .service import Service + +main = Service.main diff --git a/services/reading-list-updater/hive/reading_list_updater/main.py b/services/reading-list-updater/hive/reading_list_updater/main.py deleted file mode 100644 index 789d081..0000000 --- a/services/reading-list-updater/hive/reading_list_updater/main.py +++ /dev/null @@ -1,13 +0,0 @@ -import logging - -from hive.common.functools import once -from hive.service import RestartMonitor - -from .service import Service - - -def main(): - logging.basicConfig(level=logging.INFO) - rsm = RestartMonitor() - service = Service(on_channel_open=once(rsm.report_via_channel)) - service.run() diff --git a/services/reading-list-updater/hive/reading_list_updater/service.py b/services/reading-list-updater/hive/reading_list_updater/service.py index 344fff1..58eea75 100644 --- a/services/reading-list-updater/hive/reading_list_updater/service.py +++ b/services/reading-list-updater/hive/reading_list_updater/service.py @@ -2,13 +2,13 @@ from dataclasses import dataclass from functools import cached_property -from typing import Callable, Optional from pika import BasicProperties from pika.spec import Basic from hive.mediawiki import HiveWiki -from hive.messaging import Channel, blocking_connection +from hive.messaging import Channel +from hive.service import HiveService from .entry import ReadingListEntry @@ -17,10 +17,9 @@ @dataclass -class Service: +class Service(HiveService): update_request_queue: str = "readinglist.update.requests" # input update_event_routing_key: str = "readinglist.updates" # output - on_channel_open: Optional[Callable[[Channel], None]] = None @cached_property def wiki(self): @@ -49,7 +48,7 @@ def on_update_request( logger.warning("EXCEPTION", exc_info=True) def run(self): - with blocking_connection(on_channel_open=self.on_channel_open) as conn: + with self.blocking_connection() as conn: channel = conn.channel() channel.consume_requests( queue=self.update_request_queue, diff --git a/services/service-monitor/hive/service_monitor/__init__.py b/services/service-monitor/hive/service_monitor/__init__.py index c28a133..0661ffa 100644 --- a/services/service-monitor/hive/service_monitor/__init__.py +++ b/services/service-monitor/hive/service_monitor/__init__.py @@ -1 +1,3 @@ -from .main import main +from .service import Service + +main = Service.main diff --git a/services/service-monitor/hive/service_monitor/main.py b/services/service-monitor/hive/service_monitor/main.py deleted file mode 100644 index 23332e2..0000000 --- a/services/service-monitor/hive/service_monitor/main.py +++ /dev/null @@ -1,16 +0,0 @@ -import logging - -from hive.common.functools import once -from hive.service import RestartMonitor - -from .service import Service - - -def main(): - logging.basicConfig(level=logging.INFO) - rsm = RestartMonitor() - service = Service( - service_condition_window=rsm.rapid_restart_cutoff, - on_channel_open=once(rsm.report_via_channel), - ) - service.run() diff --git a/services/service-monitor/hive/service_monitor/service.py b/services/service-monitor/hive/service_monitor/service.py index 81bbca9..074a651 100644 --- a/services/service-monitor/hive/service_monitor/service.py +++ b/services/service-monitor/hive/service_monitor/service.py @@ -5,7 +5,7 @@ from datetime import datetime from functools import cached_property from html import escape -from typing import Callable, Optional +from typing import Optional from uuid import RFC_4122, UUID from pika import BasicProperties @@ -14,16 +14,15 @@ from valkey import Valkey from hive.chat import tell_user -from hive.common.units import MINUTES -from hive.messaging import Channel, blocking_connection +from hive.messaging import Channel +from hive.service import HiveService, RestartMonitor @dataclass -class Service: +class Service(HiveService): service_status_event_queue: str = "service.status" valkey_url: str = "valkey://service-monitor-valkey" - service_condition_window: float = 5 * MINUTES - on_channel_open: Optional[Callable[[Channel], None]] = None + service_condition_window: float = RestartMonitor.rapid_restart_cutoff @cached_property def _valkey(self) -> Valkey: @@ -97,7 +96,7 @@ def _to_html(text: str) -> Optional[str]: return "
".join(map(escape, lines)) def run(self): - with blocking_connection() as conn: + with self.blocking_connection(on_channel_open=None) as conn: channel = conn.channel() try: channel.consume_events( diff --git a/services/vane-webui-api/hive/vane_webui_api/__init__.py b/services/vane-webui-api/hive/vane_webui_api/__init__.py index c28a133..0661ffa 100644 --- a/services/vane-webui-api/hive/vane_webui_api/__init__.py +++ b/services/vane-webui-api/hive/vane_webui_api/__init__.py @@ -1 +1,3 @@ -from .main import main +from .service import Service + +main = Service.main diff --git a/services/vane-webui-api/hive/vane_webui_api/main.py b/services/vane-webui-api/hive/vane_webui_api/main.py deleted file mode 100644 index 789d081..0000000 --- a/services/vane-webui-api/hive/vane_webui_api/main.py +++ /dev/null @@ -1,13 +0,0 @@ -import logging - -from hive.common.functools import once -from hive.service import RestartMonitor - -from .service import Service - - -def main(): - logging.basicConfig(level=logging.INFO) - rsm = RestartMonitor() - service = Service(on_channel_open=once(rsm.report_via_channel)) - service.run() diff --git a/services/vane-webui-api/hive/vane_webui_api/service.py b/services/vane-webui-api/hive/vane_webui_api/service.py index 48abce3..6d1bdfd 100644 --- a/services/vane-webui-api/hive/vane_webui_api/service.py +++ b/services/vane-webui-api/hive/vane_webui_api/service.py @@ -1,25 +1,23 @@ from dataclasses import dataclass -from typing import Callable, Optional from hive.common.socketserver import serving -from hive.messaging import Channel, blocking_connection +from hive.service import HiveService from .server import HTTPServer @dataclass -class Service: +class Service(HiveService): server_address: tuple[str, int | str] = ("", 7224) - on_channel_open: Optional[Callable[[Channel], None]] = None def run(self): - with blocking_connection() as conn: + with self.blocking_connection(on_channel_open=None) as conn: channel = conn.channel() try: server = HTTPServer(self.server_address, channel=channel) finally: if self.on_channel_open: - # deferred so we receive our own (re)start message + # deferred so we receive our own restart monitor message self.on_channel_open(channel) with serving(server): channel.start_consuming()