Skip to content

Commit

Permalink
Merge branch 'main' into backup-proto-renaming-kb-id
Browse files Browse the repository at this point in the history
  • Loading branch information
drf7 committed Mar 6, 2025
2 parents 2262378 + 5696dd0 commit 6ddee90
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 26 deletions.
8 changes: 8 additions & 0 deletions nucliadb/src/nucliadb/backups/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ class StorageKeys:
class BackupFinishedStream:
name = "backups"
subject = "backups.creation_finished"


class BackupsNatsStream:
# Names of the NATS stream and subjects shared by the backup task
# consumers and producers (see the create_consumer / create_producer
# calls in backups/tasks.py, which read these attributes).
#
# Stream name, passed as `stream=` to the task consumers/producers.
name = "ndb-backups"
# Subjects bound to the stream, passed as `stream_subjects=`. The trailing
# ">" is the NATS wildcard for "one or more tokens", so the three subjects
# below all fall under this pattern.
stream_subjects = ["ndb-backups.>"]
# Subject used for CreateBackupRequest messages.
create_subject = "ndb-backups.create"
# Subject used for DeleteBackupRequest messages.
delete_subject = "ndb-backups.delete"
# Subject used for RestoreBackupRequest messages.
restore_subject = "ndb-backups.restore"
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/backups/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from nucliadb_utils.utilities import get_audit


async def backup_kb_retried(context: ApplicationContext, msg: CreateBackupRequest):
async def backup_kb_task(context: ApplicationContext, msg: CreateBackupRequest):
kbid = msg.kb_id
backup_id = msg.backup_id

Expand Down
7 changes: 5 additions & 2 deletions nucliadb/src/nucliadb/backups/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
from nucliadb.common.context import ApplicationContext


async def delete_backup(context: ApplicationContext, msg: DeleteBackupRequest):
async def delete_backup_task(context: ApplicationContext, msg: DeleteBackupRequest):
"""
Deletes the backup files from the cloud storage.
"""
backup_id = msg.backup_id
await delete_backup(context, msg.backup_id)


async def delete_backup(context: ApplicationContext, backup_id: str):
while True:
deleted = await delete_n(context, backup_id, n=1000)
if deleted == 0:
Expand Down
4 changes: 3 additions & 1 deletion nucliadb/src/nucliadb/backups/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from nucliadb_protos.writer_pb2 import BrokerMessage


async def restore_kb_retried(context: ApplicationContext, msg: RestoreBackupRequest):
async def restore_kb_task(context: ApplicationContext, msg: RestoreBackupRequest):
kbid = msg.kb_id
backup_id = msg.backup_id

Expand Down Expand Up @@ -135,6 +135,8 @@ def __init__(self, download_stream: AsyncIterator[bytes]):
async def read(self, size: int) -> bytes:
while len(self.buffer) < size:
chunk = await self.download_stream.__anext__()
if not chunk:
continue
self.buffer += chunk
result = self.buffer[:size]
self.buffer = self.buffer[size:]
Expand Down
45 changes: 23 additions & 22 deletions nucliadb/src/nucliadb/backups/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
#
from typing import Awaitable, Callable

