From 7eae49e17cc42cda3759f2750198f0b8f5e735ec Mon Sep 17 00:00:00 2001 From: Sooyong Kim Date: Wed, 14 Feb 2024 16:36:33 +1300 Subject: [PATCH] Remove _force_mode: Wrap non gzip files in r mode with TextIOWrapper to support universal newlines --- storages/backends/s3.py | 8 ++-- tests/settings.py | 1 + tests/test_s3.py | 88 ++++++++++++++++++++--------------------- 3 files changed, 48 insertions(+), 49 deletions(-) diff --git a/storages/backends/s3.py b/storages/backends/s3.py index ef915164..dc7938bc 100644 --- a/storages/backends/s3.py +++ b/storages/backends/s3.py @@ -1,5 +1,6 @@ import mimetypes import os +import io import posixpath import tempfile import threading @@ -123,7 +124,6 @@ def __init__(self, name, mode, storage, buffer_size=None): self._storage = storage self.name = name[len(self._storage.location) :].lstrip("/") self._mode = mode - self._force_mode = (lambda b: b) if "b" in mode else (lambda b: b.decode()) self.obj = storage.bucket.Object(name) if "w" not in mode: # Force early RAII-style exception if object does not exist @@ -184,6 +184,8 @@ def _get_file(self): self._file.seek(0) if self._storage.gzip and self.obj.content_encoding == "gzip": self._file = self._decompress_file(mode=self._mode, file=self._file) + elif "b" not in self._mode: + self._file = io.TextIOWrapper(self._file._file, encoding="utf-8") self._closed = False return self._file @@ -195,12 +197,12 @@ def _set_file(self, value): def read(self, *args, **kwargs): if "r" not in self._mode: raise AttributeError("File was not opened in read mode.") - return self._force_mode(super().read(*args, **kwargs)) + return super().read(*args, **kwargs) def readline(self, *args, **kwargs): if "r" not in self._mode: raise AttributeError("File was not opened in read mode.") - return self._force_mode(super().readline(*args, **kwargs)) + return super().readline(*args, **kwargs) def readlines(self): return list(self) diff --git a/tests/settings.py b/tests/settings.py index 02c7ecab..e7d3806e 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -8,3 +8,4 @@ # the following test settings are required for moto to work. AWS_STORAGE_BUCKET_NAME = "test-bucket" +AWS_S3_REGION_NAME = "us-east-1" diff --git a/tests/test_s3.py b/tests/test_s3.py index b4aeaac4..e4ce9f1a 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -305,52 +305,6 @@ def test_storage_open_read_string(self): self.assertEqual(content_str, "") file.close() - def test_storage_open_read_with_newlines(self): - """ - Test opening a file in "r" mode with various newline characters - """ - name = "test_storage_open_read_with_newlines.txt" - with io.BytesIO() as temp_file: - temp_file.write(b"line1\nline2\r\nmore\rtext\n") - temp_file.seek(0) - file = self.storage.open(name, "r") - file._file = temp_file - content_str = file.read() - file.close() - self.assertEqual(content_str, "line1\nline2\nmore\ntext\n") - - def test_storage_open_readlines(self): - """ - Test readlines with file opened in "r" and "rb" modes - """ - name = "test_storage_open_readlines.txt" - with io.BytesIO() as temp_file: - temp_file.write(b"line1\nline2") - file = self.storage.open(name, "r") - file._file = temp_file - - content_lines = file.readlines() - self.assertEqual(content_lines, ["line1\n", "line2"]) - - temp_file.seek(0) - file = self.storage.open(name, "rb") - file._file = temp_file - content_lines = file.readlines() - self.assertEqual(content_lines, [b"line1\n", b"line2"]) - - def test_storage_open_readlines_with_newlines(self): - """ - Test readlines with file opened in "r" mode with various newline characters - """ - name = "test_storage_open_readlines_with_newlines.txt" - with io.BytesIO() as temp_file: - temp_file.write(b"line1\nline2\r\nmore\rtext") - file = self.storage.open(name, "r") - file._file = temp_file - - content_lines = file.readlines() - self.assertEqual(content_lines, ['line1\n', 'line2\n', 'more\n', 'text']) - def test_storage_open_write(self): """ Test opening a file in write mode @@ -1172,6 +1126,48 @@ def test_content_type_not_detectable(self): s3.S3Storage.default_content_type, ) + def test_storage_open_read_with_newlines(self): + """ + Test opening a file in "r" and "rb" mode with various newline characters + """ + name = "test_storage_open_read_with_newlines.txt" + with io.BytesIO() as temp_file: + temp_file.write(b"line1\nline2\r\nmore\rtext\n") + self.storage.save(name, temp_file) + file = self.storage.open(name, "r") + content_str = file.read() + file.close() + self.assertEqual(content_str, "line1\nline2\nmore\ntext\n") + + with io.BytesIO() as temp_file: + temp_file.write(b"line1\nline2\r\nmore\rtext\n") + self.storage.save(name, temp_file) + file = self.storage.open(name, "rb") + content_str = file.read() + file.close() + self.assertEqual(content_str, b"line1\nline2\r\nmore\rtext\n") + + def test_storage_open_readlines_with_newlines(self): + """ + Test readlines with file opened in "r" and "rb" mode with various newline characters + """ + name = "test_storage_open_readlines_with_newlines.txt" + with io.BytesIO() as temp_file: + temp_file.write(b"line1\nline2\r\nmore\rtext") + self.storage.save(name, temp_file) + file = self.storage.open(name, "r") + content_lines = file.readlines() + file.close() + self.assertEqual(content_lines, ['line1\n', 'line2\n', 'more\n', 'text']) + + with io.BytesIO() as temp_file: + temp_file.write(b"line1\nline2\r\nmore\rtext") + self.storage.save(name, temp_file) + file = self.storage.open(name, "rb") + content_lines = file.readlines() + file.close() + self.assertEqual(content_lines, [b'line1\n', b'line2\r\n', b'more\r', b'text']) + class TestBackwardsNames(TestCase): def test_importing(self):