diff --git a/doc/source/data/loading-data.rst b/doc/source/data/loading-data.rst
index 5d1ef8771d987..b390e6e6be35b 100644
--- a/doc/source/data/loading-data.rst
+++ b/doc/source/data/loading-data.rst
@@ -43,6 +43,14 @@ To view the full list of supported file formats, see the
             petal.width  double
             variety      string
 
+        .. tip::
+
+            When reading Parquet files, you can take advantage of column and row
+            pruning to read only the columns and rows you need at the file scan level.
+            See :ref:`Parquet column pruning <parquet_column_pruning>` and
+            :ref:`Parquet row pruning <parquet_row_pruning>` for more details on the
+            projection and filter pushdown features.
+
     .. tab-item:: Images
 
         To read raw images, call :func:`~ray.data.read_images`. Ray Data represents
diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst
index 26602938c46d1..03b5f4100894a 100644
--- a/doc/source/data/performance-tips.rst
+++ b/doc/source/data/performance-tips.rst
@@ -191,28 +191,52 @@ By default, Ray requests 1 CPU per read task, which means one read task per CPU
 For datasources that benefit from more IO parallelism, you can specify a lower ``num_cpus`` value for the read function with the ``ray_remote_args`` parameter.
 For example, use ``ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25})`` to allow up to four read tasks per CPU.
 
-Parquet column pruning
-~~~~~~~~~~~~~~~~~~~~~~
+.. _parquet_column_pruning:
 
-Current Dataset reads all Parquet columns into memory.
+Parquet column pruning (projection pushdown)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By default, :func:`ray.data.read_parquet` reads all columns of the Parquet files into memory.
 If you only need a subset of the columns, make sure to specify the list of columns
 explicitly when calling :func:`ray.data.read_parquet` to
-avoid loading unnecessary data (projection pushdown).
-For example, use ``ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet", columns=["sepal.length", "variety"])`` to read
-just two of the five columns of Iris dataset.
+avoid loading unnecessary data (projection pushdown). This is more efficient than
+calling :func:`~ray.data.Dataset.select_columns`, because the column selection is pushed down to the file scan.
+
+.. testcode::
+
+    import ray
+
+    # Read just two of the five columns of the Iris dataset.
+    ds = ray.data.read_parquet(
+        "s3://anonymous@ray-example-data/iris.parquet",
+        columns=["sepal.length", "variety"],
+    )
+    print(ds)
+
+.. testoutput::
+
+    Dataset(num_rows=150, schema={sepal.length: double, variety: string})
 
 .. _parquet_row_pruning:
 
-Parquet row pruning
-~~~~~~~~~~~~~~~~~~~
+Parquet row pruning (filter pushdown)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Similarly, you can pass in a filter to :func:`ray.data.read_parquet` (filter pushdown)
-which is applied at the file scan so only rows that match the filter predicate
-are returned.
-For example, use ``ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet", filter=pyarrow.dataset.field("sepal.length") > 5.0)``
-(where ``pyarrow`` has to be imported)
-to read rows with sepal.length greater than 5.0.
-This can be used in conjunction with column pruning when appropriate to get the benefits of both.
+Similar to Parquet column pruning, you can pass in a filter to :func:`ray.data.read_parquet` (filter pushdown).
+The filter is applied at the file scan level, so only rows that match the filter predicate are returned.
+You can combine this with column pruning when appropriate to get the benefits of both,
+as the second example below shows.
+
+.. testcode::
+
+    import pyarrow.dataset
+    import ray
+
+    # Only read rows with `sepal.length` greater than 5.0.
+    # The row count is less than the total number of rows (150) in the full dataset.
+    ds = ray.data.read_parquet(
+        "s3://anonymous@ray-example-data/iris.parquet",
+        filter=pyarrow.dataset.field("sepal.length") > 5.0,
+    )
+    print(ds.count())
+
+.. testoutput::
+
+    118
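+
+To combine the two, pass ``columns`` and ``filter`` in the same :func:`ray.data.read_parquet`
+call. The following sketch reuses the Iris example above; the filter is unchanged, so the
+count stays at 118 while only the two selected columns are scanned.
+
+.. testcode::
+
+    import pyarrow.dataset
+    import ray
+
+    # Prune columns and rows in a single scan: read only `sepal.length` and
+    # `variety`, and only the rows where `sepal.length` is greater than 5.0.
+    ds = ray.data.read_parquet(
+        "s3://anonymous@ray-example-data/iris.parquet",
+        columns=["sepal.length", "variety"],
+        filter=pyarrow.dataset.field("sepal.length") > 5.0,
+    )
+    print(ds.count())
+
+.. testoutput::
+
+    118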
 
 .. _data_memory:
 
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 0a58f1818d4a2..1b4dae3dbbefa 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -831,6 +831,11 @@ def select_columns(
 
         Specified columns must be in the dataset schema.
 
+        .. tip::
+            If you're reading Parquet files with :func:`ray.data.read_parquet`,
+            you might be able to speed it up by using projection pushdown; see
+            :ref:`Parquet column pruning <parquet_column_pruning>` for details.
+
         Examples:
 
             >>> import ray
@@ -1103,9 +1108,10 @@ def filter(
             dropping rows.
 
         .. tip::
-            If you're using parquet and the filter is a simple predicate, you might
-            be able to speed it up by using filter pushdown, see
-            :ref:`Parquet row pruning <parquet_row_pruning>`.
+            If you're reading Parquet files with :func:`ray.data.read_parquet`
+            and the filter is a simple predicate, you might be able to speed it
+            up by using filter pushdown; see
+            :ref:`Parquet row pruning <parquet_row_pruning>` for details.
 
         Examples: