Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add object restore functionality #677

Merged
merged 19 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 153 additions & 8 deletions testbench/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import collections
import copy
import datetime
import json
import os
import pathlib
Expand All @@ -37,11 +38,13 @@ def __init__(
rewrites,
retry_tests,
supported_methods,
soft_deleted_objects,
):
self._resources_lock = threading.RLock()
self._buckets = buckets
self._objects = objects
self._live_generations = live_generations
self._soft_deleted_objects = soft_deleted_objects

self._uploads_lock = threading.RLock()
self._uploads = uploads
Expand All @@ -58,14 +61,15 @@ def __init__(

@classmethod
def init(cls):
return cls({}, {}, {}, {}, {}, {}, [])
return cls({}, {}, {}, {}, {}, {}, [], {})

def clear(self):
"""Clear all data except for the supported method list."""
with self._resources_lock:
self._buckets = {}
self._objects = {}
self._live_generations = {}
self._soft_deleted_objects = {}
with self._uploads_lock:
self._uploads = {}
with self._rewrites_lock:
Expand Down Expand Up @@ -101,6 +105,7 @@ def insert_bucket(self, bucket, context):
self._buckets[bucket.metadata.name] = bucket
self._objects[bucket.metadata.name] = {}
self._live_generations[bucket.metadata.name] = {}
self._soft_deleted_objects[bucket.metadata.name] = {}

def list_bucket(self, project_id, prefix, context):
with self._resources_lock:
Expand Down Expand Up @@ -133,6 +138,7 @@ def delete_bucket(self, bucket_name, context, preconditions=[]):
del self._buckets[bucket.metadata.name]
del self._objects[bucket.metadata.name]
del self._live_generations[bucket.metadata.name]
del self._soft_deleted_objects[bucket.metadata.name]

def insert_test_bucket(self):
"""Automatically create a bucket if needed.
Expand Down Expand Up @@ -173,6 +179,7 @@ def __extract_list_object_request_grpc(cls, request):
request.lexicographic_end,
request.include_trailing_delimiter,
request.match_glob,
request.soft_deleted,
)

@classmethod
Expand All @@ -186,6 +193,7 @@ def __extract_list_object_request(cls, request, context):
end_offset = request.args.get("endOffset")
include_trailing_delimiter = request.args.get("includeTrailingDelimiter", False)
match_glob = request.args.get("matchGlob", None)
soft_deleted = request.args.get("softDeleted", False)
return (
delimiter,
prefix,
Expand All @@ -194,6 +202,7 @@ def __extract_list_object_request(cls, request, context):
end_offset,
include_trailing_delimiter,
match_glob,
soft_deleted,
)

def __get_live_generation(self, bucket_name, object_name, context):
Expand All @@ -208,9 +217,65 @@ def __del_live_generation(self, bucket_name, object_name, context):
bucket_key = self.__bucket_key(bucket_name, context)
self._live_generations[bucket_key].pop(object_name, None)

def __soft_delete_object(
    self, bucket_name, object_name, blob, retention_duration, context
):
    """Stamp soft/hard delete times on `blob` and park it in the soft-deleted store.

    `retention_duration` is the bucket's soft-delete retention, in seconds;
    the hard-delete time is the soft-delete time plus that duration.
    """
    bucket_key = self.__bucket_key(bucket_name, context)
    deleted_at = datetime.datetime.now(datetime.timezone.utc)
    purge_at = deleted_at + datetime.timedelta(seconds=retention_duration)
    blob.metadata.soft_delete_time.FromDatetime(deleted_at)
    blob.metadata.hard_delete_time.FromDatetime(purge_at)
    # One bucket entry maps object name -> list of soft-deleted generations.
    self._soft_deleted_objects[bucket_key].setdefault(object_name, []).append(blob)

def __remove_expired_objects_from_soft_delete(
    self, bucket_name, object_name, context
):
    """Drop soft-deleted generations of `object_name` whose retention expired.

    Bug fix: protobuf `Timestamp.ToDatetime()` returns a *naive* datetime in
    UTC, but the original compared it against the naive *local* time from
    `datetime.datetime.now()`, skewing expiry by the host's UTC offset.
    Compare naive-UTC against naive-UTC instead.
    """
    bucket_key = self.__bucket_key(bucket_name, context)
    # Naive UTC "now" to match Timestamp.ToDatetime() output.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    blobs = self._soft_deleted_objects[bucket_key].get(object_name)
    if blobs is not None:
        self._soft_deleted_objects[bucket_key][object_name] = [
            blob
            for blob in blobs
            if now < blob.metadata.hard_delete_time.ToDatetime()
        ]

def __remove_restored_soft_deleted_object(
    self, bucket_name, object_name, generation, context
):
    """Remove the just-restored generation from the soft-deleted store.

    Bug fix: the original filter kept blobs where `generation == generation`,
    i.e. it *retained* the restored generation and discarded every other
    soft-deleted generation — the inverse of the intent. Keep `!=` instead.
    """
    bucket_key = self.__bucket_key(bucket_name, context)
    blobs = self._soft_deleted_objects[bucket_key].get(object_name)
    if blobs is not None:
        self._soft_deleted_objects[bucket_key][object_name] = [
            blob for blob in blobs if blob.metadata.generation != generation
        ]

def __get_soft_deleted_object(self, bucket_name, object_name, generation, context):
    """Return the soft-deleted blob with the given generation, or report 404."""
    bucket_key = self.__bucket_key(bucket_name, context)
    candidates = self._soft_deleted_objects[bucket_key].get(object_name, [])
    for candidate in candidates:
        if candidate.metadata.generation == generation:
            return candidate
    # No matching generation: surface a not-found error for this object.
    return testbench.error.notfound(object_name, context)

def __get_all_soft_deleted_objects(self, bucket_name, context):
    """Flatten every soft-deleted generation in the bucket into one list."""
    bucket_key = self.__bucket_key(bucket_name, context)
    return [
        blob
        for versions in self._soft_deleted_objects[bucket_key].values()
        for blob in versions
    ]

def list_object(self, request, bucket_name, context):
with self._resources_lock:
bucket = self.__get_bucket_for_object(bucket_name, context)
bucket_with_metadata = self.get_bucket(bucket_name, context)
(
delimiter,
prefix,
Expand All @@ -219,14 +284,29 @@ def list_object(self, request, bucket_name, context):
end_offset,
include_trailing_delimiter,
match_glob,
soft_deleted,
) = self.__extract_list_object_request(request, context)
items = []
prefixes = set()
for obj in bucket.values():

if (
soft_deleted
and not bucket_with_metadata.metadata.HasField("soft_delete_policy")
) or (soft_deleted and versions):
return testbench.error.invalid("bad request", context)

objects = bucket.values()
if soft_deleted:
objects = self.__get_all_soft_deleted_objects(bucket_name, context)

for obj in objects:
generation = obj.metadata.generation
name = obj.metadata.name
if not versions and generation != self.__get_live_generation(
bucket_name, name, context
if (
not soft_deleted
and not versions
and generation
!= self.__get_live_generation(bucket_name, name, context)
):
continue
if name.find(prefix) != 0:
Expand Down Expand Up @@ -282,12 +362,27 @@ def __get_object(
return blob, live_generation

def get_object(
self, bucket_name, object_name, context=None, generation=None, preconditions=[]
self,
bucket_name,
object_name,
context=None,
generation=None,
preconditions=[],
soft_deleted=False,
):
with self._resources_lock:
blob, _ = self.__get_object(
bucket_name, object_name, context, generation, preconditions
)
blob = None
if not soft_deleted:
blob, _ = self.__get_object(
bucket_name, object_name, context, generation, preconditions
)
else:
bucket_with_metadata = self.get_bucket(bucket_name, context)
if not bucket_with_metadata.metadata.HasField("soft_delete_policy"):
testbench.error.invalid("SoftDeletePolicyRequired", context)
blob = self.__get_soft_deleted_object(
bucket_name, object_name, int(generation), context
)
# return a snapshot copy of the blob/blob.metadata
if blob is None:
return None
Expand Down Expand Up @@ -336,6 +431,15 @@ def delete_object(
if generation == 0 or live_generation == generation:
self.__del_live_generation(bucket_name, object_name, context)
bucket = self.__get_bucket_for_object(bucket_name, context)
bucket_with_metadata = self.get_bucket(bucket_name, context)
if bucket_with_metadata.metadata.HasField("soft_delete_policy"):
self.__soft_delete_object(
bucket_name,
object_name,
blob,
bucket_with_metadata.metadata.soft_delete_policy.retention_duration.seconds,
context,
)
bucket.pop("%s#%d" % (blob.metadata.name, blob.metadata.generation), None)

def do_update_object(
Expand All @@ -354,6 +458,47 @@ def do_update_object(
)
return update_fn(blob, live_generation)

def restore_object(
    self,
    bucket_name: str,
    object_name: str,
    generation: int,
    preconditions=[],
    context=None,
) -> T:
    """Restore a soft-deleted object generation as a new live generation.

    Fails if the bucket has no soft-delete policy, or if the requested
    generation is currently live/noncurrent (not soft-deleted). Expired
    soft-deleted entries are purged before the lookup, so a generation past
    its hard-delete time restores as 404.

    Returns the restored blob; raises/aborts via testbench.error helpers on
    failure.
    """
    with self._resources_lock:
        bucket_with_metadata = self.get_bucket(bucket_name, context)
        if not bucket_with_metadata.metadata.HasField("soft_delete_policy"):
            testbench.error.invalid("SoftDeletePolicyRequired", context)
        bucket = self.__get_bucket_for_object(bucket_name, context)
        blob = bucket.get("%s#%d" % (object_name, generation), None)
        if blob is not None:
            # The generation still exists in the bucket, so it was never
            # soft-deleted; restoring it is a precondition failure.
            testbench.error.not_soft_deleted(context)

        self.__remove_expired_objects_from_soft_delete(
            bucket_name,
            object_name,
            context,
        )
        blob = self.__get_soft_deleted_object(
            bucket_name, object_name, generation, context
        )
        if blob is not None:
            blob.metadata.create_time.FromDatetime(
                datetime.datetime.now(datetime.timezone.utc)
            )
            # A restored object is no longer soft-deleted, so neither
            # soft_delete_time nor hard_delete_time applies to it. The
            # original cleared only soft_delete_time, leaving a stale
            # hard_delete_time on the live object.
            blob.metadata.ClearField("soft_delete_time")
            blob.metadata.ClearField("hard_delete_time")
            blob.metadata.metageneration = 1
            blob.metadata.generation = blob.metadata.generation + 1
            if bucket_with_metadata.metadata.autoclass.enabled is True:
                # Autoclass buckets restore objects into STANDARD storage.
                blob.metadata.storage_class = "STANDARD"
            self.insert_object(bucket_name, blob, context, preconditions)
            self.__remove_restored_soft_deleted_object(
                bucket_name, object_name, generation, context
            )

        return blob

ddelgrosso1 marked this conversation as resolved.
Show resolved Hide resolved
# === UPLOAD === #

def get_upload(self, upload_id, context):
Expand Down
8 changes: 8 additions & 0 deletions testbench/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ def mismatch(
generic(_simple_json_error(msg), rest_code, grpc_code, context)


def not_soft_deleted(
    context, rest_code=412, grpc_code=grpc.StatusCode.FAILED_PRECONDITION
):
    """Report a precondition failure: the object is live or noncurrent, not soft-deleted."""
    generic(
        _simple_json_error("objectNotSoftDeleted"), rest_code, grpc_code, context
    )


def notchanged(msg, context, rest_code=304, grpc_code=grpc.StatusCode.ABORTED):
"""Error returned when if*NotMatch or If-None-Match pre-conditions fail."""
generic(
Expand Down
8 changes: 8 additions & 0 deletions testbench/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,14 @@ def update_impl(blob, live_generation) -> storage_pb2.Object:
def __get_bucket(self, bucket_name, context) -> storage_pb2.Bucket:
return self.db.get_bucket(bucket_name, context).metadata

@retry_test(method="storage.objects.restore")
def RestoreObject(self, request, context):
    """Handle the gRPC RestoreObject RPC by delegating to the database layer."""
    restored = self.db.restore_object(
        request.bucket,
        request.object,
        request.generation,
        testbench.common.make_grpc_preconditions(request),
        context,
    )
    return restored.metadata

@retry_test(method="storage.objects.insert")
def WriteObject(self, request_iterator, context):
upload, is_resumable = gcs.upload.Upload.init_write_object_grpc(
Expand Down
25 changes: 23 additions & 2 deletions testbench/rest_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,14 +568,20 @@ def object_delete(bucket_name, object_name):
@gcs.route("/b/<bucket_name>/o/<path:object_name>")
@retry_test(method="storage.objects.get")
def object_get(bucket_name, object_name):
soft_deleted = flask.request.args.get("softDeleted", False, bool)
media = flask.request.args.get("alt", None)
generation = flask.request.args.get("generation", None)
if (soft_deleted and generation is None) or (soft_deleted and media == "media"):
return testbench.error.invalid("invalid request", None)

blob = db.get_object(
bucket_name,
object_name,
generation=flask.request.args.get("generation", None),
generation=generation,
preconditions=testbench.common.make_json_preconditions(flask.request),
context=None,
soft_deleted=soft_deleted,
)
media = flask.request.args.get("alt", None)
if media is None or media == "json":
projection = testbench.common.extract_projection(flask.request, "noAcl", None)
fields = flask.request.args.get("fields", None)
Expand Down Expand Up @@ -773,6 +779,21 @@ def objects_rewrite(src_bucket_name, src_object_name, dst_bucket_name, dst_objec
return response


@gcs.route("/b/<bucket_name>/o/<path:object_name>/restore", methods=["POST"])
@retry_test(method="storage.objects.restore")
def object_restore(bucket_name, object_name):
    """REST handler for object restore (POST .../o/<object>/restore).

    `generation` is a required integer query parameter identifying the
    soft-deleted generation to restore.
    """
    generation = flask.request.args.get("generation")
    if generation is None:
        return testbench.error.invalid("generation", None)
    try:
        generation = int(generation)
    except ValueError:
        # A non-numeric generation is a client error (400); the original let
        # int() raise and produced a 500 instead.
        return testbench.error.invalid("generation", None)
    blob = db.restore_object(
        bucket_name,
        object_name,
        generation,
        testbench.common.make_json_preconditions(flask.request),
    )
    projection = testbench.common.extract_projection(flask.request, "noAcl", None)
    return testbench.common.filter_response_rest(
        blob.rest_metadata(), projection, None
    )


# === OBJECT ACCESS CONTROL === #


Expand Down
Loading