BUG: read_parquet, to_parquet for s3 destinations (#19135)
maximveksler authored and jreback committed Jan 18, 2018
1 parent 51d71cd commit 6e0927e
Showing 6 changed files with 102 additions and 53 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.23.0.txt
@@ -466,6 +466,7 @@ I/O
- Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. Now it gives an ``EmptyDataError`` (:issue:`18184`)
- Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`)
- Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`)
- Bug in :func:`DataFrame.to_parquet` where an exception was raised if the write destination is S3 (:issue:`19134`)
-

Plotting
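The user-visible effect of this entry, as a minimal sketch (the bucket and key are hypothetical, and S3 access requires the optional s3fs dependency plus credentials):

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})

# Before this fix, to_parquet with an s3:// destination raised an
# exception; with it, both directions round-trip.
df.to_parquet('s3://my-bucket/example.parquet', engine='pyarrow')
result = pd.read_parquet('s3://my-bucket/example.parquet', engine='pyarrow')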
26 changes: 14 additions & 12 deletions pandas/io/common.py
@@ -91,14 +91,6 @@ def _is_url(url):
return False


def _is_s3_url(url):
"""Check for an s3, s3n, or s3a url"""
try:
return parse_url(url).scheme in ['s3', 's3n', 's3a']
except:
return False


def _expand_user(filepath_or_buffer):
"""Return the argument with an initial component of ~ or ~user
replaced by that user's home directory.
@@ -168,8 +160,16 @@ def _stringify_path(filepath_or_buffer):
return filepath_or_buffer


def is_s3_url(url):
"""Check for an s3, s3n, or s3a url"""
try:
return parse_url(url).scheme in ['s3', 's3n', 's3a']
except: # noqa
return False


def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
compression=None):
compression=None, mode=None):
"""
If the filepath_or_buffer is a url, translate and return the buffer.
Otherwise passthrough.
@@ -179,10 +179,11 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
or buffer
encoding : the encoding to use to decode py3 bytes, default is 'utf-8'
mode : str, optional
Returns
-------
a filepath_or_buffer, the encoding, the compression
a filepath_or_buffer or S3File instance, the encoding, the compression
"""
filepath_or_buffer = _stringify_path(filepath_or_buffer)

@@ -195,11 +196,12 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
reader = BytesIO(req.read())
return reader, encoding, compression

if _is_s3_url(filepath_or_buffer):
if is_s3_url(filepath_or_buffer):
from pandas.io import s3
return s3.get_filepath_or_buffer(filepath_or_buffer,
encoding=encoding,
compression=compression)
compression=compression,
mode=mode)

if isinstance(filepath_or_buffer, (compat.string_types,
compat.binary_type,
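A minimal sketch of how the new mode parameter threads through to S3 opens (the bucket and key are hypothetical; s3fs must be installed and credentials configured):

from pandas.io.common import get_filepath_or_buffer, is_s3_url

url = 's3://my-bucket/data.parquet'
assert is_s3_url(url)

# mode=None preserves the old read-oriented behaviour; write paths pass
# mode='wb' and receive an S3File opened for writing.
f, encoding, compression = get_filepath_or_buffer(url, mode='wb')
f.close()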
28 changes: 23 additions & 5 deletions pandas/io/parquet.py
@@ -5,7 +5,7 @@
from pandas import DataFrame, RangeIndex, Int64Index, get_option
from pandas.compat import string_types
from pandas.core.common import AbstractMethodError
from pandas.io.common import get_filepath_or_buffer
from pandas.io.common import get_filepath_or_buffer, is_s3_url


def get_engine(engine):
@@ -107,7 +107,7 @@ def write(self, df, path, compression='snappy',
self.validate_dataframe(df)
if self._pyarrow_lt_070:
self._validate_write_lt_070(df)
path, _, _ = get_filepath_or_buffer(path)
path, _, _ = get_filepath_or_buffer(path, mode='wb')

if self._pyarrow_lt_060:
table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
@@ -194,14 +194,32 @@ def write(self, df, path, compression='snappy', **kwargs):
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.
path, _, _ = get_filepath_or_buffer(path)

if is_s3_url(path):
# path is s3:// so we need to open the s3file in 'wb' mode.
# TODO: Support 'ab'

path, _, _ = get_filepath_or_buffer(path, mode='wb')
# And pass the opened s3file to the fastparquet internal impl.
kwargs['open_with'] = lambda path, _: path
else:
path, _, _ = get_filepath_or_buffer(path)

with catch_warnings(record=True):
self.api.write(path, df,
compression=compression, **kwargs)

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
parquet_file = self.api.ParquetFile(path)
if is_s3_url(path):
# When path is s3:// an S3File is returned.
# We need to retain the original path (str) while also
# passing the underlying S3 filesystem's open function to the
# fastparquet impl.
s3, _, _ = get_filepath_or_buffer(path)
parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open)
else:
path, _, _ = get_filepath_or_buffer(path)
parquet_file = self.api.ParquetFile(path)

return parquet_file.to_pandas(columns=columns, **kwargs)


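A sketch of the fastparquet open_with hook both branches lean on, assuming s3fs and fastparquet are installed (bucket and key are hypothetical):

import pandas as pd
import s3fs
import fastparquet

df = pd.DataFrame({'a': [1, 2, 3]})
fs = s3fs.S3FileSystem(anon=False)

# Write: hand fastparquet an already-open S3File; the open_with
# callable ignores its (path, mode) arguments and returns the file,
# mirroring the lambda in the diff above.
s3file = fs.open('my-bucket/data.parquet', 'wb')
fastparquet.write('my-bucket/data.parquet', df, compression=None,
                  open_with=lambda path, mode: s3file)

# Read: keep the str path for metadata while delegating opens to the
# filesystem, mirroring ParquetFile(path, open_with=s3.s3.open).
pf = fastparquet.ParquetFile('my-bucket/data.parquet', open_with=fs.open)
result = pf.to_pandas()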
10 changes: 7 additions & 3 deletions pandas/io/s3.py
@@ -19,10 +19,14 @@ def _strip_schema(url):


def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
compression=None):
compression=None, mode=None):

if mode is None:
mode = 'rb'

fs = s3fs.S3FileSystem(anon=False)
try:
filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer))
filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
except (OSError, NoCredentialsError):
# boto3 has trouble when trying to access a public file
# when credentialed...
@@ -31,5 +35,5 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
# A NoCredentialsError is raised if you don't have creds
# for that bucket.
fs = s3fs.S3FileSystem(anon=True)
filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer))
filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
return filepath_or_buffer, None, compression
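The default above keeps existing read callers untouched; a usage sketch (hypothetical bucket, s3fs installed):

from pandas.io.s3 import get_filepath_or_buffer

# mode falls back to 'rb', so this behaves exactly as before the patch;
# to_parquet is the caller that passes mode='wb'.
f, _, _ = get_filepath_or_buffer('s3://my-public-bucket/data.parquet')
try:
    magic = f.read(4)  # parquet files begin with the magic bytes b'PAR1'
finally:
    f.close()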
84 changes: 54 additions & 30 deletions pandas/tests/io/test_parquet.py
@@ -204,6 +204,22 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
tm.assert_frame_equal(result, df[['a', 'd']])


def check_round_trip_equals(df, path, engine,
write_kwargs, read_kwargs,
expected, check_names):

df.to_parquet(path, engine, **write_kwargs)
actual = read_parquet(path, engine, **read_kwargs)
tm.assert_frame_equal(expected, actual,
check_names=check_names)

# repeat
df.to_parquet(path, engine, **write_kwargs)
actual = read_parquet(path, engine, **read_kwargs)
tm.assert_frame_equal(expected, actual,
check_names=check_names)


class Base(object):

def check_error_on_write(self, df, engine, exc):
@@ -212,28 +228,32 @@ def check_error_on_write(self, df, engine, exc):
with tm.ensure_clean() as path:
to_parquet(df, path, engine, compression=None)

def check_round_trip(self, df, engine, expected=None,
def check_round_trip(self, df, engine, expected=None, path=None,
write_kwargs=None, read_kwargs=None,
check_names=True):

if write_kwargs is None:
write_kwargs = {}
write_kwargs = {'compression': None}

if read_kwargs is None:
read_kwargs = {}
with tm.ensure_clean() as path:
df.to_parquet(path, engine, **write_kwargs)
result = read_parquet(path, engine, **read_kwargs)

if expected is None:
expected = df
tm.assert_frame_equal(result, expected, check_names=check_names)

# repeat
to_parquet(df, path, engine, **write_kwargs)
result = pd.read_parquet(path, engine, **read_kwargs)
if expected is None:
expected = df

if expected is None:
expected = df
tm.assert_frame_equal(result, expected, check_names=check_names)
if path is None:
with tm.ensure_clean() as path:
check_round_trip_equals(df, path, engine,
write_kwargs=write_kwargs,
read_kwargs=read_kwargs,
expected=expected,
check_names=check_names)
else:
check_round_trip_equals(df, path, engine,
write_kwargs=write_kwargs,
read_kwargs=read_kwargs,
expected=expected,
check_names=check_names)


class TestBasic(Base):
Expand All @@ -251,7 +271,7 @@ def test_columns_dtypes(self, engine):

# unicode
df.columns = [u'foo', u'bar']
self.check_round_trip(df, engine, write_kwargs={'compression': None})
self.check_round_trip(df, engine)

def test_columns_dtypes_invalid(self, engine):

@@ -292,7 +312,6 @@ def test_read_columns(self, engine):

expected = pd.DataFrame({'string': list('abc')})
self.check_round_trip(df, engine, expected=expected,
write_kwargs={'compression': None},
read_kwargs={'columns': ['string']})

def test_write_index(self, engine):
@@ -304,7 +323,7 @@ def test_write_index(self, engine):
pytest.skip("pyarrow is < 0.7.0")

df = pd.DataFrame({'A': [1, 2, 3]})
self.check_round_trip(df, engine, write_kwargs={'compression': None})
self.check_round_trip(df, engine)

indexes = [
[2, 3, 4],
@@ -315,15 +334,12 @@
# non-default index
for index in indexes:
df.index = index
self.check_round_trip(
df, engine,
write_kwargs={'compression': None},
check_names=check_names)
self.check_round_trip(df, engine, check_names=check_names)

# index with meta-data
df.index = [0, 1, 2]
df.index.name = 'foo'
self.check_round_trip(df, engine, write_kwargs={'compression': None})
self.check_round_trip(df, engine)

def test_write_multiindex(self, pa_ge_070):
# Not supported in fastparquet as of 0.1.3 or older pyarrow versions
@@ -332,7 +348,7 @@ def test_write_multiindex(self, pa_ge_070):
df = pd.DataFrame({'A': [1, 2, 3]})
index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
df.index = index
self.check_round_trip(df, engine, write_kwargs={'compression': None})
self.check_round_trip(df, engine)

def test_write_column_multiindex(self, engine):
# column multi-index
@@ -426,6 +442,11 @@ def test_categorical_unsupported(self, pa_lt_070):
df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
self.check_error_on_write(df, pa, NotImplementedError)

def test_s3_roundtrip(self, df_compat, s3_resource, pa):
# GH #19134
self.check_round_trip(df_compat, pa,
path='s3://pandas-test/pyarrow.parquet')


class TestParquetFastParquet(Base):

@@ -436,7 +457,7 @@ def test_basic(self, fp, df_full):
# additional supported types for fastparquet
df['timedelta'] = pd.timedelta_range('1 day', periods=3)

self.check_round_trip(df, fp, write_kwargs={'compression': None})
self.check_round_trip(df, fp)

@pytest.mark.skip(reason="not supported")
def test_duplicate_columns(self, fp):
@@ -449,8 +470,7 @@ def test_duplicate_columns(self, fp):
def test_bool_with_none(self, fp):
df = pd.DataFrame({'a': [True, None, False]})
expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
self.check_round_trip(df, fp, expected=expected,
write_kwargs={'compression': None})
self.check_round_trip(df, fp, expected=expected)

def test_unsupported(self, fp):

@@ -466,7 +486,7 @@ def test_categorical(self, fp):
if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
pytest.skip("CategoricalDtype not supported for older fp")
df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
self.check_round_trip(df, fp, write_kwargs={'compression': None})
self.check_round_trip(df, fp)

def test_datetime_tz(self, fp):
# doesn't preserve tz
@@ -475,8 +495,7 @@ def test_datetime_tz(self, fp):

# warns on the coercion
with catch_warnings(record=True):
self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
write_kwargs={'compression': None})
self.check_round_trip(df, fp, df.astype('datetime64[ns]'))

def test_filter_row_groups(self, fp):
d = {'a': list(range(0, 3))}
@@ -486,3 +505,8 @@ def test_filter_row_groups(self, fp):
row_group_offsets=1)
result = read_parquet(path, fp, filters=[('a', '==', 0)])
assert len(result) == 1

def test_s3_roundtrip(self, df_compat, s3_resource, fp):
# GH #19134
self.check_round_trip(df_compat, fp,
path='s3://pandas-test/fastparquet.parquet')
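The s3_resource fixture these tests request is defined in the test suite's shared fixtures, not in this diff; a hedged stand-in built on moto could look like this (the body below is an assumption, not pandas' actual conftest):

import pytest

@pytest.fixture
def s3_resource():
    # Hypothetical: back the 'pandas-test' bucket with moto's in-memory
    # S3 so the round trips need no real credentials or network.
    boto3 = pytest.importorskip('boto3')
    moto = pytest.importorskip('moto')
    with moto.mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='pandas-test')
        yield conn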
6 changes: 3 additions & 3 deletions pandas/tests/io/test_s3.py
@@ -1,8 +1,8 @@
from pandas.io.common import _is_s3_url
from pandas.io.common import is_s3_url


class TestS3URL(object):

def test_is_s3_url(self):
assert _is_s3_url("s3://pandas/somethingelse.com")
assert not _is_s3_url("s4://pandas/somethingelse.com")
assert is_s3_url("s3://pandas/somethingelse.com")
assert not is_s3_url("s4://pandas/somethingelse.com")
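Since the renamed helper still swallows parse errors with a bare except, malformed input simply reports False rather than raising; a small sketch:

from pandas.io.common import is_s3_url

assert is_s3_url('s3a://bucket/key')    # Hadoop-style scheme accepted
assert not is_s3_url('file:///tmp/x')   # non-S3 scheme
assert not is_s3_url(None)              # non-string input still reports False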
