Skip to content

Commit

Permalink
GC: Collect resources in versioned bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
psrok1 committed Jan 22, 2025
1 parent fc7f0b2 commit 6569f63
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
39 changes: 38 additions & 1 deletion karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,21 @@ def list_objects(self, bucket: str) -> List[str]:
objs.append(obj["Key"])
return objs

def list_object_versions(self, bucket: str) -> Dict[str, List[str]]:
"""
List version identifiers of stored resource objects
:param bucket: Bucket name
:return: Dictionary of object version identifiers {key: [version_ids, ...]}
"""
objs = defaultdict(list)
paginator = self.s3.get_paginator("list_object_versions")
for page in paginator.paginate(Bucket=bucket):
for obj in page.get("Versions", list()):
objs[obj["Key"]].append(obj["VersionId"])
for obj in page.get("DeleteMarkers", list()):
objs[obj["Key"]].append(obj["VersionId"])
return dict(objs)

def remove_object(self, bucket: str, object_uid: str) -> None:
"""
Remove resource object from object storage
Expand All @@ -1002,7 +1017,29 @@ def remove_objects(self, bucket: str, object_uids: Iterable[str]) -> None:
:param bucket: Bucket name
:param object_uids: Object identifiers
"""
for delete_objects in chunks([{"Key": uid} for uid in object_uids], 1000):
for delete_objects in chunks([{"Key": uid} for uid in object_uids], 100):
self.s3.delete_objects(Bucket=bucket, Delete={"Objects": delete_objects})

def remove_object_versions(
self, bucket: str, object_versions: Dict[str, List[str]]
) -> None:
"""
Bulk remove resource object versions from object storage
:param bucket: Bucket name
:param object_versions: Object version identifiers
"""
versions = iter(
(uid, version_id)
for uid, versions in object_versions.items()
for version_id in versions
)
deletion_chunks = chunks(
[{"Key": uid, "VersionId": version_id} for uid, version_id in versions],
100,
)
for delete_objects in deletion_chunks:
print(delete_objects)
self.s3.delete_objects(Bucket=bucket, Delete={"Objects": delete_objects})

def check_bucket_exists(self, bucket: str, create: bool = False) -> bool:
Expand Down
6 changes: 3 additions & 3 deletions karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _log_config(self):
def gc_collect_resources(self) -> None:
# Collects unreferenced resources left in object storage
karton_bucket = self.backend.default_bucket_name
resources_to_remove = set(self.backend.list_objects(karton_bucket))
resources_to_remove = self.backend.list_object_versions(karton_bucket)
# Note: it is important to get list of resources before getting list of tasks!
# Task is created before resource upload to lock the reference to the resource.
tasks = self.backend.iter_all_tasks()
Expand All @@ -85,10 +85,10 @@ def gc_collect_resources(self) -> None:
resource.bucket == karton_bucket
and resource.uid in resources_to_remove
):
resources_to_remove.remove(resource.uid)
del resources_to_remove[resource.uid]
# Remove unreferenced resources
if resources_to_remove:
self.backend.remove_objects(karton_bucket, resources_to_remove)
self.backend.remove_object_versions(karton_bucket, resources_to_remove)

def gc_collect_tasks(self) -> None:
self.log.debug("GC: gc_collect_tasks started")
Expand Down

0 comments on commit 6569f63

Please sign in to comment.