Commit: update tests, lint

alexander-beedie committed Oct 27, 2024
1 parent 4e7858b commit 8dcc968
Showing 3 changed files with 74 additions and 56 deletions.
25 changes: 14 additions & 11 deletions py-polars/polars/config.py
@@ -28,27 +28,26 @@
 __all__ = ["Config"]

 TableFormatNames: TypeAlias = Literal[
-    "ASCII_FULL",
-    "ASCII_FULL_CONDENSED",
-    "ASCII_NO_BORDERS",
     "ASCII_BORDERS_ONLY",
     "ASCII_BORDERS_ONLY_CONDENSED",
+    "ASCII_FULL",
+    "ASCII_FULL_CONDENSED",
     "ASCII_HORIZONTAL_ONLY",
     "ASCII_MARKDOWN",
+    "ASCII_NO_BORDERS",
     "MARKDOWN",
+    "NOTHING",
+    "UTF8_BORDERS_ONLY",
     "UTF8_FULL",
     "UTF8_FULL_CONDENSED",
-    "UTF8_NO_BORDERS",
-    "UTF8_BORDERS_ONLY",
     "UTF8_HORIZONTAL_ONLY",
-    "NOTHING",
+    "UTF8_NO_BORDERS",
 ]

 # note: register all Config-specific environment variable names here; need to constrain
 # which 'POLARS_' environment variables are recognized, as there are other lower-level
 # and/or unstable settings that should not be saved or reset with the Config vars.
 _POLARS_CFG_ENV_VARS = {
-    "POLARS_WARN_UNSTABLE",
     "POLARS_AUTO_STRUCTIFY",
     "POLARS_FMT_MAX_COLS",
     "POLARS_FMT_MAX_ROWS",
@@ -67,11 +66,12 @@
"POLARS_FMT_TABLE_HIDE_DATAFRAME_SHAPE_INFORMATION",
"POLARS_FMT_TABLE_INLINE_COLUMN_DATA_TYPE",
"POLARS_FMT_TABLE_ROUNDED_CORNERS",
"POLARS_MAX_EXPR_DEPTH",
"POLARS_STREAMING_CHUNK_SIZE",
"POLARS_TABLE_WIDTH",
"POLARS_TEMP_DIR",
"POLARS_VERBOSE",
"POLARS_MAX_EXPR_DEPTH",
"POLARS_WARN_UNSTABLE",
}

# vars that set the rust env directly should declare themselves here as the Config
@@ -525,6 +525,10 @@ def set_temp_dir(cls, path: str | Path | None = None) -> type[Config]:
         Notes
         -----
+        * This method sets the "POLARS_TEMP_DIR" environment variable, which
+          is only read once per session (on first use). Any subsequent changes
+          to this variable will *not* be picked up.
         * Temporary files may be created in several situations; for example,
           a streaming mode operation may spill intermediate results to disk,
           cloud-based files may need local caching on download, and sink ops
@@ -538,12 +542,11 @@ def set_temp_dir(cls, path: str | Path | None = None) -> type[Config]:
         ----------
         path : str, Path, None
             Path to a directory to use for Polars' temporary files, such as
-            where streaming operations may spill to disk. Set `None` to use
-            the default temp directory.
+            where streaming operations may spill to disk.

         Examples
         --------
-        >>> pl.Config(temp_dir="/tmp/my_subdir/")  # doctest: +SKIP
+        >>> pl.Config(temp_dir="/tmp/custom_directory/")  # doctest: +SKIP
         """
         if path is None:
             os.environ.pop("POLARS_TEMP_DIR", None)
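The note added to `set_temp_dir` is the key behavioural detail: "POLARS_TEMP_DIR" is read only once per session, so the directory must be configured before the first operation that might spill. A minimal sketch of both usage styles, assuming an illustrative scratch path and query (neither is from the commit):

import polars as pl

# scoped: applies inside the block; the prior state is restored on exit
with pl.Config(temp_dir="/tmp/polars_scratch"):  # illustrative path
    lf = pl.LazyFrame({"a": [1, 2, 2], "b": [3, 4, 4]})
    out = lf.unique(subset=["a"], maintain_order=False).collect(streaming=True)

# process-wide: must run before anything reads POLARS_TEMP_DIR
pl.Config.set_temp_dir("/tmp/polars_scratch")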
23 changes: 11 additions & 12 deletions py-polars/tests/unit/streaming/test_streaming.py
@@ -1,6 +1,5 @@
 from __future__ import annotations

-import os
 import time
 from datetime import date
 from pathlib import Path
@@ -70,8 +69,8 @@ def test_streaming_streamable_functions(monkeypatch: Any, capfd: Any) -> None:
"b": [1, 2, 3],
}

(_, err) = capfd.readouterr()
assert "df -> function -> ordered_sink" in err
(_, err) = capfd.readouterr()
assert "df -> function -> ordered_sink" in err


@pytest.mark.slow
@@ -123,15 +122,15 @@ def test_streaming_apply(monkeypatch: Any, capfd: Any) -> None:
-    monkeypatch.setenv("POLARS_VERBOSE", "1")

     q = pl.DataFrame({"a": [1, 2]}).lazy()

-    with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
-        (
-            q.select(
-                pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
-            ).collect(streaming=True)
-        )
-    (_, err) = capfd.readouterr()
-    assert "df -> projection -> ordered_sink" in err
+    with pl.Config(verbose=True):  # noqa: SIM117
+        with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
+            (
+                q.select(
+                    pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
+                ).collect(streaming=True)
+            )
+        (_, err) = capfd.readouterr()
+        assert "df -> projection -> ordered_sink" in err


 def test_streaming_ternary() -> None:
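The pattern replacing `monkeypatch.setenv("POLARS_VERBOSE", "1")` above is `pl.Config(verbose=True)` used as a context manager: the option is applied on entry and the previous state is restored on exit, so verbose output cannot leak into later tests. A minimal standalone sketch of the idea (the query is illustrative, not from the commit):

import polars as pl

q = pl.DataFrame({"a": [1, 2]}).lazy()

with pl.Config(verbose=True):
    # the streaming physical plan is logged to stderr while verbose is set
    out = q.select(pl.col("a") * 2).collect(streaming=True)

print(out)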
82 changes: 49 additions & 33 deletions py-polars/tests/unit/streaming/test_streaming_unique.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import os
 from typing import TYPE_CHECKING, Any

 import pytest
@@ -19,36 +20,51 @@ def test_streaming_out_of_core_unique(
     io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
 ) -> None:
     tmp_path.mkdir(exist_ok=True)
-    monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
-    monkeypatch.setenv("POLARS_FORCE_OOC", "1")
-    monkeypatch.setenv("POLARS_VERBOSE", "1")
-    monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")
-    df = pl.read_csv(io_files_path / "foods*.csv")
-    # this creates 10M rows
-    q = df.lazy()
-    q = q.join(q, how="cross").select(df.columns).head(10_000)
-
-    # uses out-of-core unique
-    df1 = q.join(q.head(1000), how="cross").unique().collect(streaming=True)
-    # this ensures the cross join gives equal result but uses the in-memory unique
-    df2 = q.join(q.head(1000), how="cross").collect(streaming=True).unique()
-    assert df1.shape == df2.shape
-
-    # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
-    _ = capfd.readouterr().err
-    # assert "OOC group_by started" in err
-
-
-def test_streaming_unique(monkeypatch: Any, capfd: Any) -> None:
-    monkeypatch.setenv("POLARS_VERBOSE", "1")
-    df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
-    q = df.lazy().unique(subset=["a", "c"], maintain_order=False).sort(["a", "b", "c"])
-    assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
-
-    q = df.lazy().unique(subset=["b", "c"], maintain_order=False).sort(["a", "b", "c"])
-    assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
-
-    q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"])
-    assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
-    (_, err) = capfd.readouterr()
-    assert "df -> re-project-sink -> sort_multiple" in err
+    with pl.Config(
+        temp_dir=tmp_path,
+        verbose=True,
+    ):
+        monkeypatch.setenv("POLARS_FORCE_OOC", "1")
+        monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")
+
+        df = pl.read_csv(io_files_path / "foods*.csv")
+        # this creates 10M rows
+        q = df.lazy()
+        q = q.join(q, how="cross").select(df.columns).head(10_000)
+
+        # uses out-of-core unique
+        df1 = q.join(q.head(1000), how="cross").unique().collect(streaming=True)
+
+        # confirm that the custom temp path was used to spill ooc op to disk
+        assert os.listdir(tmp_path), f"Temp directory '{tmp_path}' should not be empty"
+
+        # this ensures the cross join gives equal result but uses the in-memory unique
+        df2 = q.join(q.head(1000), how="cross").collect(streaming=True).unique()
+        assert df1.shape == df2.shape
+
+        # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
+        _ = capfd.readouterr().err
+        # assert "OOC group_by started" in err
+
+
+def test_streaming_unique(capfd: Any) -> None:
+    with pl.Config(verbose=True):
+        df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
+        q = (
+            df.lazy()
+            .unique(subset=["a", "c"], maintain_order=False)
+            .sort(["a", "b", "c"])
+        )
+        assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
+
+        q = (
+            df.lazy()
+            .unique(subset=["b", "c"], maintain_order=False)
+            .sort(["a", "b", "c"])
+        )
+        assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
+
+        q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"])
+        assert_frame_equal(q.collect(streaming=True), q.collect(streaming=False))
+        (_, err) = capfd.readouterr()
+        assert "df -> re-project-sink -> sort_multiple" in err
