From d3844e1bd2c40d231b1892ce7f07ac391155c0b8 Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Sun, 1 May 2022 15:06:41 +0000 Subject: [PATCH] fix: decompressive transcoding improvements Store the data in compressed form (if applicable), I believe this is a better approach as it: - Makes the `size` and checksum attributes match the behavior in the service. - It enables support for `Accept-Encoding`, we can return compressed data. - It makes the gRPC case match the service behavior, i.e., always return compressed data. --- gcs/object.py | 50 ++++-- testbench/__init__.py | 1 - testbench/common.py | 8 +- testbench/handle_gzip.py | 34 ----- testbench/rest_server.py | 27 ++-- tests/format_multipart_upload.py | 51 +++++-- tests/test_handle_gzip.py | 49 ------ tests/test_testbench_object_gzip.py | 227 ++++++++++++++++++++++++++++ 8 files changed, 330 insertions(+), 117 deletions(-) delete mode 100644 testbench/handle_gzip.py delete mode 100644 tests/test_handle_gzip.py create mode 100644 tests/test_testbench_object_gzip.py diff --git a/gcs/object.py b/gcs/object.py index b70a1ba4..5aaf13ce 100644 --- a/gcs/object.py +++ b/gcs/object.py @@ -16,6 +16,7 @@ import base64 import datetime +import gzip import hashlib import json import re @@ -196,13 +197,15 @@ def init_dict(cls, request, metadata, media, bucket, is_destination): @classmethod def init_media(cls, request, bucket): object_name = request.args.get("name", None) - media = testbench.common.extract_media(request) if object_name is None: testbench.error.missing("name", None) + media = testbench.common.extract_media(request) metadata = { "bucket": testbench.common.bucket_name_from_proto(bucket.name), "name": object_name, "metadata": {"x_emulator_upload": "simple"}, + "contentEncoding": request.args.get("contentEncoding", None), + "kmsKeyName": request.args.get("kmsKeyName", None), } return cls.init_dict(request, metadata, media, bucket, False) @@ -385,11 +388,21 @@ def x_goog_hash_header(self): hashes.append("md5=%s" % testbench.common.rest_md5_from_proto(cs.md5_hash)) return ",".join(hashes) if len(hashes) != 0 else None - def rest_media(self, request, delay=time.sleep): + def _decompress_on_download(self, request): + """Returns True if a request requires decompressive transcoding.""" + if self.metadata.content_encoding != "gzip": + return False + # If `gzip` appears in the `Accept-Encoding` header then we disable + # decompressive transcoding + return not ("gzip" in request.headers.get("accept-encoding", "")) + + def _download_range(self, request, response_payload): range_header = request.headers.get("range") - response_payload = self.media + length = len(response_payload) + if range_header is None or self._decompress_on_download(request): + return 0, length, length, response_payload begin = 0 - end = len(response_payload) + end = length if range_header is not None: m = re.match("bytes=([0-9]+)-([0-9]+)", range_header) if m: @@ -404,12 +417,27 @@ def rest_media(self, request, delay=time.sleep): if m: last = int(m.group(1)) response_payload = response_payload[-last:] + return begin, end, length, response_payload - streamer, length, headers = None, len(response_payload), {} - content_range = "bytes %d-%d/%d" % (begin, end - 1, len(self.media)) + def rest_media(self, request, delay=time.sleep): + response_payload = ( + gzip.decompress(self.media) + if self._decompress_on_download(request) + else self.media + ) + begin, end, length, response_payload = self._download_range( + request, response_payload + ) + headers = {} + content_range = "bytes 
%d-%d/%d" % (begin, end - 1, length) instructions = testbench.common.extract_instruction(request, None) - if instructions == "return-broken-stream": + if instructions is None: + + def streamer(): + yield response_payload + + elif instructions == "return-broken-stream": request_socket = request.environ.get("gunicorn.socket", None) def streamer(): @@ -442,7 +470,7 @@ def streamer(): def streamer(): yield media - elif instructions is not None and instructions.startswith("stall-always"): + elif instructions.startswith("stall-always"): def streamer(): chunk_size = 16 * 1024 @@ -462,9 +490,9 @@ def streamer(): time.sleep(10) yield response_payload[r:chunk_end] - elif instructions is not None and instructions.startswith( + elif instructions.startswith( "return-503-after-256K" - ): + ) or instructions.startswith("break-after-256K"): if begin == 0: request_socket = request.environ.get("gunicorn.socket", None) @@ -507,6 +535,8 @@ def streamer(): yield response_payload headers["Content-Range"] = content_range + if self._decompress_on_download(request): + headers["x-guploader-response-body-transformations"] = "gunzipped" headers["x-goog-hash"] = self.x_goog_hash_header() headers["x-goog-generation"] = self.metadata.generation headers["x-goog-metageneration"] = self.metadata.metageneration diff --git a/testbench/__init__.py b/testbench/__init__.py index bca17f70..f33b28e7 100644 --- a/testbench/__init__.py +++ b/testbench/__init__.py @@ -18,7 +18,6 @@ common, csek, database, - handle_gzip, grpc_server, rest_server, ) diff --git a/testbench/common.py b/testbench/common.py index 7c94a17a..db5844ad 100644 --- a/testbench/common.py +++ b/testbench/common.py @@ -79,15 +79,21 @@ class FakeRequest(types.SimpleNamespace): "if_metageneration_not_match": "ifMetagenerationNotMatch", } + _COMMON_HEADERS = { + "range", + "accept-encoding", + } + def __init__(self, **kwargs): super().__init__(**kwargs) @classmethod def init_xml(cls, request): + # Copy any common headers or starting with `x-goog-` headers = { key.lower(): value for key, value in request.headers.items() - if key.lower().startswith("x-goog-") or key.lower() == "range" + if key.lower().startswith("x-goog-") or key.lower() in cls._COMMON_HEADERS } args = request.args.to_dict() args.update(cls.xml_headers_to_json_args(headers)) diff --git a/testbench/handle_gzip.py b/testbench/handle_gzip.py deleted file mode 100644 index 615b0bb5..00000000 --- a/testbench/handle_gzip.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import io -import gzip - -from werkzeug.wrappers import Request - - -class HandleGzipMiddleware: - """ - Handle decompressing requests which contain header Content-Encoding: gzip - """ - - def __init__(self, app): - self.app = app - - def __call__(self, environ, start_response): - request = Request(environ) - if request.headers.get("Content-Encoding", "") == "gzip": - request.data = gzip.decompress(request.data) - request.environ["wsgi.input"] = io.BytesIO(request.data) - return self.app(environ, start_response) diff --git a/testbench/rest_server.py b/testbench/rest_server.py index 8dcc8861..e7a7c97c 100644 --- a/testbench/rest_server.py +++ b/testbench/rest_server.py @@ -86,7 +86,10 @@ def xml_get_object(bucket_name, object_name): ) response = blob.rest_media(fake_request) response.headers["x-goog-stored-content-length"] = len(blob.media) - response.headers["x-goog-stored-content-encoding"] = "identity" + encoding = blob.metadata.content_encoding + response.headers["x-goog-stored-content-encoding"] = ( + "identity" if encoding is None or encoding == "" else encoding + ) return response @@ -1041,18 +1044,16 @@ def delete_resumable_upload(bucket_name): server = flask.Flask(__name__) server.debug = False server.register_error_handler(Exception, testbench.error.RestException.handler) -server.wsgi_app = testbench.handle_gzip.HandleGzipMiddleware( - DispatcherMiddleware( - root, - { - "/httpbin": httpbin.app, - GCS_HANDLER_PATH: gcs, - DOWNLOAD_HANDLER_PATH: download, - UPLOAD_HANDLER_PATH: upload, - PROJECTS_HANDLER_PATH: projects_app, - IAM_HANDLER_PATH: iam_app, - }, - ) +server.wsgi_app = DispatcherMiddleware( + root, + { + "/httpbin": httpbin.app, + GCS_HANDLER_PATH: gcs, + DOWNLOAD_HANDLER_PATH: download, + UPLOAD_HANDLER_PATH: upload, + PROJECTS_HANDLER_PATH: projects_app, + IAM_HANDLER_PATH: iam_app, + }, ) httpbin.app.register_error_handler(Exception, testbench.error.RestException.handler) diff --git a/tests/format_multipart_upload.py b/tests/format_multipart_upload.py index 48ac6099..85015fbc 100644 --- a/tests/format_multipart_upload.py +++ b/tests/format_multipart_upload.py @@ -19,21 +19,24 @@ import json +def _format_object_metadata_part(metadata): + return "\r\n".join( + [ + "Content-Type: application/json; charset=UTF-8", + "", + json.dumps(metadata), + "", + ] + ) + + def format_multipart_upload(metadata, media, content_type="application/octet-stream"): boundary = "test_separator_deadbeef" payload = ( ("--" + boundary + "\r\n").join( [ "", - # object metadata "part" - "\r\n".join( - [ - "Content-Type: application/json; charset=UTF-8", - "", - json.dumps(metadata), - "", - ] - ), + _format_object_metadata_part(metadata), # object media "part" "\r\n".join( [ @@ -51,3 +54,33 @@ def format_multipart_upload(metadata, media, content_type="application/octet-str + "--\r\n" ) return boundary, payload + + +def format_multipart_upload_bytes( + metadata, media, content_type="application/octet-stream" +): + boundary = "test_separator_deadbeef" + full_separator = b"--" + boundary.encode("utf-8") + b"\r\n" + payload = ( + full_separator.join( + [ + b"", + # object metadata "part" + _format_object_metadata_part(metadata).encode("utf-8"), + # object media "part" + b"\r\n".join( + [ + b"Content-Type: " + content_type.encode("utf-8"), + b"Content-Length: %d" % len(media), + b"", + media, + b"", + ] + ), + ] + ) + + b"--" + + boundary.encode("utf-8") + + b"--\r\n" + ) + return boundary, payload diff --git a/tests/test_handle_gzip.py b/tests/test_handle_gzip.py deleted file mode 100644 index 
3ae6a2c0..00000000 --- a/tests/test_handle_gzip.py +++ /dev/null @@ -1,49 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unit tests for testbench.handle_gzip.""" - -import gzip -import unittest - -from werkzeug.test import create_environ - -import testbench - - -class TestHandleGzip(unittest.TestCase): - def test_handle_decompressing(self): - plain_text = b"hello world" - compressed_text = gzip.compress(plain_text) - environ = create_environ( - base_url="http://localhost:8080", - content_length=len(compressed_text), - data=compressed_text, - content_type="application/octet-stream", - method="GET", - headers={"Content-Encoding": "gzip"}, - ) - - def passthrough_fn(environ, _): - return environ - - middleware = testbench.handle_gzip.HandleGzipMiddleware(passthrough_fn) - decompressed_environ = middleware(environ, None) - self.assertEqual(decompressed_environ["werkzeug.request"].data, plain_text) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_testbench_object_gzip.py b/tests/test_testbench_object_gzip.py new file mode 100644 index 00000000..c0478cfa --- /dev/null +++ b/tests/test_testbench_object_gzip.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +# +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit test for downloading gzip-ed objects.""" + +import gzip +import json +import os +import re +import unittest + +from testbench import rest_server +from tests.format_multipart_upload import format_multipart_upload_bytes + + +UPLOAD_QUANTUM = 256 * 1024 + + +class TestTestbenchObjectGzip(unittest.TestCase): + def setUp(self): + rest_server.db.clear() + rest_server.server.config["PREFERRED_URL_SCHEME"] = "https" + rest_server.server.config["SERVER_NAME"] = "storage.googleapis.com" + rest_server.root.config["PREFERRED_URL_SCHEME"] = "https" + rest_server.root.config["SERVER_NAME"] = "storage.googleapis.com" + self.client = rest_server.server.test_client(allow_subdomain_redirects=True) + # Avoid magic buckets in the test + os.environ.pop("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME", None) + + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + + def _insert_compressed_object(self, name): + media = "How vexingly quick daft zebras jump!" 
+ compressed = gzip.compress(media.encode("utf-8")) + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={ + "name": name, + "uploadType": "media", + "contentEncoding": "gzip", + }, + content_type="application/octet-stream", + data=compressed, + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + insert_rest = json.loads(response.data) + self.assertEqual(insert_rest.get("kind"), "storage#object") + self.assertEqual(insert_rest.get("contentEncoding", ""), "gzip") + + return media + + def test_download_gzip_data_simple_upload(self): + media = self._insert_compressed_object("zebra") + + response = self.client.get( + "/download/storage/v1/b/bucket-name/o/zebra", query_string={"alt": "media"} + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("utf-8"), media) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), + "gunzipped", + ) + + def test_download_gzip_compressed(self): + media = self._insert_compressed_object("zebra") + + response = self.client.get( + "/download/storage/v1/b/bucket-name/o/zebra", + query_string={"alt": "media"}, + headers={"Accept-Encoding": "gzip"}, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, gzip.compress(media.encode("utf-8"))) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), "" + ) + + def test_download_gzip_range_ignored(self): + media = self._insert_compressed_object("zebra") + + response = self.client.get( + "/download/storage/v1/b/bucket-name/o/zebra", + query_string={"alt": "media"}, + headers={"Range": "4-8"}, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, media.encode("utf-8")) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), + "gunzipped", + ) + self.assertEqual( + response.headers.get("content-range", ""), + "bytes 0-%d/%d" % (len(media) - 1, len(media)), + ) + + def test_download_gzip_uncompressed_xml(self): + media = self._insert_compressed_object("zebra") + + response = self.client.get( + "/zebra", base_url="https://bucket-name.storage.googleapis.com" + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, media.encode("utf-8")) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), + "gunzipped", + ) + self.assertEqual( + response.headers.get("x-goog-stored-content-encoding", ""), "gzip" + ) + + def test_download_gzip_compressed_xml(self): + media = self._insert_compressed_object("zebra") + + response = self.client.get( + "/zebra", + base_url="https://bucket-name.storage.googleapis.com", + headers={"Accept-Encoding": "gzip"}, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, gzip.compress(media.encode("utf-8"))) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), + "", + ) + self.assertEqual( + response.headers.get("x-goog-stored-content-encoding", ""), "gzip" + ) + + def test_download_of_multipart_upload(self): + media = "How vexingly quick daft zebras jump!" 
+ compressed = gzip.compress(media.encode("utf-8")) + boundary, payload = format_multipart_upload_bytes( + {"contentEncoding": "gzip"}, compressed + ) + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={"uploadType": "multipart", "name": "zebra"}, + content_type="multipart/related; boundary=" + boundary, + data=payload, + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + insert_rest = json.loads(response.data) + self.assertEqual(insert_rest.get("kind"), "storage#object") + self.assertEqual(insert_rest.get("contentEncoding", ""), "gzip") + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), "" + ) + + response = self.client.get( + "/download/storage/v1/b/bucket-name/o/zebra", query_string={"alt": "media"} + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("utf-8"), media) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), + "gunzipped", + ) + + def test_download_of_resumable_upload(self): + media = "How vexingly quick daft zebras jump!" + compressed = gzip.compress(media.encode("utf-8")) + + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={"uploadType": "resumable", "name": "zebra"}, + content_type="application/json", + data=json.dumps({"name": "zebra", "contentEncoding": "gzip"}), + ) + self.assertEqual(response.status_code, 200) + location = response.headers.get("location") + self.assertIn("upload_id=", location) + match = re.search("[&?]upload_id=([^&]+)", location) + self.assertIsNotNone(match, msg=location) + upload_id = match.group(1) + + finalized = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + data=compressed, + ) + self.assertEqual(finalized.status_code, 200) + self.assertTrue( + finalized.headers.get("content-type").startswith("application/json") + ) + insert_rest = json.loads(finalized.data) + self.assertIn("metadata", insert_rest) + self.assertEqual(insert_rest.get("kind"), "storage#object") + self.assertEqual(insert_rest.get("contentEncoding", ""), "gzip") + + response = self.client.get( + "/download/storage/v1/b/bucket-name/o/zebra", query_string={"alt": "media"} + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data.decode("utf-8"), media) + self.assertEqual( + response.headers.get("x-guploader-response-body-transformations", ""), + "gunzipped", + ) + + +if __name__ == "__main__": + unittest.main()