Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: scan_csv #1555

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api-reference/narwhals.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Here are the top-level functions available in Narwhals.
- narwhalify
- new_series
- nth
- scan_csv
- sum
- sum_horizontal
- show_versions
Expand Down
2 changes: 2 additions & 0 deletions narwhals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from narwhals.functions import from_numpy
from narwhals.functions import get_level
from narwhals.functions import new_series
from narwhals.functions import scan_csv
from narwhals.functions import show_versions
from narwhals.schema import Schema
from narwhals.series import Series
Expand Down Expand Up @@ -138,6 +139,7 @@
"narwhalify",
"new_series",
"nth",
"scan_csv",
"selectors",
"show_versions",
"stable",
Expand Down
89 changes: 89 additions & 0 deletions narwhals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,3 +936,92 @@ def get_level(
- 'interchange': only metadata operations are supported (`df.schema`)
"""
return obj._level


def scan_csv(
source: str,
*,
native_namespace: ModuleType,
) -> LazyFrame[Any]:
"""Lazily read from a CSV file.

This allows the query optimizer to push down predicates and projections
to the scan level, thereby potentially reducing memory overhead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is Polars-specific, perhaps we can remove it?

For the libraries that do not support lazy dataframes, the function reads
a csv file eagerly and then converts the resulting dataframe to a lazyframe.

Arguments:
source: Path to a file.
native_namespace: The native library to use for DataFrame creation.

Returns:
LazyFrame.

Examples:
>>> import dask.dataframe as dd
>>> import polars as pl
>>> import pyarrow as pa
>>> import narwhals as nw
>>> from narwhals.typing import IntoFrame
>>> from types import ModuleType

Let's create an agnostic function that lazily reads a csv file with a specified native namespace:

>>> def agnostic_scan_csv(native_namespace: ModuleType) -> IntoFrame:
... return (
... nw.scan_csv("file.csv", native_namespace=native_namespace)
... .to_native()
... .collect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collect isn't guaranteed to be present on a native frame, can we keep it out of agnostic_scan_csv? you can just put collect as the end of the Polars example, and compute at the end of the dask example

either that, or put collect before to_native

... )

Then we can read the file by passing Polars or dask namespaces:

>>> agnostic_scan_csv(native_namespace=pl) # doctest:+SKIP
shape: (3, 2)
β”Œβ”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”
β”‚ a ┆ b β”‚
β”‚ --- ┆ --- β”‚
β”‚ i64 ┆ i64 β”‚
β•žβ•β•β•β•β•β•ͺ═════║
β”‚ 1 ┆ 4 β”‚
β”‚ 2 ┆ 5 β”‚
β”‚ 3 ┆ 6 β”‚
β””β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”˜
>>> agnostic_scan_csv(native_namespace=dd) # doctest:+SKIP
a b
0 1 4
1 2 5
2 3 6
"""
return _scan_csv_impl(source, native_namespace=native_namespace)


def _scan_csv_impl(
source: str,
*,
native_namespace: ModuleType,
) -> LazyFrame[Any]:
implementation = Implementation.from_native_namespace(native_namespace)
if implementation is Implementation.POLARS:
native_frame = native_namespace.scan_csv(source)
elif implementation in (
Implementation.PANDAS,
Implementation.MODIN,
Implementation.CUDF,
):
native_frame = native_namespace.read_csv(source)
elif implementation is Implementation.PYARROW:
from pyarrow import csv # ignore-banned-import

native_frame = csv.read_csv(source)
elif implementation is Implementation.DASK:
native_frame = native_namespace.read_csv(source)
else: # pragma: no cover
try:
# implementation is UNKNOWN, Narwhals extension using this feature should
# implement `scan_csv` function in the top-level namespace.
native_frame = native_namespace.scan_csv(source=source)
except AttributeError as e:
msg = "Unknown namespace is expected to implement `scan_csv` function."
raise AttributeError(msg) from e
return from_native(native_frame).lazy()
62 changes: 62 additions & 0 deletions narwhals/stable/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from narwhals.functions import _from_dict_impl
from narwhals.functions import _from_numpy_impl
from narwhals.functions import _new_series_impl
from narwhals.functions import _scan_csv_impl
from narwhals.functions import from_arrow as nw_from_arrow
from narwhals.functions import get_level
from narwhals.functions import show_versions
Expand Down Expand Up @@ -3385,6 +3386,66 @@ def from_numpy(
)


def scan_csv(
source: str,
*,
native_namespace: ModuleType,
) -> LazyFrame[Any]:
"""Lazily read from a CSV file.

This allows the query optimizer to push down predicates and projections
to the scan level, thereby potentially reducing memory overhead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

For the libraries that do not support lazy dataframes, the function reads
a csv file eagerly and then converts the resulting dataframe to a lazyframe.

Arguments:
source: Path to a file.
native_namespace: The native library to use for DataFrame creation.

Returns:
LazyFrame.

Examples:
>>> import dask.dataframe as dd
>>> import polars as pl
>>> import pyarrow as pa
>>> import narwhals as nw
>>> from narwhals.typing import IntoFrame
>>> from types import ModuleType

Let's create an agnostic function that lazily reads a csv file with a specified native namespace:

>>> def agnostic_scan_csv(native_namespace: ModuleType) -> IntoFrame:
... return (
... nw.scan_csv("file.csv", native_namespace=native_namespace)
... .to_native()
... .collect()
... )

Then we can read the file by passing Polars or dask namespaces:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about

by passing, for example, Polars or Dask namespaces

?


>>> agnostic_scan_csv(native_namespace=pl) # doctest:+SKIP
shape: (3, 2)
β”Œβ”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”
β”‚ a ┆ b β”‚
β”‚ --- ┆ --- β”‚
β”‚ i64 ┆ i64 β”‚
β•žβ•β•β•β•β•β•ͺ═════║
β”‚ 1 ┆ 4 β”‚
β”‚ 2 ┆ 5 β”‚
β”‚ 3 ┆ 6 β”‚
β””β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”˜
>>> agnostic_scan_csv(native_namespace=dd) # doctest:+SKIP
a b
0 1 4
1 2 5
2 3 6
"""
return _stableify( # type: ignore[no-any-return]
_scan_csv_impl(source, native_namespace=native_namespace)
)


__all__ = [
"Array",
"Boolean",
Expand Down Expand Up @@ -3449,6 +3510,7 @@ def from_numpy(
"narwhalify",
"new_series",
"nth",
"scan_csv",
"selectors",
"show_versions",
"sum",
Expand Down
43 changes: 43 additions & 0 deletions tests/scan_csv_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import polars as pl

import narwhals as nw
import narwhals.stable.v1 as nw_v1
from tests.utils import Constructor
from tests.utils import assert_equal_data

if TYPE_CHECKING:
import pytest

data = {"a": [1, 2, 3], "b": [4.5, 6.7, 8.9], "z": ["x", "y", "w"]}


def test_scan_csv(
tmpdir: pytest.TempdirFactory,
constructor: Constructor,
) -> None:
df_pl = pl.DataFrame(data)
filepath = str(tmpdir / "file.csv") # type: ignore[operator]
df_pl.write_csv(filepath)
df = nw.from_native(constructor(data))
native_namespace = nw.get_native_namespace(df)
result = nw.scan_csv(filepath, native_namespace=native_namespace)
assert_equal_data(result.collect(), data)
assert isinstance(result, nw.LazyFrame)


def test_scan_csv_v1(
tmpdir: pytest.TempdirFactory,
constructor: Constructor,
) -> None:
df_pl = pl.DataFrame(data)
filepath = str(tmpdir / "file.csv") # type: ignore[operator]
df_pl.write_csv(filepath)
df = nw_v1.from_native(constructor(data))
native_namespace = nw_v1.get_native_namespace(df)
result = nw_v1.scan_csv(filepath, native_namespace=native_namespace)
assert_equal_data(result.collect(), data)
assert isinstance(result, nw_v1.LazyFrame)
Loading