Feature/async decompression #3

Merged 7 commits on Mar 7, 2021
27 changes: 27 additions & 0 deletions README.rst
@@ -25,6 +25,33 @@ but wraps another stream and compress it on-the-fly.
`GZIPCompressedStream` does not read the entire stream at once; instead it
reads the wrapped stream in chunks, until the compressed output is large
enough to satisfy the requested read size.
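
A minimal usage sketch (the file name and `upload` are placeholders, and
`read()` is assumed to accept a size argument, as described above):

.. code-block:: python

    from gzip_stream import GZIPCompressedStream

    with open('my_file.txt', 'rb') as plain_stream:
        compressed_stream = GZIPCompressedStream(
            plain_stream, compression_level=5
        )
        while True:
            # Only enough of the input is consumed to produce each chunk.
            chunk = compressed_stream.read(1024)
            if not chunk:
                break
            upload(chunk)  # hypothetical consumer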

The `AsyncGZIPDecompressedStream` class reads asynchronously from another
source, decompressing zlib and gzip data on-the-fly.

.. code-block:: python

    # aiobotocore example

    import aiobotocore

    from gzip_stream import AsyncGZIPDecompressedStream

    AWS_ACCESS_KEY_ID = "KEY_ID"
    AWS_SECRET_ACCESS_KEY = "ACCESS_KEY"
    BUCKET = "AWESOME_BUCKET"

    # MyAsyncUploadClient is a placeholder for your own upload client.
    upload_client = MyAsyncUploadClient()
    session = aiobotocore.get_session()
    async with session.create_client(
        service_name="s3",
        endpoint_url="s3_endpoint",
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
    ) as client:
        response = await client.get_object(
            Bucket=BUCKET, Key='my_very_big_1tb_file.txt.gz'
        )
        async for decompressed_chunk in AsyncGZIPDecompressedStream(
            response["Body"]
        ):
            await upload_client.upload_fileobj(decompressed_chunk)
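
Iteration yields decompressed chunks of at most 1 KiB (`BUFFER_SIZE`); call
`read(size)` directly to control the chunk size.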


The module works on Python ~= 3.5.

Installation
82 changes: 82 additions & 0 deletions gzip_stream.py
@@ -1,5 +1,9 @@
import abc
import asyncio
import gzip
import io
import zlib
from enum import Enum, unique
from typing import BinaryIO


@@ -82,6 +86,84 @@ def __repr__(self) -> str:
).format(self=self)


BUFFER_SIZE = 2 ** 10


class BaseAsyncReader(abc.ABC):
    @abc.abstractmethod
    async def read(self, size: int):
        raise NotImplementedError


@unique
class CompressedType(int, Enum):
    # windowBits offsets for zlib.decompressobj: +16 decodes only the
    # gzip format, +32 enables automatic zlib/gzip header detection.
    gzip = 16
    zlib_gzip = 32


class BaseAsyncIteratorReader(BaseAsyncReader, abc.ABC):
    def __aiter__(self):
        return self

    async def __anext__(self):
        chunk = await self.read(BUFFER_SIZE)
        if not chunk:
            raise StopAsyncIteration
        return chunk


class AsyncGZIPDecompressedStream(BaseAsyncIteratorReader):
    def __init__(self, stream: BaseAsyncReader, *,
                 compression_type: CompressedType = CompressedType.zlib_gzip):

        self._stream = stream
        self._lock = asyncio.Lock()
        self._decompressed_stream = io.BytesIO()
        '''
        http://www.zlib.net/manual.html#Advanced

        windowBits can also be greater than 15 for optional gzip decoding.
        Add 32 to windowBits to enable zlib and gzip decoding with automatic
        header detection, or add 16 to decode only the gzip format
        (the zlib format will return a Z_DATA_ERROR).
        '''
        self._decompressor = (
            zlib.decompressobj(compression_type.value + zlib.MAX_WBITS)
        )

    @property
    def stream(self) -> BaseAsyncReader:
        return self._stream

    async def read(self, size: int):
        assert size > 0

        async with self._lock:
            # Read compressed chunks from the wrapped stream until at
            # least `size` decompressed bytes are buffered or EOF is hit.
            while self._decompressed_stream.tell() < size:
                chunk = await self._stream.read(size)
                if not chunk:
                    break
                self._decompressed_stream.write(
                    self._decompressor.decompress(chunk)
                )
            self._decompressed_stream.seek(0)
            res = self._decompressed_stream.read()

            # Clear the buffer and write back the tail beyond `size`,
            # so the next read() starts from the leftover bytes.
            self._decompressed_stream.seek(0)
            self._decompressed_stream.truncate(0)
            self._decompressed_stream.write(res[size:])
            return res[:size]

    def __repr__(self) -> str:
        return (
            '{self.__class__.__name__}('
            '{self.stream!r}'
            ')'
        ).format(self=self)


__all__ = (
    'AsyncGZIPDecompressedStream',
    'GZIPCompressedStream',
)
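
Any object with an async `read(size)` method can feed the decompressor. A minimal adapter sketch for an `asyncio.StreamReader` source (`StreamReaderAdapter` and `consume` are hypothetical names, not part of this module):

import asyncio

from gzip_stream import AsyncGZIPDecompressedStream, BaseAsyncReader


class StreamReaderAdapter(BaseAsyncReader):
    """Adapt an asyncio.StreamReader to the BaseAsyncReader interface."""

    def __init__(self, reader: asyncio.StreamReader):
        self._reader = reader

    async def read(self, size: int):
        # asyncio.StreamReader.read(n) returns up to n bytes, b'' at EOF.
        return await self._reader.read(size)


async def consume(reader: asyncio.StreamReader):
    # Iteration yields decompressed chunks of at most BUFFER_SIZE bytes.
    async for chunk in AsyncGZIPDecompressedStream(StreamReaderAdapter(reader)):
        ...  # process each decompressed chunk
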
2 changes: 1 addition & 1 deletion setup.py
@@ -33,7 +33,7 @@
        'Topic :: Software Development :: Libraries'
    ],

python_requires="~=3.5",
python_requires='~=3.5',
    extras_require={
        'develop': [
            'pytest~=5.0',
75 changes: 73 additions & 2 deletions tests.py
@@ -1,10 +1,16 @@
-from io import BytesIO
+import asyncio
+import gzip
from gzip import decompress
+from io import BytesIO
+from pathlib import Path

import pytest
from faker import Faker

-from gzip_stream import GZIPCompressedStream
+from gzip_stream import (
+    GZIPCompressedStream, AsyncGZIPDecompressedStream,
+    BaseAsyncIteratorReader, BUFFER_SIZE
+)


@pytest.mark.parametrize(
@@ -20,3 +26,68 @@ def test_basic(data):
    input_stream = BytesIO(data)
    output_stream = GZIPCompressedStream(input_stream, compression_level=5)
    assert decompress(output_stream.read()) == data


class FakeAsyncReader(BaseAsyncIteratorReader):
    def __init__(self, filename: Path):
        self._fp = open(str(filename), 'rb')
        self._lock = asyncio.Lock()

    async def read(self, size: int = BUFFER_SIZE):
        async with self._lock:
            return self._fp.read(size)

    def __del__(self):
        self._fp.close()


@pytest.mark.parametrize(
    'expected',
    [
        '', 't', 'test',
        Faker().text(4 * 1024),
        Faker().text(256 * 1024)
    ],
    ids=['0 bytes', '1 byte', '4 bytes',
         'fake text - ~4 KB', 'fake text - ~256 KB']
)
async def test_gzip_aiter_async_reader(expected, tmpdir):
    tmp_file = tmpdir / 'temp.txt'
    with gzip.open(str(tmp_file), 'wb') as f:
        f.write(expected.encode('utf-8'))

    buffer = BytesIO()
    async for chunk in AsyncGZIPDecompressedStream(FakeAsyncReader(tmp_file)):
        buffer.write(chunk)
    buffer.seek(0)
    assert buffer.read().decode('utf-8') == expected


@pytest.mark.parametrize(
    'buff_size',
    [
        2,
        4,
        8,
        16,
        1024
    ],
    ids=['2 bytes', '4 bytes', '8 bytes',
         '16 bytes', '1 KB']
)
async def test_buffer_gzip_async_reader(tmpdir, buff_size):
    plain_text = 'hello world' * 1000

    tmp_file = tmpdir / 'temp.txt'
    with gzip.open(str(tmp_file), 'wb') as f:
        f.write(plain_text.encode('utf-8'))

    buffer = BytesIO()
    reader = AsyncGZIPDecompressedStream(FakeAsyncReader(tmp_file))
    while True:
        chunk = await reader.read(buff_size)
        if not chunk:
            break
        buffer.write(chunk)
    buffer.seek(0)
    assert buffer.read().decode('utf-8') == plain_text
1 change: 1 addition & 0 deletions tox.ini
@@ -4,6 +4,7 @@ envlist = lint,py3{5,6,7,8}

[testenv]
deps =
    aiomisc
    faker
    pytest
    pytest-cov
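
aiomisc is presumably added here for its pytest plugin, which lets the plain `async def` tests above run under pytest without an explicit event loop.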