[Data] [Docs] Improve docs around Parquet filter predicate / column selection pushdown #48095

Merged 2 commits on Oct 21, 2024

Changes from all commits
8 changes: 8 additions & 0 deletions doc/source/data/loading-data.rst
@@ -43,6 +43,14 @@ To view the full list of supported file formats, see the
petal.width double
variety string

.. tip::

    When reading Parquet files, you can take advantage of column and row pruning
    to filter out unneeded columns and rows at the file scan level. See
    :ref:`Parquet column pruning <parquet_column_pruning>` and
    :ref:`Parquet row pruning <parquet_row_pruning>` for more details
    on the projection and filter pushdown features.
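
A minimal sketch combining both pushdowns, using the Iris example dataset that
appears later in these docs (illustrative only; ``columns`` and ``filter`` are
existing :func:`ray.data.read_parquet` parameters):

.. code-block:: python

    import pyarrow.dataset

    import ray

    # Both the column selection and the row filter are pushed down to the
    # file scan, so unneeded columns and rows are never loaded into memory.
    ds = ray.data.read_parquet(
        "s3://anonymous@ray-example-data/iris.parquet",
        columns=["sepal.length", "variety"],
        filter=pyarrow.dataset.field("sepal.length") > 5.0,
    )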

.. tab-item:: Images

To read raw images, call :func:`~ray.data.read_images`. Ray Data represents
52 changes: 38 additions & 14 deletions doc/source/data/performance-tips.rst
@@ -191,28 +191,52 @@ By default, Ray requests 1 CPU per read task, which means one read task per CPU
For datasources that benefit from more IO parallelism, you can specify a lower ``num_cpus`` value for the read function with the ``ray_remote_args`` parameter.
For example, use ``ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25})`` to allow up to four read tasks per CPU.
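
As a rough sketch of what that looks like in full (assuming the Iris example
dataset used below; ``ray_remote_args`` is forwarded to the underlying Ray
read tasks):

.. code-block:: python

    import ray

    # Request a fractional CPU per read task so that up to four read tasks
    # can share each CPU; this can help IO-bound datasources.
    ds = ray.data.read_parquet(
        "s3://anonymous@ray-example-data/iris.parquet",
        ray_remote_args={"num_cpus": 0.25},
    )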

.. _parquet_column_pruning:

Parquet column pruning (projection pushdown)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default, :func:`ray.data.read_parquet` reads all columns in the Parquet files into memory.
If you only need a subset of the columns, make sure to specify the list of columns
explicitly when calling :func:`ray.data.read_parquet` to
avoid loading unnecessary data (projection pushdown). Note that this is more efficient than
calling :func:`~ray.data.Dataset.select_columns`, since column selection is pushed down to the file scan.

Review comment on lines 200 to 201:

Suggested change:
- If you only need a subset of the columns, make sure to specify the list of columns
- explicitly when calling :func:`ray.data.read_parquet` to
+ If you only need a subset of the columns, specify the list of columns
+ explicitly when calling :func:`ray.data.read_parquet` to

.. testcode::

    import ray

    # Read just two of the five columns of the Iris dataset.
    ds = ray.data.read_parquet(
        "s3://anonymous@ray-example-data/iris.parquet",
        columns=["sepal.length", "variety"],
    )
    print(ds)

.. testoutput::

    Dataset(num_rows=150, schema={sepal.length: double, variety: string})

.. _parquet_row_pruning:

Parquet row pruning (filter pushdown)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Similar to Parquet column pruning, you can pass in a filter to :func:`ray.data.read_parquet` (filter pushdown)
which is applied at the file scan so only rows that match the filter predicate
are returned. This can be used in conjunction with column pruning when appropriate to get the benefits of both.

.. testcode::

    import pyarrow.dataset

    import ray

    # Only read rows with `sepal.length` greater than 5.0.
    # The row count will be less than the total number of rows (150) in the full dataset.
    ds = ray.data.read_parquet(
        "s3://anonymous@ray-example-data/iris.parquet",
        filter=pyarrow.dataset.field("sepal.length") > 5.0,
    )
    print(ds.count())

.. testoutput::

    118


.. _data_memory:
12 changes: 9 additions & 3 deletions python/ray/data/dataset.py
@@ -831,6 +831,11 @@ def select_columns(

Specified columns must be in the dataset schema.

.. tip::
    If you're reading Parquet files with :func:`ray.data.read_parquet`,
    you might be able to speed it up by using projection pushdown; see
    :ref:`Parquet column pruning <parquet_column_pruning>` for details.
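
A rough sketch of the contrast this tip draws, assuming the Iris example
dataset from the performance-tips docs above (only the second variant prunes
columns at the file scan):

.. code-block:: python

    import ray

    # Reads all five columns, then drops three of them afterwards.
    ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
    ds = ds.select_columns(["sepal.length", "variety"])

    # Projection pushdown: only the two requested columns are ever scanned.
    ds = ray.data.read_parquet(
        "s3://anonymous@ray-example-data/iris.parquet",
        columns=["sepal.length", "variety"],
    )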

Examples:

>>> import ray
@@ -1103,9 +1108,10 @@ def filter(
dropping rows.

.. tip::
    If you're reading Parquet files with :func:`ray.data.read_parquet`
    and the filter is a simple predicate, you might
    be able to speed it up by using filter pushdown; see
    :ref:`Parquet row pruning <parquet_row_pruning>` for details.
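
A sketch of the two paths, assuming the Iris example dataset from the
performance-tips docs above (the pushdown variant skips non-matching rows at
the file scan instead of filtering them afterwards):

.. code-block:: python

    import pyarrow.dataset

    import ray

    # Row-by-row filter applied after the data is read.
    ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
    ds = ds.filter(lambda row: row["sepal.length"] > 5.0)

    # Filter pushdown: the predicate is evaluated at the file scan.
    ds = ray.data.read_parquet(
        "s3://anonymous@ray-example-data/iris.parquet",
        filter=pyarrow.dataset.field("sepal.length") > 5.0,
    )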

Examples:
