Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decompressive transcoding improvements #322

Merged
merged 1 commit into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions gcs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import base64
import datetime
import gzip
import hashlib
import json
import re
Expand Down Expand Up @@ -196,13 +197,15 @@ def init_dict(cls, request, metadata, media, bucket, is_destination):
@classmethod
def init_media(cls, request, bucket):
object_name = request.args.get("name", None)
media = testbench.common.extract_media(request)
if object_name is None:
testbench.error.missing("name", None)
media = testbench.common.extract_media(request)
metadata = {
"bucket": testbench.common.bucket_name_from_proto(bucket.name),
"name": object_name,
"metadata": {"x_emulator_upload": "simple"},
"contentEncoding": request.args.get("contentEncoding", None),
"kmsKeyName": request.args.get("kmsKeyName", None),
}
return cls.init_dict(request, metadata, media, bucket, False)

Expand Down Expand Up @@ -385,11 +388,21 @@ def x_goog_hash_header(self):
hashes.append("md5=%s" % testbench.common.rest_md5_from_proto(cs.md5_hash))
return ",".join(hashes) if len(hashes) != 0 else None

def rest_media(self, request, delay=time.sleep):
def _decompress_on_download(self, request):
"""Returns True if a request requires decompressive transcoding."""
if self.metadata.content_encoding != "gzip":
return False
# If `gzip` appears in the `Accept-Encoding` header then we disable
# decompressive transcoding
return not ("gzip" in request.headers.get("accept-encoding", ""))

def _download_range(self, request, response_payload):
range_header = request.headers.get("range")
response_payload = self.media
length = len(response_payload)
if range_header is None or self._decompress_on_download(request):
return 0, length, length, response_payload
begin = 0
end = len(response_payload)
end = length
if range_header is not None:
m = re.match("bytes=([0-9]+)-([0-9]+)", range_header)
if m:
Expand All @@ -404,12 +417,27 @@ def rest_media(self, request, delay=time.sleep):
if m:
last = int(m.group(1))
response_payload = response_payload[-last:]
return begin, end, length, response_payload

streamer, length, headers = None, len(response_payload), {}
content_range = "bytes %d-%d/%d" % (begin, end - 1, len(self.media))
def rest_media(self, request, delay=time.sleep):
response_payload = (
gzip.decompress(self.media)
if self._decompress_on_download(request)
else self.media
)
begin, end, length, response_payload = self._download_range(
request, response_payload
)
headers = {}
content_range = "bytes %d-%d/%d" % (begin, end - 1, length)

instructions = testbench.common.extract_instruction(request, None)
if instructions == "return-broken-stream":
if instructions is None:

def streamer():
yield response_payload

elif instructions == "return-broken-stream":
request_socket = request.environ.get("gunicorn.socket", None)

def streamer():
Expand Down Expand Up @@ -442,7 +470,7 @@ def streamer():
def streamer():
yield media

elif instructions is not None and instructions.startswith("stall-always"):
elif instructions.startswith("stall-always"):

def streamer():
chunk_size = 16 * 1024
Expand All @@ -462,9 +490,9 @@ def streamer():
time.sleep(10)
yield response_payload[r:chunk_end]

elif instructions is not None and instructions.startswith(
elif instructions.startswith(
"return-503-after-256K"
):
) or instructions.startswith("break-after-256K"):
if begin == 0:
request_socket = request.environ.get("gunicorn.socket", None)

Expand Down Expand Up @@ -507,6 +535,8 @@ def streamer():
yield response_payload

headers["Content-Range"] = content_range
if self._decompress_on_download(request):
headers["x-guploader-response-body-transformations"] = "gunzipped"
headers["x-goog-hash"] = self.x_goog_hash_header()
headers["x-goog-generation"] = self.metadata.generation
headers["x-goog-metageneration"] = self.metadata.metageneration
Expand Down
1 change: 0 additions & 1 deletion testbench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
common,
csek,
database,
handle_gzip,
grpc_server,
rest_server,
)
Expand Down
8 changes: 7 additions & 1 deletion testbench/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,21 @@ class FakeRequest(types.SimpleNamespace):
"if_metageneration_not_match": "ifMetagenerationNotMatch",
}

