From d5c9766390c11ec3782964ea38b503d3f81f3b65 Mon Sep 17 00:00:00 2001
From: raisadz <34237447+raisadz@users.noreply.github.com>
Date: Tue, 10 Dec 2024 13:19:21 +0000
Subject: [PATCH 1/2] add scan_csv

Add a top-level `scan_csv(source, *, native_namespace)` together with its
`narwhals.stable.v1` counterpart.

Polars goes through its native `scan_csv`; pandas, Modin, cuDF, PyArrow and
Dask go through their `read_csv`. Any other namespace is expected to expose
its own `scan_csv`. The native result is wrapped with
`from_native(...).lazy()`, so callers always get a Narwhals LazyFrame.

---
 docs/api-reference/narwhals.md |  1 +
 narwhals/__init__.py           |  2 +
 narwhals/functions.py          | 85 ++++++++++++++++++++++++++++++++++
 narwhals/stable/v1/__init__.py | 58 +++++++++++++++++++++++
 tests/scan_csv_test.py         | 43 +++++++++++++++++
 5 files changed, 189 insertions(+)
 create mode 100644 tests/scan_csv_test.py

diff --git a/docs/api-reference/narwhals.md b/docs/api-reference/narwhals.md
index d1423b9cc..f94aabea1 100644
--- a/docs/api-reference/narwhals.md
+++ b/docs/api-reference/narwhals.md
@@ -37,6 +37,7 @@ Here are the top-level functions available in Narwhals.
         - narwhalify
         - new_series
         - nth
+        - scan_csv
         - sum
         - sum_horizontal
         - show_versions
diff --git a/narwhals/__init__.py b/narwhals/__init__.py
index 1885a9153..209015f9f 100644
--- a/narwhals/__init__.py
+++ b/narwhals/__init__.py
@@ -55,6 +55,7 @@
 from narwhals.functions import from_numpy
 from narwhals.functions import get_level
 from narwhals.functions import new_series
+from narwhals.functions import scan_csv
 from narwhals.functions import show_versions
 from narwhals.schema import Schema
 from narwhals.series import Series
@@ -138,6 +139,7 @@
     "narwhalify",
     "new_series",
     "nth",
+    "scan_csv",
     "selectors",
     "show_versions",
     "stable",
