Interchange protocol fixes and updates #2150

Merged
merged 12 commits on Oct 5, 2022
76 changes: 46 additions & 30 deletions packages/vaex-core/vaex/dataframe_protocol.py
@@ -155,18 +155,21 @@ def buffer_to_ndarray(_buffer, _dtype) -> np.ndarray:
return x


def convert_categorical_column(col: ColumnObject) -> pa.DictionaryArray:
def convert_categorical_column(col: ColumnObject) -> Tuple[pa.DictionaryArray, Any]:
"""
Convert a categorical column to an arrow dictionary
"""
ordered, is_dict, mapping = col.describe_categorical
if not is_dict:
catinfo = col.describe_categorical
if not catinfo["is_dictionary"]:
raise NotImplementedError("Non-dictionary categoricals not supported yet")
assert catinfo["categories"] is not None # sanity check

if not col.describe_null[0] in (0, 2, 3, 4):
raise NotImplementedError("Only categorical columns with sentinel " "value and masks supported at the moment")
raise NotImplementedError(
"Only categorical columns with sentinel "
"value and masks supported at the moment"
)

categories = np.asarray(list(mapping.values()))
codes_buffer, codes_dtype = col.get_buffers()["data"]
codes = buffer_to_ndarray(codes_buffer, codes_dtype)

@@ -181,13 +184,18 @@ def convert_categorical_column(col: ColumnObject) -> pa.DictionaryArray:
else:
indices = pa.array(codes)

dictionary = pa.array(categories)
values = pa.DictionaryArray.from_arrays(indices, dictionary)
labels_buffer, labels_dtype = catinfo["categories"].get_buffers()["data"]
if labels_dtype[0] == _DtypeKind.STRING:
labels, _ = convert_string_column(catinfo["categories"])
else:
labels = buffer_to_ndarray(labels_buffer, labels_dtype)

values = pa.DictionaryArray.from_arrays(indices, labels)

return values, codes_buffer
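For reference, a minimal usage sketch of the updated helper (illustrative only, not part of this diff; the dataframe and labels mirror test_categorical further down, and the direct import of convert_categorical_column is an assumption):

import vaex
from vaex.dataframe_protocol import convert_categorical_column

# Illustrative only: feed a categorical interchange column through the
# updated helper and keep the returned codes buffer around.
df = vaex.from_arrays(weekday=[0, 1, 4, 6])
df = df.categorize("weekday", labels=["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
col = df.__dataframe__().get_column_by_name("weekday")
values, codes_buffer = convert_categorical_column(col)
# values is a pa.DictionaryArray; codes_buffer is returned alongside it,
# presumably so the caller can keep the underlying index memory alive
# (see the premature-release bug referenced in the tests below).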


def convert_string_column(col: ColumnObject) -> pa.Array:
def convert_string_column(col: ColumnObject) -> Tuple[pa.Array, list]:
"""
Convert a string column to an Arrow array.
"""
@@ -315,12 +323,17 @@ def __init__(self, column: vaex.expression.Expression, allow_copy: bool = True)
self._col = column
self._allow_copy = allow_copy

@property
def size(self) -> int:
"""
Size of the column, in elements.

Corresponds to DataFrame.num_rows() if column is a single chunk;
equal to size of this current chunk otherwise.

Is a method rather than a property because it may cause a (potentially
expensive) computation for some dataframe implementations.
"""
return self._col.df.count("*")
return int(len(self._col.df))
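A small sketch of the call-site change implied by this hunk (illustrative only; it mirrors test_size further down):

import numpy as np
import vaex

# Illustrative only: read a column's length through the interchange protocol.
df = vaex.from_arrays(x=np.arange(5))
col = df.__dataframe__().get_column_by_name("x")
n_rows = col.size()  # size() is now a method; it was previously a property
assert isinstance(n_rows, int) and n_rows == 5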

@property
def offset(self) -> int:
@@ -415,32 +428,35 @@ def _dtype_from_vaexdtype(self, dtype) -> Tuple[enum.IntEnum, int, str, str]:
def describe_categorical(self) -> Dict[str, Any]:
"""
If the dtype is categorical, there are two options:

- There are only values in the data buffer.
- There is a separate dictionary-style encoding for categorical values.
- There is a separate non-categorical Column encoding categorical values.

Raises RuntimeError if the dtype is not categorical

Content of returned dict:
Raises TypeError if the dtype is not categorical

Returns the dictionary with description on how to interpret the data buffer:
- "is_ordered" : bool, whether the ordering of dictionary indices is
semantically meaningful.
- "is_dictionary" : bool, whether a dictionary-style mapping of
- "is_dictionary" : bool, whether a mapping of
categorical values to other objects exists
- "mapping" : dict, Python-level only (e.g. ``{int: str}``).
None if not a dictionary-style categorical.
- "categories" : Column representing the (implicit) mapping of indices to
category values (e.g. an array of cat1, cat2, ...).
None if not a dictionary-style categorical.

TBD: are there any other in-memory representations that are needed?
"""
if not self.dtype[0] == _DtypeKind.CATEGORICAL:
raise TypeError("`describe_categorical only works on a column with " "categorical dtype!")

ordered = False
is_dictionary = True
if not isinstance(self._col.values, np.ndarray) and isinstance(self._col.values.type, pa.DictionaryType):
categories = self._col.values.dictionary.tolist()
else:
categories = self._col.df.category_labels(self._col)
mapping = {ix: val for ix, val in enumerate(categories)}
return ordered, is_dictionary, mapping
raise TypeError(
"describe_categorical only works on a column with "
"categorical dtype!"
)
df = vaex.from_dict({"labels": self._col.df.category_labels(self._col)})
labels = df["labels"]
categories = _VaexColumn(labels)
return {
"is_ordered": False,
"is_dictionary": True,
"categories": categories,
}
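An illustrative sketch of consuming the new dict-style result (not part of this diff; the column name and labels follow test_categorical below):

import vaex

# Illustrative only: inspect the dict returned by describe_categorical.
df = vaex.from_arrays(weekday=[0, 1, 4, 6])
df = df.categorize("weekday", labels=["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
col = df.__dataframe__().get_column_by_name("weekday")
catinfo = col.describe_categorical
assert catinfo["is_dictionary"] and not catinfo["is_ordered"]
# "categories" is itself a column carrying the label values ("Mon", "Tue", ...).
labels_column = catinfo["categories"]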

@property
def describe_null(self) -> Tuple[int, Any]:
@@ -526,15 +542,15 @@ def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable["_VaexColumn"]:
See `DataFrame.get_chunks` for details on ``n_chunks``.
"""
if n_chunks == None:
size = self.size
size = self.size()
n_chunks = self.num_chunks()
i = self._col.df.evaluate_iterator(self._col, chunk_size=size // n_chunks)
iterator = []
for i1, i2, chunk in i:
iterator.append(_VaexColumn(self._col[i1:i2]))
return iterator
elif self.num_chunks == 1:
size = self.size
size = self.size()
i = self._col.df.evaluate_iterator(self._col, chunk_size=size // n_chunks)
iterator = []
for i1, i2, chunk in i:
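A hedged consumer-side sketch of chunk iteration under the default call (illustrative only; it mirrors test_smoke_get_chunks at the end of this diff):

import vaex

# Illustrative only: iterate the chunks of an interchange column; each chunk
# is itself a _VaexColumn.
df = vaex.from_arrays(x=[0])
col = df.__dataframe__().get_column_by_name("x")
for chunk in col.get_chunks():
    assert chunk.size() >= 0  # hypothetical per-chunk handling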
139 changes: 126 additions & 13 deletions tests/dataframe_protocol_test.py
@@ -11,6 +11,14 @@
from vaex.dataframe_protocol import _from_dataframe_to_vaex, _DtypeKind, _VaexBuffer, _VaexColumn, _VaexDataFrame


xfail_memory_bug = pytest.mark.xfail(
reason=(
"Erroneous due to bug where memory is released prematurely - "
"see https://github.com/vaexio/vaex/pull/2150#issuecomment-1263336551"
)
)


def test_float_only(df_factory):
df = df_factory(x=[1.5, 2.5, 3.5], y=[9.2, 10.5, 11.8])
df2 = _from_dataframe_to_vaex(df.__dataframe__())
@@ -48,7 +56,7 @@ def test_mixed_intfloatbool(df_factory):

# Additional tests for _VaexColumn
assert df2.__dataframe__().get_column_by_name("x")._allow_copy == True
assert df2.__dataframe__().get_column_by_name("x").size == 3
assert df2.__dataframe__().get_column_by_name("x").size() == 3
assert df2.__dataframe__().get_column_by_name("x").offset == 0

assert df2.__dataframe__().get_column_by_name("z").dtype[0] == 2 # 2: float64
@@ -119,22 +127,32 @@ def test_missing_from_masked(df_factory_numpy):
assert_dataframe_equal(df.__dataframe__(), df)


@xfail_memory_bug
def test_categorical():
df = vaex.from_arrays(year=[2012, 2013, 2015, 2019], weekday=[0, 1, 4, 6])
df = df.categorize("year", min_value=2012, max_value=2019)
df = df.categorize("weekday", labels=["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])

# Some detailed testing for correctness of dtype and null handling:
col = df.__dataframe__().get_column_by_name("year")
assert col.dtype[0] == _DtypeKind.CATEGORICAL
assert col.describe_categorical == (False, True, {0: 2012, 1: 2013, 2: 2014, 3: 2015, 4: 2016, 5: 2017, 6: 2018, 7: 2019})
assert col.dtype == (_DtypeKind.CATEGORICAL, 64, "u", "=")
catinfo = col.describe_categorical
assert not catinfo["is_ordered"]
assert catinfo["is_dictionary"]
assert catinfo["categories"]._col.tolist() == [
2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019
]
assert col.describe_null == (0, None)
assert col.dtype == (23, 64, "u", "=")

col2 = df.__dataframe__().get_column_by_name("weekday")
assert col2.dtype[0] == _DtypeKind.CATEGORICAL
assert col2.describe_categorical == (False, True, {0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu", 4: "Fri", 5: "Sat", 6: "Sun"})
catinfo2 = col2.describe_categorical
assert not catinfo2["is_ordered"]
assert catinfo2["is_dictionary"]
assert catinfo2["categories"]._col.tolist() == [
"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"
]
assert col2.dtype == (_DtypeKind.CATEGORICAL, 64, "u", "=")
assert col2.describe_null == (0, None)
assert col2.dtype == (23, 64, "u", "=")

df2 = _from_dataframe_to_vaex(df.__dataframe__())
assert df2["year"].tolist() == [2012, 2013, 2015, 2019]
@@ -143,6 +161,7 @@ def test_categorical():
assert_dataframe_equal(df.__dataframe__(), df)


@xfail_memory_bug
def test_arrow_dictionary():
indices = pa.array([0, 1, 0, 1, 2, 0, 1, 2])
dictionary = pa.array(["foo", "bar", "baz"])
@@ -152,7 +171,10 @@ def test_arrow_dictionary():
# Some detailed testing for correctness of dtype and null handling:
col = df.__dataframe__().get_column_by_name("x")
assert col.dtype[0] == _DtypeKind.CATEGORICAL
assert col.describe_categorical == (False, True, {0: "foo", 1: "bar", 2: "baz"})
catinfo = col.describe_categorical
assert not catinfo["is_ordered"]
assert catinfo["is_dictionary"]
assert catinfo["categories"]._col.tolist() == ["foo", "bar", "baz"]
if df['x'].dtype.is_arrow:
assert col.describe_null == (3, 0)
else:
@@ -166,6 +188,7 @@ def test_arrow_dictionary_missing():
assert_dataframe_equal(df.__dataframe__(), df)


@xfail_memory_bug
def test_arrow_dictionary_missing():
indices = pa.array([0, 1, 2, 0, 1], mask=np.array([0, 1, 1, 0, 0], dtype=bool))
dictionary = pa.array(["aap", "noot", "mies"])
@@ -175,7 +198,10 @@ def test_arrow_dictionary_missing():
# Some detailed testing for correctness of dtype and null handling:
col = df.__dataframe__().get_column_by_name("x")
assert col.dtype[0] == _DtypeKind.CATEGORICAL
assert col.describe_categorical == (False, True, {0: "aap", 1: "noot", 2: "mies"})
catinfo = col.describe_categorical
assert not catinfo["is_ordered"]
assert catinfo["is_dictionary"]
assert catinfo["categories"]._col.tolist() == ["aap", "noot", "mies"]

df2 = _from_dataframe_to_vaex(df.__dataframe__())
assert df2.x.tolist() == df.x.tolist()
@@ -190,7 +216,7 @@ def test_string():
col = df.__dataframe__().get_column_by_name("A")

assert col._col.tolist() == df.A.tolist()
assert col.size == 5
assert col.size() == 5
assert col.null_count == 1
assert col.dtype[0] == _DtypeKind.STRING
assert col.describe_null == (3,0)
@@ -203,7 +229,7 @@

df_sliced = df[1:]
col = df_sliced.__dataframe__().get_column_by_name("A")
assert col.size == 4
assert col.size() == 4
assert col.null_count == 1
assert col.dtype[0] == _DtypeKind.STRING
assert col.describe_null == (3,0)
@@ -246,7 +272,7 @@ def test_object():
col = df.__dataframe__().get_column_by_name("x")

assert col._col.tolist() == df.x.tolist()
assert col.size == 3
assert col.size() == 3

with pytest.raises(ValueError):
assert col.dtype
@@ -324,7 +350,7 @@ def assert_buffer_equal(buffer_dtype: Tuple[_VaexBuffer, Any], vaexcol: vaex.exp


def assert_column_equal(col: _VaexColumn, vaexcol: vaex.expression.Expression):
assert col.size == vaexcol.df.count("*")
assert col.size() == vaexcol.df.count("*")
assert col.offset == 0
assert col.null_count == vaexcol.countmissing()
assert_buffer_equal(col._get_data_buffer(), vaexcol)
@@ -363,6 +389,17 @@ def test_null_count(df_factory, x):
assert interchange_col.null_count == 1


def test_size(df_factory):
# See https://github.com/vaexio/vaex/issues/2093
x = np.arange(5)
df = df_factory(x=x)
interchange_df = df.__dataframe__()
interchange_col = interchange_df.get_column_by_name('x')
size = interchange_col.size()
assert isinstance(size, int)
assert size == 5


def test_smoke_get_buffers_on_categorical_columns(df_factory):
# See https://github.com/vaexio/vaex/issues/2134#issuecomment-1195731379
x = np.array([3, 1, 1, 2, 0])
@@ -371,3 +408,79 @@ def test_smoke_get_buffers_on_categorical_columns(df_factory):
interchange_df = df.__dataframe__()
interchange_col = interchange_df.get_column_by_name('x')
interchange_col.get_buffers()


@pytest.mark.xfail()
def test_interchange_pandas_string_column():
import pandas as pd
data = ["foo", "bar"]
try:
from pandas.api.interchange import from_dataframe
except ImportError:
pytest.skip(f"pandas.api.interchange not found ({pd.__version__})")
pd_df = pd.DataFrame({"x": pd.Series(data, dtype=pd.StringDtype())})
pd_interchange_df = pd_df.__dataframe__()
vaex_df = _from_dataframe_to_vaex(pd_interchange_df)
assert vaex_df["x"].tolist() == data


def test_string_buffers(df_factory):
data = ["foo", "bar"]
x = np.array(data, dtype="U8")
df = df_factory(x=x)
if isinstance(df["x"].values, pa.lib.ChunkedArray):
pytest.xfail()
interchange_df = df.__dataframe__()
roundtrip_df = _from_dataframe_to_vaex(interchange_df)
assert roundtrip_df["x"].tolist() == data


@pytest.mark.parametrize(
"labels", [[10, 11, 12, 13], ["foo", "bar", "baz", "qux"]]
)
def test_describe_categorical(df_factory, labels):
# See https://github.com/vaexio/vaex/issues/2113
data = [3, 1, 1, 2, 0]
x = np.array(data)
df = df_factory(x=x)
df = df.categorize('x', labels=labels)
interchange_df = df.__dataframe__()
interchange_col = interchange_df.get_column_by_name('x')
catinfo = interchange_col.describe_categorical
assert isinstance(catinfo, dict)
assert isinstance(catinfo["is_ordered"], bool)
assert isinstance(catinfo["is_dictionary"], bool)
assert catinfo["is_dictionary"]
assert isinstance(catinfo["categories"], _VaexColumn)
assert catinfo["categories"]._col.tolist() == labels


@xfail_memory_bug
@pytest.mark.parametrize(
"labels", [[10, 11, 12, 13], ["foo", "bar", "baz", "qux"]]
)
def test_interchange_categorical_column(df_factory, labels):
data = [3, 1, 1, 2, 0]
x = np.array(data)
df = df_factory(x=x)
if isinstance(df["x"].values, pa.lib.ChunkedArray):
pytest.xfail()
df = df.categorize('x', labels=labels)
interchange_df = df.__dataframe__()
interchange_col = interchange_df.get_column_by_name('x')
roundtrip_df = _from_dataframe_to_vaex(interchange_df)
data_as_labels = [labels[i] for i in data]
assert roundtrip_df["x"].values.tolist() == data_as_labels
assert roundtrip_df.category_labels("x") == labels


@pytest.mark.parametrize("n_chunks", [None, 1])
def test_smoke_get_chunks(df_factory, n_chunks):
if n_chunks is not None:
pytest.xfail("get_chunks(n_chunks=...) doesn't work on already chunked columns")
df = df_factory(x=[0])
interchange_df = df.__dataframe__()
interchange_col = interchange_df.get_column_by_name('x')
if isinstance(df["x"].values, pa.lib.ChunkedArray):
pytest.skip("get_chunks() is slow/halts with chunked arrow arrays")
interchange_col.get_chunks(n_chunks=n_chunks)