Skip to content

Commit

Permalink
[ArrayManager] Enable read_parquet to not create 2D blocks when using ArrayManager (#40303)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche authored Apr 26, 2021
1 parent ead9404 commit 58181b1
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 13 deletions.
9 changes: 7 additions & 2 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,18 @@ def _from_mgr(cls, mgr: Manager):
object.__setattr__(obj, "_attrs", {})
return obj

def _as_manager(self: FrameOrSeries, typ: str) -> FrameOrSeries:
def _as_manager(
self: FrameOrSeries, typ: str, copy: bool_t = True
) -> FrameOrSeries:
"""
Private helper function to create a DataFrame with specific manager.
Parameters
----------
typ : {"block", "array"}
copy : bool, default True
Only controls whether the conversion from Block->ArrayManager
copies the 1D arrays (to ensure proper/contiguous memory layout).
Returns
-------
Expand All @@ -301,7 +306,7 @@ def _as_manager(self: FrameOrSeries, typ: str) -> FrameOrSeries:
to be a copy or not.
"""
new_mgr: Manager
new_mgr = mgr_to_mgr(self._mgr, typ=typ)
new_mgr = mgr_to_mgr(self._mgr, typ=typ, copy=copy)
# fastpath of passing a manager doesn't check the option/manager class
return self._constructor(new_mgr).__finalize__(self)

Expand Down
14 changes: 10 additions & 4 deletions pandas/core/internals/construction.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,11 @@ def fill_masked_arrays(data: MaskedRecords, arr_columns: Index) -> list[np.ndarr
return new_arrays


def mgr_to_mgr(mgr, typ: str):
def mgr_to_mgr(mgr, typ: str, copy: bool = True):
"""
Convert to specific type of Manager. Does not copy if the type is already
correct. Does not guarantee a copy otherwise.
correct. Does not guarantee a copy otherwise. `copy` keyword only controls
whether conversion from Block->ArrayManager copies the 1D arrays.
"""
new_mgr: Manager

Expand All @@ -231,10 +232,15 @@ def mgr_to_mgr(mgr, typ: str):
new_mgr = mgr
else:
if mgr.ndim == 2:
arrays = [mgr.iget_values(i).copy() for i in range(len(mgr.axes[0]))]
arrays = [mgr.iget_values(i) for i in range(len(mgr.axes[0]))]
if copy:
arrays = [arr.copy() for arr in arrays]
new_mgr = ArrayManager(arrays, [mgr.axes[1], mgr.axes[0]])
else:
new_mgr = SingleArrayManager([mgr.internal_values()], [mgr.index])
array = mgr.internal_values()
if copy:
array = array.copy()
new_mgr = SingleArrayManager([array], [mgr.index])
else:
raise ValueError(f"'typ' needs to be one of {{'block', 'array'}}, got '{typ}'")
return new_mgr
Expand Down
8 changes: 7 additions & 1 deletion pandas/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ def read(
"'use_nullable_dtypes=True' is only supported for pyarrow >= 0.16 "
f"({self.api.__version__} is installed"
)
manager = get_option("mode.data_manager")
if manager == "array":
to_pandas_kwargs["split_blocks"] = True # type: ignore[assignment]

path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
path,
Expand All @@ -239,9 +242,12 @@ def read(
mode="rb",
)
try:
return self.api.parquet.read_table(
result = self.api.parquet.read_table(
path_or_handle, columns=columns, **kwargs
).to_pandas(**to_pandas_kwargs)
if manager == "array":
result = result._as_manager("array", copy=False)
return result
finally:
if handles is not None:
handles.close()
Expand Down
29 changes: 23 additions & 6 deletions pandas/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import numpy as np
import pytest

from pandas._config import get_option

from pandas.compat import (
PY38,
is_platform_windows,
Expand Down Expand Up @@ -41,20 +43,21 @@
_HAVE_FASTPARQUET = False


pytestmark = [
pytest.mark.filterwarnings("ignore:RangeIndex.* is deprecated:DeprecationWarning"),
# TODO(ArrayManager) fastparquet / pyarrow rely on BlockManager internals
td.skip_array_manager_not_yet_implemented,
]
pytestmark = pytest.mark.filterwarnings(
"ignore:RangeIndex.* is deprecated:DeprecationWarning"
)


# TODO(ArrayManager) fastparquet relies on BlockManager internals

# setup engines & skips
@pytest.fixture(
params=[
pytest.param(
"fastparquet",
marks=pytest.mark.skipif(
not _HAVE_FASTPARQUET, reason="fastparquet is not installed"
not _HAVE_FASTPARQUET or get_option("mode.data_manager") == "array",
reason="fastparquet is not installed or ArrayManager is used",
),
),
pytest.param(
Expand All @@ -80,6 +83,8 @@ def pa():
def fp():
# Yields the engine name "fastparquet" to tests that request it, skipping
# the requesting test first when that engine cannot be exercised.
# NOTE(review): the decorator lies outside this hunk; presumably this is
# registered as a pytest fixture -- confirm against the full file.
if not _HAVE_FASTPARQUET:
pytest.skip("fastparquet is not installed")
# Also skip when the "mode.data_manager" option is set to "array".
elif get_option("mode.data_manager") == "array":
pytest.skip("ArrayManager is not supported with fastparquet")
return "fastparquet"


Expand Down Expand Up @@ -923,6 +928,18 @@ def test_filter_row_groups(self, pa):
)
assert len(result) == 1

def test_read_parquet_manager(self, pa, using_array_manager):
# ensure that read_parquet honors the pandas.options.mode.data_manager option
# Round-trips a 10x3 frame of random floats (columns "A", "B", "C") through
# a temporary path, passing `pa` (presumably the pyarrow engine name) to
# both the writer and the reader.
# NOTE(review): indentation is lost in this rendering, so whether the
# if/else below sits inside or after the `with` block cannot be confirmed
# from here.
df = pd.DataFrame(np.random.randn(10, 3), columns=["A", "B", "C"])

with tm.ensure_clean() as path:
df.to_parquet(path, pa)
result = read_parquet(path, pa)
# The manager backing the result must match the active mode: ArrayManager
# when `using_array_manager` is truthy, BlockManager otherwise.
if using_array_manager:
assert isinstance(result._mgr, pd.core.internals.ArrayManager)
else:
assert isinstance(result._mgr, pd.core.internals.BlockManager)


class TestParquetFastParquet(Base):
def test_basic(self, fp, df_full):
Expand Down

0 comments on commit 58181b1

Please sign in to comment.