from nucliadb.backups.create import backup_kb_retried
from nucliadb.backups.const import BackupsNatsStream
from nucliadb.backups.create import backup_kb_task
from nucliadb.backups.delete import delete_backup
from nucliadb.backups.models import CreateBackupRequest, DeleteBackupRequest, RestoreBackupRequest
from nucliadb.backups.restore import restore_kb_retried
from nucliadb.backups.restore import restore_kb_task
from nucliadb.common.context import ApplicationContext
from nucliadb.tasks import create_consumer, create_producer
from nucliadb.tasks.consumer import NatsTaskConsumer
Expand All @@ -32,10 +33,10 @@
def creator_consumer() -> NatsTaskConsumer[CreateBackupRequest]:
consumer: NatsTaskConsumer = create_consumer(
name="backup_creator",
stream="backups",
stream_subjects=["backups.>"],
consumer_subject="backups.create",
callback=backup_kb_retried,
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
consumer_subject=BackupsNatsStream.create_subject,
callback=backup_kb_task,
msg_type=CreateBackupRequest,
max_concurrent_messages=10,
)
Expand All @@ -45,9 +46,9 @@ def creator_consumer() -> NatsTaskConsumer[CreateBackupRequest]:
async def create(kbid: str, backup_id: str) -> None:
producer: NatsTaskProducer[CreateBackupRequest] = create_producer(
name="backup_creator",
stream="backups",
stream_subjects=["backups.>"],
producer_subject="backups.create",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
producer_subject=BackupsNatsStream.create_subject,
msg_type=CreateBackupRequest,
)
msg = CreateBackupRequest(
Expand All @@ -60,10 +61,10 @@ async def create(kbid: str, backup_id: str) -> None:
def restorer_consumer() -> NatsTaskConsumer[RestoreBackupRequest]:
consumer: NatsTaskConsumer = create_consumer(
name="backup_restorer",
stream="backups",
stream_subjects=["backups.>"],
consumer_subject="backups.restore",
callback=restore_kb_retried,
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
consumer_subject=BackupsNatsStream.restore_subject,
callback=restore_kb_task,
msg_type=RestoreBackupRequest,
max_concurrent_messages=10,
)
Expand All @@ -73,9 +74,9 @@ def restorer_consumer() -> NatsTaskConsumer[RestoreBackupRequest]:
async def restore(kbid: str, backup_id: str) -> None:
producer: NatsTaskProducer[RestoreBackupRequest] = create_producer(
name="backup_restorer",
stream="backups",
stream_subjects=["backups.>"],
producer_subject="backups.restore",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
producer_subject=BackupsNatsStream.restore_subject,
msg_type=RestoreBackupRequest,
)
msg = RestoreBackupRequest(
Expand All @@ -88,9 +89,9 @@ async def restore(kbid: str, backup_id: str) -> None:
def deleter_consumer() -> NatsTaskConsumer[DeleteBackupRequest]:
consumer: NatsTaskConsumer = create_consumer(
name="backup_deleter",
stream="backups",
stream_subjects=["backups.>"],
consumer_subject="backups.delete",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
consumer_subject=BackupsNatsStream.delete_subject,
callback=delete_backup,
msg_type=DeleteBackupRequest,
max_concurrent_messages=2,
Expand All @@ -101,9 +102,9 @@ def deleter_consumer() -> NatsTaskConsumer[DeleteBackupRequest]:
async def delete(backup_id: str) -> None:
producer: NatsTaskProducer[DeleteBackupRequest] = create_producer(
name="backup_deleter",
stream="backups",
stream_subjects=["backups.>"],
producer_subject="backups.delete",
stream=BackupsNatsStream.name,
stream_subjects=BackupsNatsStream.stream_subjects,
producer_subject=BackupsNatsStream.delete_subject,
msg_type=DeleteBackupRequest,
)
msg = DeleteBackupRequest(
Expand Down
5 changes: 5 additions & 0 deletions nucliadb/tests/nucliadb/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from nucliadb.backups.const import StorageKeys
from nucliadb.backups.create import backup_kb, get_metadata, set_metadata
from nucliadb.backups.delete import delete_backup
from nucliadb.backups.models import BackupMetadata
from nucliadb.backups.restore import (
get_last_restored_resource_key,
Expand Down Expand Up @@ -150,6 +151,10 @@ async def test_backup(
# Make sure that the restore metadata is cleaned up
assert await get_last_restored_resource_key(context, dst_kb, backup_id) is None

await delete_backup(context, backup_id)

assert await exists_backup(context.blob_storage, backup_id) is False

# Check that the resources were restored
resp = await nucliadb_reader.get(f"/kb/{dst_kb}/resources")
assert resp.status_code == 200
Expand Down

0 comments on commit 6ddee90

Please sign in to comment.