Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ArrayManager] Enable read_parquet to not create 2D blocks when using ArrayManager #40303

Merged
merged 7 commits into from
Apr 26, 2021
9 changes: 7 additions & 2 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,18 @@ def _from_mgr(cls, mgr: Manager):
object.__setattr__(obj, "_attrs", {})
return obj

def _as_manager(self: FrameOrSeries, typ: str) -> FrameOrSeries:
def _as_manager(
self: FrameOrSeries, typ: str, copy: bool_t = True
) -> FrameOrSeries:
"""
Private helper function to create a DataFrame with specific manager.

Parameters
----------
typ : {"block", "array"}
copy : bool, default True
Only controls whether the conversion from Block->ArrayManager
copies the 1D arrays (to ensure proper/contiguous memory layout).

Returns
-------
Expand All @@ -301,7 +306,7 @@ def _as_manager(self: FrameOrSeries, typ: str) -> FrameOrSeries:
to be a copy or not.
"""
new_mgr: Manager
new_mgr = mgr_to_mgr(self._mgr, typ=typ)
new_mgr = mgr_to_mgr(self._mgr, typ=typ, copy=copy)
# fastpath of passing a manager doesn't check the option/manager class
return self._constructor(new_mgr).__finalize__(self)

Expand Down
14 changes: 10 additions & 4 deletions pandas/core/internals/construction.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,11 @@ def fill_masked_arrays(data: MaskedRecords, arr_columns: Index) -> list[np.ndarr
return new_arrays


def mgr_to_mgr(mgr, typ: str):
def mgr_to_mgr(mgr, typ: str, copy: bool = True):
"""
Convert to specific type of Manager. Does not copy if the type is already
correct. Does not guarantee a copy otherwise.
correct. Does not guarantee a copy otherwise. `copy` keyword only controls
whether conversion from Block->ArrayManager copies the 1D arrays.
"""
new_mgr: Manager

Expand All @@ -229,10 +230,15 @@ def mgr_to_mgr(mgr, typ: str):
new_mgr = mgr
else:
if mgr.ndim == 2:
arrays = [mgr.iget_values(i).copy() for i in range(len(mgr.axes[0]))]
arrays = [mgr.iget_values(i) for i in range(len(mgr.axes[0]))]
jreback marked this conversation as resolved.
Show resolved Hide resolved
if copy:
arrays = [arr.copy() for arr in arrays]
new_mgr = ArrayManager(arrays, [mgr.axes[1], mgr.axes[0]])
else:
new_mgr = SingleArrayManager([mgr.internal_values()], [mgr.index])
array = mgr.internal_values()
if copy:
array = array.copy()
new_mgr = SingleArrayManager([array], [mgr.index])
else:
raise ValueError(f"'typ' needs to be one of {{'block', 'array'}}, got '{typ}'")
return new_mgr
Expand Down
8 changes: 7 additions & 1 deletion pandas/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ def read(
"'use_nullable_dtypes=True' is only supported for pyarrow >= 0.16 "
f"({self.api.__version__} is installed"
)
manager = get_option("mode.data_manager")
if manager == "array":
to_pandas_kwargs["split_blocks"] = True # type: ignore[assignment]

path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
path,
Expand All @@ -239,9 +242,12 @@ def read(
mode="rb",
)
try:
return self.api.parquet.read_table(
result = self.api.parquet.read_table(
path_or_handle, columns=columns, **kwargs
).to_pandas(**to_pandas_kwargs)
if manager == "array":
result = result._as_manager("array", copy=False)
return result
finally:
if handles is not None:
handles.close()
Expand Down
29 changes: 23 additions & 6 deletions pandas/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import numpy as np
import pytest

from pandas._config import get_option

from pandas.compat import (
PY38,
is_platform_windows,
Expand Down Expand Up @@ -41,20 +43,21 @@
_HAVE_FASTPARQUET = False


pytestmark = [
pytest.mark.filterwarnings("ignore:RangeIndex.* is deprecated:DeprecationWarning"),
# TODO(ArrayManager) fastparquet / pyarrow rely on BlockManager internals
td.skip_array_manager_not_yet_implemented,
]
pytestmark = pytest.mark.filterwarnings(
"ignore:RangeIndex.* is deprecated:DeprecationWarning"
)


# TODO(ArrayManager) fastparquet relies on BlockManager internals

# setup engines & skips
@pytest.fixture(
params=[
pytest.param(
"fastparquet",
marks=pytest.mark.skipif(
not _HAVE_FASTPARQUET, reason="fastparquet is not installed"
not _HAVE_FASTPARQUET or get_option("mode.data_manager") == "array",
reason="fastparquet is not installed or ArrayManager is used",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a "for now" or an "ever"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If your question is about "will ArrayManager be supported with the fastparquet engine", that's probably a question for the fastparquet package (and since this is only optional for now, there is still time to discuss that with them).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so not actionable on our end, thanks

),
),
pytest.param(
Expand All @@ -80,6 +83,8 @@ def pa():
def fp():
if not _HAVE_FASTPARQUET:
pytest.skip("fastparquet is not installed")
elif get_option("mode.data_manager") == "array":
pytest.skip("ArrayManager is not supported with fastparquet")
return "fastparquet"


Expand Down Expand Up @@ -923,6 +928,18 @@ def test_filter_row_groups(self, pa):
)
assert len(result) == 1

def test_read_parquet_manager(self, pa, using_array_manager):
# ensure that read_parquet honors the pandas.options.mode.data_manager option
df = pd.DataFrame(np.random.randn(10, 3), columns=["A", "B", "C"])

with tm.ensure_clean() as path:
df.to_parquet(path, pa)
result = read_parquet(path, pa)
if using_array_manager:
assert isinstance(result._mgr, pd.core.internals.ArrayManager)
else:
assert isinstance(result._mgr, pd.core.internals.BlockManager)


class TestParquetFastParquet(Base):
def test_basic(self, fp, df_full):
Expand Down