Skip to content

Commit

Permalink
hive-service: Add hive.service.HiveService
Browse files Browse the repository at this point in the history
  • Loading branch information
gbenson committed Nov 26, 2024
1 parent 8d36574 commit faeee95
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 119 deletions.
1 change: 1 addition & 0 deletions libs/service/hive/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .restart_monitor import RestartMonitor
from .service import Service as HiveService
from .status import ServiceCondition, ServiceStatus
3 changes: 2 additions & 1 deletion libs/service/hive/service/restart_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Optional

from hive.common.units import MINUTES
from hive.messaging import Channel

from .status import ServiceCondition, ServiceStatus

Expand Down Expand Up @@ -161,7 +162,7 @@ def touch(filename: str):
except FileNotFoundError:
open(filename, "wb").close()

def report_via_channel(self, channel, **kwargs):
def report_via_channel(self, channel: Channel, **kwargs):
"""Report this startup via a :class:`hive.messaging.Channel`.
"""
return self.status.report_via_channel(channel, **kwargs)
60 changes: 60 additions & 0 deletions libs/service/hive/service/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Optional

from hive.common import ArgumentParser
from hive.common.functools import once
from hive.messaging import (
Channel,
Connection,
blocking_connection,
publisher_connection,
)

from .restart_monitor import RestartMonitor


@dataclass
class Service(ABC):
argument_parser: Optional[ArgumentParser] = None
on_channel_open: Optional[Callable[[Channel], None]] = None

def make_argument_parser(self) -> ArgumentParser:
parser = ArgumentParser()
parser.add_argument(
"--no-monitor",
dest="with_restart_monitor",
action="store_false",
help="run without restart monitoring",
)
return parser

def __post_init__(self):
if not self.argument_parser:
self.argument_parser = self.make_argument_parser()
self.args = self.argument_parser.parse_args()

if getattr(self.args, "with_restart_monitor", True):
rsm = RestartMonitor()
if self.on_channel_open:
raise NotImplementedError
self.on_channel_open = once(rsm.report_via_channel)

@classmethod
def main(cls, **kwargs):
service = cls(**kwargs)
return service.run()

@abstractmethod
def run(self):
raise NotImplementedError

def blocking_connection(self, **kwargs) -> Connection:
return self._connect(blocking_connection, kwargs)

def publisher_connection(self, **kwargs) -> Connection:
return self._connect(publisher_connection, kwargs)

def _connect(self, connect, kwargs) -> Connection:
on_channel_open = kwargs.get("on_channel_open", self.on_channel_open)
return connect(on_channel_open=on_channel_open, **kwargs)
4 changes: 3 additions & 1 deletion libs/service/hive/service/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from enum import Enum
from uuid import UUID, uuid4

from hive.messaging import Channel

ServiceCondition = Enum("ServiceCondition", "HEALTHY DUBIOUS IN_ERROR")


Expand Down Expand Up @@ -40,7 +42,7 @@ def _as_dict(self) -> dict[str]:

def report_via_channel(
self,
channel,
channel: Channel,
*,
routing_key: str = "service.status",
mandatory: bool = False,
Expand Down
1 change: 1 addition & 0 deletions libs/service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ classifiers = [
]
dependencies = [
"hive-common",
"hive-messaging",
]

[project.urls]
Expand Down
1 change: 1 addition & 0 deletions libs/service/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
-r ../../requirements.txt
-e ../common
-e ../messaging
4 changes: 3 additions & 1 deletion services/email-receiver/hive/email_receiver/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .main import main
from .service import Service

main = Service.main
13 changes: 0 additions & 13 deletions services/email-receiver/hive/email_receiver/main.py

This file was deleted.

11 changes: 4 additions & 7 deletions services/email-receiver/hive/email_receiver/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from collections.abc import Sequence
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional

from hive.common.units import MINUTE
from hive.config import read_config
from hive.messaging import publisher_connection, Channel
from hive.messaging import Channel
from hive.service import HiveService

from . import imap
from .processors import Processor, DEFAULT_PROCESSORS
Expand All @@ -17,11 +17,10 @@


@dataclass
class Service:
class Service(HiveService):
config_key: str = "email"
processors: Sequence[Processor] = field(default_factory=list)
cycle_time: float = 1 * MINUTE
on_channel_open: Optional[Callable[[Channel], None]] = None

def __post_init__(self):
config = read_config(self.config_key)
Expand All @@ -43,9 +42,7 @@ def __post_init__(self):
raise RuntimeError("Service not configured") from e

def run(self):
with publisher_connection(
on_channel_open=self.on_channel_open
) as conn:
with self.publisher_connection() as conn:
self._run(conn.channel())

def _run(self, channel: Channel):
Expand Down
4 changes: 3 additions & 1 deletion services/matrix-router/hive/matrix_router/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .main import main
from .service import Service

main = Service.main
18 changes: 0 additions & 18 deletions services/matrix-router/hive/matrix_router/main.py

This file was deleted.

19 changes: 5 additions & 14 deletions services/matrix-router/hive/matrix_router/service.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,29 @@
import json
import logging

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Optional

from pika import BasicProperties
from pika.spec import Basic

from hive.chat import ChatMessage
from hive.messaging import Channel, blocking_connection
from hive.messaging import Channel
from hive.service import HiveService

from . import smoke_test_corpus
from .event import MatrixEvent
from .reaction_manager import reaction_manager
from .router import Router

logger = logging.getLogger(__name__)


@dataclass
class Service(ABC):
class Service(Router, HiveService):
input_queue: str = "matrix.events.received"
event_queues: list[str] | tuple[str] = (
"readinglist.updates",
)
on_channel_open: Optional[Callable[[Channel], None]] = None

@abstractmethod
def on_matrix_event(
self,
channel: Channel,
event: MatrixEvent,
):
raise NotImplementedError

def _on_matrix_event(
self,
Expand Down Expand Up @@ -61,7 +52,7 @@ def _maybe_forward_to_vane(channel: Channel, event: MatrixEvent):
logger.exception("Forwarding %r failed:", event.json())

def run(self):
with blocking_connection(on_channel_open=self.on_channel_open) as conn:
with self.blocking_connection() as conn:
channel = conn.channel()
for queue in self.event_queues:
channel.consume_events(
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .main import main
from .service import Service

main = Service.main
13 changes: 0 additions & 13 deletions services/reading-list-updater/hive/reading_list_updater/main.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

from dataclasses import dataclass
from functools import cached_property
from typing import Callable, Optional

from pika import BasicProperties
from pika.spec import Basic

from hive.mediawiki import HiveWiki
from hive.messaging import Channel, blocking_connection
from hive.messaging import Channel
from hive.service import HiveService

from .entry import ReadingListEntry

Expand All @@ -17,10 +17,9 @@


@dataclass
class Service:
class Service(HiveService):
update_request_queue: str = "readinglist.update.requests" # input
update_event_routing_key: str = "readinglist.updates" # output
on_channel_open: Optional[Callable[[Channel], None]] = None

@cached_property
def wiki(self):
Expand Down Expand Up @@ -49,7 +48,7 @@ def on_update_request(
logger.warning("EXCEPTION", exc_info=True)

def run(self):
with blocking_connection(on_channel_open=self.on_channel_open) as conn:
with self.blocking_connection() as conn:
channel = conn.channel()
channel.consume_requests(
queue=self.update_request_queue,
Expand Down
4 changes: 3 additions & 1 deletion services/service-monitor/hive/service_monitor/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .main import main
from .service import Service

main = Service.main
16 changes: 0 additions & 16 deletions services/service-monitor/hive/service_monitor/main.py

This file was deleted.

13 changes: 6 additions & 7 deletions services/service-monitor/hive/service_monitor/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime
from functools import cached_property
from html import escape
from typing import Callable, Optional
from typing import Optional
from uuid import RFC_4122, UUID

from pika import BasicProperties
Expand All @@ -14,16 +14,15 @@
from valkey import Valkey

from hive.chat import tell_user
from hive.common.units import MINUTES
from hive.messaging import Channel, blocking_connection
from hive.messaging import Channel
from hive.service import HiveService, RestartMonitor


@dataclass
class Service:
class Service(HiveService):
service_status_event_queue: str = "service.status"
valkey_url: str = "valkey://service-monitor-valkey"
service_condition_window: float = 5 * MINUTES
on_channel_open: Optional[Callable[[Channel], None]] = None
service_condition_window: float = RestartMonitor.rapid_restart_cutoff

@cached_property
def _valkey(self) -> Valkey:
Expand Down Expand Up @@ -97,7 +96,7 @@ def _to_html(text: str) -> Optional[str]:
return "<br>".join(map(escape, lines))

def run(self):
with blocking_connection() as conn:
with self.blocking_connection(on_channel_open=None) as conn:
channel = conn.channel()
try:
channel.consume_events(
Expand Down
4 changes: 3 additions & 1 deletion services/vane-webui-api/hive/vane_webui_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .main import main
from .service import Service

main = Service.main
13 changes: 0 additions & 13 deletions services/vane-webui-api/hive/vane_webui_api/main.py

This file was deleted.

Loading

0 comments on commit faeee95

Please sign in to comment.