Skip to content

Commit

Permalink
Better
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran committed Mar 7, 2025
1 parent 0d17c27 commit 0667c6b
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 85 deletions.
14 changes: 8 additions & 6 deletions nucliadb/src/nucliadb/backups/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#


from nucliadb.tasks.utils import NatsConsumer, NatsStream


class MaindbKeys:
METADATA = "kbs/{kbid}/backups/{backup_id}"
LAST_RESTORED = "kbs/{kbid}/backup/{backup_id}/last_restored"
Expand All @@ -41,9 +44,8 @@ class BackupFinishedStream:
subject = "backups.creation_finished"


class BackupsNatsStream:
name = "ndb-backups"
stream_subjects = ["ndb-backups.>"]
create_subject = "ndb-backups.create"
delete_subject = "ndb-backups.delete"
restore_subject = "ndb-backups.restore"
class BackupsNatsConfig:
stream = NatsStream(name="ndb-backups", subjects=["ndb-backups.>"])
create_consumer = NatsConsumer(subject="ndb-backups.create", group="ndb-backups-create")
delete_consumer = NatsConsumer(subject="ndb-backups.delete", group="ndb-backups-delete")
restore_consumer = NatsConsumer(subject="ndb-backups.restore", group="ndb-backups-restore")
32 changes: 13 additions & 19 deletions nucliadb/src/nucliadb/backups/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
from typing import Awaitable, Callable