_COMMON_HEADERS = {
"range",
"accept-encoding",
}

def __init__(self, **kwargs):
super().__init__(**kwargs)

@classmethod
def init_xml(cls, request):
# Copy any common headers or starting with `x-goog-`
headers = {
key.lower(): value
for key, value in request.headers.items()
if key.lower().startswith("x-goog-") or key.lower() == "range"
if key.lower().startswith("x-goog-") or key.lower() in cls._COMMON_HEADERS
}
args = request.args.to_dict()
args.update(cls.xml_headers_to_json_args(headers))
Expand Down
34 changes: 0 additions & 34 deletions testbench/handle_gzip.py

This file was deleted.

27 changes: 14 additions & 13 deletions testbench/rest_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ def xml_get_object(bucket_name, object_name):
)
response = blob.rest_media(fake_request)
response.headers["x-goog-stored-content-length"] = len(blob.media)
response.headers["x-goog-stored-content-encoding"] = "identity"
encoding = blob.metadata.content_encoding
response.headers["x-goog-stored-content-encoding"] = (
"identity" if encoding is None or encoding == "" else encoding
)
return response


Expand Down Expand Up @@ -1041,18 +1044,16 @@ def delete_resumable_upload(bucket_name):
server = flask.Flask(__name__)
server.debug = False
server.register_error_handler(Exception, testbench.error.RestException.handler)
server.wsgi_app = testbench.handle_gzip.HandleGzipMiddleware(
DispatcherMiddleware(
root,
{
"/httpbin": httpbin.app,
GCS_HANDLER_PATH: gcs,
DOWNLOAD_HANDLER_PATH: download,
UPLOAD_HANDLER_PATH: upload,
PROJECTS_HANDLER_PATH: projects_app,
IAM_HANDLER_PATH: iam_app,
},
)
server.wsgi_app = DispatcherMiddleware(
root,
{
"/httpbin": httpbin.app,
GCS_HANDLER_PATH: gcs,
DOWNLOAD_HANDLER_PATH: download,
UPLOAD_HANDLER_PATH: upload,
PROJECTS_HANDLER_PATH: projects_app,
IAM_HANDLER_PATH: iam_app,
},
)

httpbin.app.register_error_handler(Exception, testbench.error.RestException.handler)
Expand Down
51 changes: 42 additions & 9 deletions tests/format_multipart_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@
import json


def _format_object_metadata_part(metadata):
return "\r\n".join(
[
"Content-Type: application/json; charset=UTF-8",
"",
json.dumps(metadata),
"",
]
)


def format_multipart_upload(metadata, media, content_type="application/octet-stream"):
boundary = "test_separator_deadbeef"
payload = (
("--" + boundary + "\r\n").join(
[
"",
# object metadata "part"
"\r\n".join(
[
"Content-Type: application/json; charset=UTF-8",
"",
json.dumps(metadata),
"",
]
),
_format_object_metadata_part(metadata),
# object media "part"
"\r\n".join(
[
Expand All @@ -51,3 +54,33 @@ def format_multipart_upload(metadata, media, content_type="application/octet-str
+ "--\r\n"
)
return boundary, payload


def format_multipart_upload_bytes(
metadata, media, content_type="application/octet-stream"
):
boundary = "test_separator_deadbeef"
full_separator = b"--" + boundary.encode("utf-8") + b"\r\n"
payload = (
full_separator.join(
[
b"",
# object metadata "part"
_format_object_metadata_part(metadata).encode("utf-8"),
# object media "part"
b"\r\n".join(
[
b"Content-Type: " + content_type.encode("utf-8"),
b"Content-Length: %d" % len(media),
b"",
media,
b"",
]
),
]
)
+ b"--"
+ boundary.encode("utf-8")
+ b"--\r\n"
)
return boundary, payload
49 changes: 0 additions & 49 deletions tests/test_handle_gzip.py

This file was deleted.

Loading