[SPARK-51112][CONNECT] Avoid using pyarrow's to_pandas on an empty table

### What changes were proposed in this pull request?

When the `pyarrow` table is empty, avoid calling its `to_pandas` method, which can fail with a segmentation fault. Instead, an empty pandas DataFrame is created manually from the schema's column names, as sketched below.
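
A minimal sketch of the guard introduced by this change (the function name and signature are illustrative, not the exact code in `core.py`):

```python
import pandas as pd
import pyarrow as pa


def arrow_table_to_pandas(table: pa.Table, column_names: list) -> pd.DataFrame:
    # If the Arrow table has no rows, build the empty pandas DataFrame
    # directly from the column names instead of calling to_pandas(),
    # which can segfault on empty tables with nested array columns.
    if table.num_rows == 0:
        return pd.DataFrame(columns=column_names)
    pdf = table.to_pandas()
    pdf.columns = column_names
    return pdf
```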

### Why are the changes needed?

Consider the following code:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, ArrayType, StringType, StructType, IntegerType
import faulthandler
faulthandler.enable()
spark = SparkSession.builder \
    .remote("sc://localhost:15002") \
    .getOrCreate()
sp_df = spark.createDataFrame(
    data = [],
    schema=StructType(
        [
            StructField(
                name='b_int',
                dataType=IntegerType(),
                nullable=False,
            ),
            StructField(
                name='b',
                dataType=ArrayType(ArrayType(StringType(), True), True),
                nullable=True,
            ),
        ]
    )
)
print(sp_df)
print('Spark dataframe generated.')
print(sp_df.toPandas())
print('Pandas dataframe generated.')
```
Executing this may lead to a segfault when `sp_df.toPandas()` is called. Example `faulthandler` output:
```
Thread 0x00000001f1904f40 (most recent call first):
  File "/Users/venkata.gudesa/spark/test_env/lib/python3.13/site-packages/pyarrow/pandas_compat.py", line 808 in table_to_dataframe
  File "/Users/venkata.gudesa/spark/test_env/lib/python3.13/site-packages/pyspark/sql/connect/client/core.py", line 949 in to_pandas
  File "/Users/venkata.gudesa/spark/test_env/lib/python3.13/site-packages/pyspark/sql/connect/dataframe.py", line 1857 in toPandas
  File "<python-input-3>", line 1 in <module>
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/code.py", line 92 in runcode
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/console.py", line 205 in runsource
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/code.py", line 313 in push
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/simple_interact.py", line 160 in run_multiline_interactive_console
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/main.py", line 59 in interactive_console
  File "/opt/homebrew/Cellar/python3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/_pyrepl/__main__.py", line 6 in <module>
  File "<frozen runpy>", line 88 in _run_code
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49834 from vicennial/SPARK-51112.

Lead-authored-by: vicennial <venkata.gudesa@databricks.com>
Co-authored-by: Venkata Sai Akhil Gudesa <gvs.akhil1997@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2 people authored and HyukjinKwon committed Feb 11, 2025
1 parent c91fdd4 commit 9d88020
Showing 3 changed files with 73 additions and 27 deletions.
60 changes: 33 additions & 27 deletions python/pyspark/sql/connect/client/core.py
@@ -933,33 +933,39 @@ def to_pandas(
schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
assert schema is not None and isinstance(schema, StructType)

# Rename columns to avoid duplicated column names.
renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])

pandas_options = {}
if self_destruct:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)
if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
# A legacy option to coerce date32, date64, duration, and timestamp
# time units to nanoseconds when converting to pandas.
# This option can only be added since 13.0.0.
pandas_options.update(
{
"coerce_temporal_nanoseconds": True,
}
)
pdf = renamed_table.to_pandas(**pandas_options)
pdf.columns = schema.names
# SPARK-51112: If the table is empty, we avoid using pyarrow to_pandas to create the
# DataFrame, as it may fail with a segmentation fault. Instead, we create an empty pandas
# DataFrame manually with the correct schema.
if table.num_rows == 0:
pdf = pd.DataFrame(columns=schema.names)
else:
# Rename columns to avoid duplicated column names.
renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])

pandas_options = {}
if self_destruct:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update(
{
"self_destruct": True,
"split_blocks": True,
"use_threads": False,
}
)
if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
# A legacy option to coerce date32, date64, duration, and timestamp
# time units to nanoseconds when converting to pandas.
# This option can only be added since 13.0.0.
pandas_options.update(
{
"coerce_temporal_nanoseconds": True,
}
)
pdf = renamed_table.to_pandas(**pandas_options)
pdf.columns = schema.names

if len(pdf.columns) > 0:
timezone: Optional[str] = None
3 changes: 3 additions & 0 deletions python/pyspark/sql/tests/connect/test_parity_collection.py
@@ -42,6 +42,9 @@ def test_to_pandas_with_duplicated_column_names(self):
def test_to_pandas_from_mixed_dataframe(self):
self.check_to_pandas_from_mixed_dataframe()

def test_to_pandas_for_empty_df_with_nested_array_columns(self):
self.check_to_pandas_for_empty_df_with_nested_array_columns()


if __name__ == "__main__":
import unittest
37 changes: 37 additions & 0 deletions python/pyspark/sql/tests/test_collection.py
@@ -18,9 +18,11 @@
import unittest

from pyspark.sql.types import (
ArrayType,
StringType,
IntegerType,
StructType,
StructField,
BooleanType,
DateType,
TimestampType,
@@ -35,6 +37,7 @@
pandas_requirement_message,
pyarrow_requirement_message,
)
from pyspark.testing.utils import assertDataFrameEqual


class DataFrameCollectionTestsMixin:
@@ -289,6 +292,40 @@ def check_to_pandas_for_array_of_struct(self, is_arrow_enabled):
else:
self.assertEqual(type(pdf["array_struct_col"][0]), list)

@unittest.skipIf(
not have_pandas or not have_pyarrow,
pandas_requirement_message or pyarrow_requirement_message,
)
def test_to_pandas_for_empty_df_with_nested_array_columns(self):
for arrow_enabled in [False, True]:
with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
self.check_to_pandas_for_empty_df_with_nested_array_columns()

def check_to_pandas_for_empty_df_with_nested_array_columns(self):
# SPARK-51112: Segfault must not occur when converting empty DataFrame with nested array
# columns to pandas DataFrame.
import pandas as pd

df = self.spark.createDataFrame(
data=[],
schema=StructType(
[
StructField(
name="b_int",
dataType=IntegerType(),
nullable=False,
),
StructField(
name="b",
dataType=ArrayType(ArrayType(StringType(), True), True),
nullable=True,
),
]
),
)
expected_pdf = pd.DataFrame(columns=["b_int", "b"])
assertDataFrameEqual(df.toPandas(), expected_pdf)

def test_to_local_iterator(self):
df = self.spark.range(8, numPartitions=4)
expected = df.collect()