from nucliadb.backups.const import BackupsNatsStream
from nucliadb.backups.const import BackupsNatsConfig
from nucliadb.backups.create import backup_kb_task
from nucliadb.backups.delete import delete_backup
from nucliadb.backups.models import CreateBackupRequest, DeleteBackupRequest, RestoreBackupRequest
Expand All @@ -33,9 +33,8 @@
def creator_consumer() -> NatsTaskConsumer[CreateBackupRequest]:
consumer: NatsTaskConsumer = create_consumer(
name="backup_creator",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
consumer_subject=BackupsNatsStream.create_subject,
stream=BackupsNatsConfig.stream,
consumer=BackupsNatsConfig.create_consumer,
callback=backup_kb_task,
msg_type=CreateBackupRequest,
max_concurrent_messages=10,
Expand All @@ -46,9 +45,8 @@ def creator_consumer() -> NatsTaskConsumer[CreateBackupRequest]:
async def create(kbid: str, backup_id: str) -> None:
producer: NatsTaskProducer[CreateBackupRequest] = create_producer(
name="backup_creator",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
producer_subject=BackupsNatsStream.create_subject,
stream=BackupsNatsConfig.stream,
producer_subject=BackupsNatsConfig.create_consumer.subject,
msg_type=CreateBackupRequest,
)
msg = CreateBackupRequest(
Expand All @@ -61,9 +59,8 @@ async def create(kbid: str, backup_id: str) -> None:
def restorer_consumer() -> NatsTaskConsumer[RestoreBackupRequest]:
consumer: NatsTaskConsumer = create_consumer(
name="backup_restorer",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
consumer_subject=BackupsNatsStream.restore_subject,
stream=BackupsNatsConfig.stream,
consumer=BackupsNatsConfig.restore_consumer,
callback=restore_kb_task,
msg_type=RestoreBackupRequest,
max_concurrent_messages=10,
Expand All @@ -74,9 +71,8 @@ def restorer_consumer() -> NatsTaskConsumer[RestoreBackupRequest]:
async def restore(kbid: str, backup_id: str) -> None:
producer: NatsTaskProducer[RestoreBackupRequest] = create_producer(
name="backup_restorer",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
producer_subject=BackupsNatsStream.restore_subject,
stream=BackupsNatsConfig.stream,
producer_subject=BackupsNatsConfig.restore_consumer.subject,
msg_type=RestoreBackupRequest,
)
msg = RestoreBackupRequest(
Expand All @@ -89,9 +85,8 @@ async def restore(kbid: str, backup_id: str) -> None:
def deleter_consumer() -> NatsTaskConsumer[DeleteBackupRequest]:
consumer: NatsTaskConsumer = create_consumer(
name="backup_deleter",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
consumer_subject=BackupsNatsStream.delete_subject,
stream=BackupsNatsConfig.stream,
consumer=BackupsNatsConfig.delete_consumer,
callback=delete_backup,
msg_type=DeleteBackupRequest,
max_concurrent_messages=2,
Expand All @@ -102,9 +97,8 @@ def deleter_consumer() -> NatsTaskConsumer[DeleteBackupRequest]:
async def delete(backup_id: str) -> None:
producer: NatsTaskProducer[DeleteBackupRequest] = create_producer(
name="backup_deleter",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
producer_subject=BackupsNatsStream.delete_subject,
stream=BackupsNatsConfig.stream,
producer_subject=BackupsNatsConfig.delete_consumer.subject,
msg_type=DeleteBackupRequest,
)
msg = DeleteBackupRequest(
Expand Down
44 changes: 31 additions & 13 deletions nucliadb/src/nucliadb/export_import/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,36 @@
from nucliadb.tasks import create_consumer, create_producer
from nucliadb.tasks.consumer import NatsTaskConsumer
from nucliadb.tasks.producer import NatsTaskProducer
from nucliadb_utils import const
from nucliadb.tasks.utils import NatsConsumer, NatsStream


class ExportsNatsConfig:
stream = NatsStream(
name="ndb-exports",
subjects=["ndb-exports"],
)
consumer = NatsConsumer(
subject="ndb-exports",
group="ndb-exports",
)


class ImportsNatsConfig:
stream = NatsStream(
name="ndb-imports",
subjects=["ndb-imports"],
)
consumer = NatsConsumer(
subject="ndb-imports",
group="ndb-imports",
)


def get_exports_consumer() -> NatsTaskConsumer[NatsTaskMessage]:
return create_consumer(
name="exports_consumer",
stream=const.Streams.KB_EXPORTS.name,
stream_subjects=[const.Streams.KB_EXPORTS.subject],
consumer_subject=const.Streams.KB_EXPORTS.subject,
stream=ExportsNatsConfig.stream,
consumer=ExportsNatsConfig.consumer,
callback=export_kb_to_blob_storage,
msg_type=NatsTaskMessage,
max_concurrent_messages=10,
Expand All @@ -42,9 +63,8 @@ def get_exports_consumer() -> NatsTaskConsumer[NatsTaskMessage]:
async def get_exports_producer(context: ApplicationContext) -> NatsTaskProducer[NatsTaskMessage]:
producer = create_producer(
name="exports_producer",
stream=const.Streams.KB_EXPORTS.name,
stream_subjects=[const.Streams.KB_EXPORTS.subject],
producer_subject=const.Streams.KB_EXPORTS.subject,
stream=ExportsNatsConfig.stream,
producer_subject=ExportsNatsConfig.consumer.subject,
msg_type=NatsTaskMessage,
)
await producer.initialize(context)
Expand All @@ -54,9 +74,8 @@ async def get_exports_producer(context: ApplicationContext) -> NatsTaskProducer[
def get_imports_consumer() -> NatsTaskConsumer[NatsTaskMessage]:
return create_consumer(
name="imports_consumer",
stream=const.Streams.KB_IMPORTS.name,
stream_subjects=[const.Streams.KB_IMPORTS.subject],
consumer_subject=const.Streams.KB_IMPORTS.subject,
stream=ImportsNatsConfig.stream,
consumer=ImportsNatsConfig.consumer,
callback=import_kb_from_blob_storage,
msg_type=NatsTaskMessage,
max_concurrent_messages=10,
Expand All @@ -66,9 +85,8 @@ def get_imports_consumer() -> NatsTaskConsumer[NatsTaskMessage]:
async def get_imports_producer(context: ApplicationContext) -> NatsTaskProducer[NatsTaskMessage]:
producer = create_producer(
name="imports_producer",
stream=const.Streams.KB_IMPORTS.name,
stream_subjects=[const.Streams.KB_IMPORTS.subject],
producer_subject=const.Streams.KB_IMPORTS.subject,
stream=ImportsNatsConfig.stream,
producer_subject=ImportsNatsConfig.consumer.subject,
msg_type=NatsTaskMessage,
)
await producer.initialize(context)
Expand Down
33 changes: 13 additions & 20 deletions nucliadb/src/nucliadb/tasks/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from nucliadb.common.context import ApplicationContext
from nucliadb.tasks.logger import logger
from nucliadb.tasks.models import Callback, MsgType
from nucliadb.tasks.utils import create_nats_stream_if_not_exists
from nucliadb.tasks.utils import NatsConsumer, NatsStream, create_nats_stream_if_not_exists
from nucliadb_telemetry import errors
from nucliadb_utils.nats import MessageProgressUpdater
from nucliadb_utils.settings import nats_consumer_settings
Expand All @@ -40,17 +40,15 @@ class NatsTaskConsumer(Generic[MsgType]):
def __init__(
self,
name: str,
stream: str,
stream_subjects: list[str],
consumer_subject: str,
stream: NatsStream,
consumer: NatsConsumer,
callback: Callback,
msg_type: Type[MsgType],
max_concurrent_messages: Optional[int] = None,
):
self.name = name
self.stream = stream
self.stream_subjects = stream_subjects
self.consumer_subject = consumer_subject
self.consumer = consumer
self.callback = callback
self.msg_type = msg_type
self.max_concurrent_messages = max_concurrent_messages
Expand All @@ -61,7 +59,7 @@ def __init__(
async def initialize(self, context: ApplicationContext):
self.context = context
await create_nats_stream_if_not_exists(
context, stream_name=self.stream, subjects=self.stream_subjects
context, stream_name=self.stream.name, subjects=self.stream.subjects
)
await self._setup_nats_subscription()
self.initialized = True
Expand All @@ -80,17 +78,15 @@ async def finalize(self):

async def _setup_nats_subscription(self):
# Nats push consumer
stream = self.stream
subject = group = self.consumer_subject
max_ack_pending = (
self.max_concurrent_messages
if self.max_concurrent_messages
else nats_consumer_settings.nats_max_ack_pending
)
self.subscription = await self.context.nats_manager.subscribe(
subject=subject,
queue=group,
stream=stream,
subject=self.consumer.subject,
queue=self.consumer.group,
stream=self.stream.name,
cb=self._subscription_worker_as_task,
subscription_lost_cb=self._setup_nats_subscription,
manual_ack=True,
Expand All @@ -103,7 +99,7 @@ async def _setup_nats_subscription(self):
),
)
logger.info(
f"Subscribed to {subject} on stream {stream}",
f"Subscribed {self.consumer.group} to {self.consumer.subject} on stream {self.stream.name}",
extra={"consumer_name": self.name},
)

Expand Down Expand Up @@ -178,23 +174,20 @@ async def subscription_worker(self, msg: Msg):

def create_consumer(
name: str,
stream: str,
stream_subjects: list[str],
consumer_subject: str,
stream: NatsStream,
consumer: NatsConsumer,
callback: Callback,
msg_type: Type[MsgType],
max_concurrent_messages: Optional[int] = None,
) -> NatsTaskConsumer[MsgType]:
"""
Returns a non-initialized consumer
"""
consumer = NatsTaskConsumer(
return NatsTaskConsumer(
name=name,
stream=stream,
stream_subjects=stream_subjects,
consumer_subject=consumer_subject,
consumer=consumer,
callback=callback,
msg_type=msg_type,
max_concurrent_messages=max_concurrent_messages,
)
return consumer
14 changes: 5 additions & 9 deletions nucliadb/src/nucliadb/tasks/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,20 @@
from nucliadb.common.context import ApplicationContext
from nucliadb.tasks.logger import logger
from nucliadb.tasks.models import MsgType
from nucliadb.tasks.utils import create_nats_stream_if_not_exists
from nucliadb.tasks.utils import NatsStream, create_nats_stream_if_not_exists
from nucliadb_telemetry import errors


class NatsTaskProducer(Generic[MsgType]):
def __init__(
self,
name: str,
stream: str,
stream_subjects: list[str],
stream: NatsStream,
producer_subject: str,
msg_type: Type[MsgType],
):
self.name = name
self.stream = stream
self.stream_subjects = stream_subjects
self.producer_subject = producer_subject
self.msg_type = msg_type
self.context: Optional[ApplicationContext] = None
Expand All @@ -47,8 +45,8 @@ async def initialize(self, context: ApplicationContext):
self.context = context
await create_nats_stream_if_not_exists(
self.context,
self.stream,
subjects=self.stream_subjects,
self.stream.name,
subjects=self.stream.subjects,
)
self.initialized = True

Expand Down Expand Up @@ -81,8 +79,7 @@ async def send(self, msg: MsgType) -> int:

def create_producer(
name: str,
stream: str,
stream_subjects: list[str],
stream: NatsStream,
producer_subject: str,
msg_type: Type[MsgType],
) -> NatsTaskProducer[MsgType]:
Expand All @@ -92,7 +89,6 @@ def create_producer(
producer = NatsTaskProducer[MsgType](
name=name,
stream=stream,
stream_subjects=stream_subjects,
producer_subject=producer_subject,
msg_type=msg_type,
)
Expand Down
14 changes: 14 additions & 0 deletions nucliadb/src/nucliadb/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

from dataclasses import dataclass

import nats

from nucliadb.common.context import ApplicationContext
Expand All @@ -31,3 +33,15 @@ async def create_nats_stream_if_not_exists(
await js.stream_info(stream_name)
except nats.js.errors.NotFoundError:
await js.add_stream(name=stream_name, subjects=subjects)


@dataclass
class NatsStream:
name: str
subjects: list[str]


@dataclass
class NatsConsumer:
subject: str
group: str
22 changes: 4 additions & 18 deletions nucliadb_utils/src/nucliadb_utils/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class PubSubChannels:


class Streams:
"""
NOTE: groups can't contain '.', '*' or '>' characters.
"""

class INGEST:
"""
Writing resource changes go to this steam/consumer group.
Expand All @@ -42,24 +46,6 @@ class INGEST_PROCESSED:
subject = "ndb.consumer.processed"
group = "nucliadb-pull-processed"

class KB_EXPORTS:
"""
Exporting kbs
"""

name = "ndb-exports"
subject = "ndb-exports"
group = "ndb-exports"

class KB_IMPORTS:
"""
Importing kbs
"""

name = "ndb-imports"
subject = "ndb-imports"
group = "ndb-imports"


class Features:
SKIP_EXTERNAL_INDEX = "nucliadb_skip_external_index"
Expand Down

0 comments on commit 0667c6b

Please sign in to comment.