Skip to content

Commit

Permalink
email-receiver: Factor out ReadingListProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
gbenson committed Oct 8, 2024
1 parent 1a31e93 commit a7dcade
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 36 deletions.
61 changes: 61 additions & 0 deletions services/email-receiver/hive/email_receiver/processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging

from abc import ABC, abstractmethod
from collections.abc import Sequence
from dataclasses import dataclass

from hive.messaging import Channel, UnroutableError

from .imap import ClientConnection as IMAPConn, Message

logger = logging.getLogger(__name__)
d = logger.debug


@dataclass
class Processor(ABC):
mailboxes: Sequence[str]
queue_name: str

@abstractmethod
def process_messages(self, channel: Channel, imap: IMAPConn) -> int:
"""Process messages from the specified mailboxes.
Return the number of messages processed."""
raise NotImplementedError


@dataclass
class ReadingListProcessor(Processor):
queue_name: str = "readinglist.emails.received"

def process_messages(self, channel: Channel, imap: IMAPConn) -> int:
num_processed = 0
for mailbox_name in self.mailboxes:
with imap.select(mailbox_name) as mailbox:
for msg in mailbox.messages:
if not self._process_message(channel, msg):
continue
num_processed += 1
return num_processed

def _process_message(self, channel: Channel, email: Message):
for header in ("To", "Cc", "Bcc"):
if email[header]:
d("Message %s has '%s:' header", email.uid, header)
return False

try:
channel.publish_event(
message=bytes(email),
content_type="message/rfc822",
routing_key=self.queue_name,
mandatory=True,
)
except UnroutableError:
logger.info("Retaining message %s on %s", email.uid, email.server)
return False

d("Message %s queued", email.uid)
email.delete()
d("Message %s marked for deletion", email.uid)
return True
63 changes: 27 additions & 36 deletions services/email-receiver/hive/email_receiver/service.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,47 @@
import logging
import time

from dataclasses import dataclass
from collections.abc import Sequence
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional

from hive.common.units import MINUTE
from hive.config import read_config
from hive.messaging import publisher_connection, Channel, UnroutableError
from hive.messaging import publisher_connection, Channel

from . import imap
from .processors import Processor, ReadingListProcessor

logger = logging.getLogger(__name__)
d = logger.debug


@dataclass
class Service:
config_key: str = "email"
queue_name: str = "readinglist.emails.received"
processors: Sequence[Processor] = field(default_factory=list)
cycle_time: float = 1 * MINUTE
on_channel_open: Optional[Callable[[Channel], None]] = None

DEFAULT_PROCESSORS = {
"reading_lists": ReadingListProcessor,
}

def __post_init__(self):
config = read_config(self.config_key)
try:
imap_config = config[self.config_key]["imap"]
self._imap = imap.Client(imap_config)
if self.processors:
return
mbox_config = imap_config["mailboxes"]
self._reading_lists = mbox_config["reading_lists"]
for config_key, cls in self.DEFAULT_PROCESSORS.items():
mailboxes = mbox_config.get(config_key)
if not mailboxes:
continue
self.processors.append(cls(mailboxes))
if not self.processors:
raise RuntimeError("Service not configured")

except KeyError as e:
raise RuntimeError("Service not configured") from e
Expand Down Expand Up @@ -57,34 +70,12 @@ def _main_loop(self, channel: Channel, imap: imap.ClientConnection):
logger.debug("Sleeping for %.4f seconds", sleep_time)
time.sleep(sleep_time)

def _process_messages(self, channel: Channel, imap: imap.ClientConnection):
num_processed = 0
for mailbox_name in self._reading_lists:
with imap.select(mailbox_name) as mailbox:
for msg in mailbox.messages:
if not self._process_message(channel, msg):
continue
num_processed += 1
return num_processed

def _process_message(self, channel: Channel, email: imap.Message):
for header in ("To", "Cc", "Bcc"):
if email[header]:
d("Message %s has '%s:' header", email.uid, header)
return False

try:
channel.publish_event(
message=bytes(email),
content_type="message/rfc822",
routing_key=self.queue_name,
mandatory=True,
)
except UnroutableError:
logger.info("Retaining message %s on %s", email.uid, email.server)
return False

d("Message %s queued", email.uid)
email.delete()
d("Message %s marked for deletion", email.uid)
return True
def _process_messages(
self,
channel: Channel,
imap: imap.ClientConnection,
) -> int:
return sum(
p.process_messages(channel, imap)
for p in self.processors
)

0 comments on commit a7dcade

Please sign in to comment.