diff --git a/narwhals/functions.py b/narwhals/functions.py
index 4e31d5b41..bcc142090 100644
--- a/narwhals/functions.py
+++ b/narwhals/functions.py
@@ -936,3 +936,88 @@ def get_level(
         - 'interchange': only metadata operations are supported (`df.schema`)
     """
     return obj._level
+
+
+def scan_csv(
+    source: str,
+    *,
+    native_namespace: ModuleType,
+) -> LazyFrame[Any]:
+    """Lazily read from a CSV file.
+
+    This allows the query optimizer to push down predicates and projections
+    to the scan level, thereby potentially reducing memory overhead.
+    For the libraries that do not support lazy dataframes, the function reads
+    a csv file eagerly and then converts the resulting dataframe to a lazyframe.
+
+    Arguments:
+        source: Path to a file.
+        native_namespace: The native library to use for DataFrame creation.
+
+    Returns:
+        LazyFrame.
+
+    Examples:
+        >>> import dask.dataframe as dd
+        >>> import polars as pl
+        >>> import pyarrow as pa
+        >>> import narwhals as nw
+        >>> from narwhals.typing import IntoFrame
+        >>> from types import ModuleType
+
+        Let's create an agnostic function that lazily reads a csv file with a specified native namespace:
+
+        >>> def agnostic_scan_csv(native_namespace: ModuleType) -> IntoFrame:
+        ...     return nw.scan_csv("file.csv", native_namespace=native_namespace).to_native()
+
+        Then we can read the file by passing Polars or dask namespaces:
+
+        >>> agnostic_scan_csv(native_namespace=pl)  # doctest:+SKIP
+        shape: (3, 2)
+        ┌─────┬─────┐
+        │ a   ┆ b   │
+        │ --- ┆ --- │
+        │ i64 ┆ i64 │
+        ╞═════╪═════╡
+        │ 1   ┆ 4   │
+        │ 2   ┆ 5   │
+        │ 3   ┆ 6   │
+        └─────┴─────┘
+        >>> agnostic_scan_csv(native_namespace=dd)  # doctest:+SKIP
+           a  b
+        0  1  4
+        1  2  5
+        2  3  6
+    """
+    return _scan_csv_impl(source, native_namespace=native_namespace)
+
+
+def _scan_csv_impl(
+    source: str,
+    *,
+    native_namespace: ModuleType,
+) -> LazyFrame[Any]:
+    implementation = Implementation.from_native_namespace(native_namespace)
+    if implementation is Implementation.POLARS:
+        native_frame = native_namespace.scan_csv(source)
+    elif implementation in (
+        Implementation.PANDAS,
+        Implementation.MODIN,
+        Implementation.CUDF,
+    ):
+        native_frame = native_namespace.read_csv(source)
+    elif implementation is Implementation.PYARROW:
+        from pyarrow import csv  # ignore-banned-import
+
+        native_frame = csv.read_csv(source)
+    elif implementation is Implementation.DASK:
+        native_frame = native_namespace.read_csv(source)
+    else:  # pragma: no cover
+        try:
+            # implementation is UNKNOWN, Narwhals extension using this feature should
+            # implement `scan_csv` function in the top-level namespace.
+            native_frame = native_namespace.scan_csv(source=source)
+        except AttributeError as e:
+            msg = "Unknown namespace is expected to implement `scan_csv` function."
+            raise AttributeError(msg) from e
+    return from_native(native_frame).lazy()
diff --git a/narwhals/stable/v1/__init__.py b/narwhals/stable/v1/__init__.py
index e5b84cff8..3377c7628 100644
--- a/narwhals/stable/v1/__init__.py
+++ b/narwhals/stable/v1/__init__.py
@@ -24,6 +24,7 @@
 from narwhals.functions import _from_dict_impl
 from narwhals.functions import _from_numpy_impl
 from narwhals.functions import _new_series_impl
+from narwhals.functions import _scan_csv_impl
 from narwhals.functions import from_arrow as nw_from_arrow
 from narwhals.functions import get_level
 from narwhals.functions import show_versions
@@ -3385,6 +3386,62 @@ def from_numpy(
     )


+def scan_csv(
+    source: str,
+    *,
+    native_namespace: ModuleType,
+) -> LazyFrame[Any]:
+    """Lazily read from a CSV file.
+
+    This allows the query optimizer to push down predicates and projections
+    to the scan level, thereby potentially reducing memory overhead.
+    For the libraries that do not support lazy dataframes, the function reads
+    a csv file eagerly and then converts the resulting dataframe to a lazyframe.
+
+    Arguments:
+        source: Path to a file.
+        native_namespace: The native library to use for DataFrame creation.
+
+    Returns:
+        LazyFrame.
+
+    Examples:
+        >>> import dask.dataframe as dd
+        >>> import polars as pl
+        >>> import pyarrow as pa
+        >>> import narwhals.stable.v1 as nw
+        >>> from narwhals.typing import IntoFrame
+        >>> from types import ModuleType
+
+        Let's create an agnostic function that lazily reads a csv file with a specified native namespace:
+
+        >>> def agnostic_scan_csv(native_namespace: ModuleType) -> IntoFrame:
+        ...     return nw.scan_csv("file.csv", native_namespace=native_namespace).to_native()
+
+        Then we can read the file by passing Polars or dask namespaces:
+
+        >>> agnostic_scan_csv(native_namespace=pl)  # doctest:+SKIP
+        shape: (3, 2)
+        ┌─────┬─────┐
+        │ a   ┆ b   │
+        │ --- ┆ --- │
+        │ i64 ┆ i64 │
+        ╞═════╪═════╡
+        │ 1   ┆ 4   │
+        │ 2   ┆ 5   │
+        │ 3   ┆ 6   │
+        └─────┴─────┘
+        >>> agnostic_scan_csv(native_namespace=dd)  # doctest:+SKIP
+           a  b
+        0  1  4
+        1  2  5
+        2  3  6
+    """
+    return _stableify(  # type: ignore[no-any-return]
+        _scan_csv_impl(source, native_namespace=native_namespace)
+    )
+
+
 __all__ = [
     "Array",
     "Boolean",
@@ -3449,6 +3506,7 @@ def from_numpy(
     "narwhalify",
     "new_series",
     "nth",
+    "scan_csv",
     "selectors",
     "show_versions",
     "sum",
diff --git a/tests/scan_csv_test.py b/tests/scan_csv_test.py
new file mode 100644
index 000000000..4f0083667
--- /dev/null
+++ b/tests/scan_csv_test.py
@@ -0,0 +1,43 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import polars as pl
+
+import narwhals as nw
+import narwhals.stable.v1 as nw_v1
+from tests.utils import Constructor
+from tests.utils import assert_equal_data
+
+if TYPE_CHECKING:
+    import pytest
+
+data = {"a": [1, 2, 3], "b": [4.5, 6.7, 8.9], "z": ["x", "y", "w"]}
+
+
+def test_scan_csv(
+    tmpdir: pytest.TempdirFactory,
+    constructor: Constructor,
+) -> None:
+    df_pl = pl.DataFrame(data)
+    filepath = str(tmpdir / "file.csv")  # type: ignore[operator]
+    df_pl.write_csv(filepath)
+    df = nw.from_native(constructor(data))
+    native_namespace = nw.get_native_namespace(df)
+    result = nw.scan_csv(filepath, native_namespace=native_namespace)
+    assert_equal_data(result.collect(), data)
+    assert isinstance(result, nw.LazyFrame)
+
+
+def test_scan_csv_v1(
+    tmpdir: pytest.TempdirFactory,
+    constructor: Constructor,
+) -> None:
+    df_pl = pl.DataFrame(data)
+    filepath = str(tmpdir / "file.csv")  # type: ignore[operator]
+    df_pl.write_csv(filepath)
+    df = nw_v1.from_native(constructor(data))
+    native_namespace = nw_v1.get_native_namespace(df)
+    result = nw_v1.scan_csv(filepath, native_namespace=native_namespace)
+    assert_equal_data(result.collect(), data)
+    assert isinstance(result, nw_v1.LazyFrame)

From ff34f3e5fb064fd6739a6bb227483387797c4667 Mon Sep 17 00:00:00 2001
From: raisadz <34237447+raisadz@users.noreply.github.com>
Date: Tue, 10 Dec 2024 14:46:56 +0000
Subject: [PATCH 2/2] add collect

Make the `scan_csv` docstring examples return an eager frame, matching the
output printed below them.

The frame is collected through Narwhals (`LazyFrame.collect()`) before
`to_native()`, so the same example works for every backend: a native Dask
DataFrame has no `.collect()` method, so collecting after `to_native()`
would fail for the `dd` namespace.

---
 narwhals/functions.py          | 6 +++++-
 narwhals/stable/v1/__init__.py | 6 +++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/narwhals/functions.py b/narwhals/functions.py
index bcc142090..cc61e9f37 100644
--- a/narwhals/functions.py
+++ b/narwhals/functions.py
@@ -968,7 +968,11 @@ def scan_csv(
         Let's create an agnostic function that lazily reads a csv file with a specified native namespace:

         >>> def agnostic_scan_csv(native_namespace: ModuleType) -> IntoFrame:
-        ...     return nw.scan_csv("file.csv", native_namespace=native_namespace).to_native()
+        ...     return (
+        ...         nw.scan_csv("file.csv", native_namespace=native_namespace)
+        ...         .collect()
+        ...         .to_native()
+        ...     )

         Then we can read the file by passing Polars or dask namespaces:

diff --git a/narwhals/stable/v1/__init__.py b/narwhals/stable/v1/__init__.py
index 3377c7628..15ca3fefb 100644
--- a/narwhals/stable/v1/__init__.py
+++ b/narwhals/stable/v1/__init__.py
@@ -3416,7 +3416,11 @@ def scan_csv(
         Let's create an agnostic function that lazily reads a csv file with a specified native namespace:

         >>> def agnostic_scan_csv(native_namespace: ModuleType) -> IntoFrame:
-        ...     return nw.scan_csv("file.csv", native_namespace=native_namespace).to_native()
+        ...     return (
+        ...         nw.scan_csv("file.csv", native_namespace=native_namespace)
+        ...         .collect()
+        ...         .to_native()
+        ...     )

         Then we can read the file by passing Polars or dask namespaces:
