Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33277][PYSPARK][SQL][3.0] Use ContextAwareIterator to stop consuming after the task ends #30217

Conversation

ueshin
Copy link
Member

@ueshin ueshin commented Nov 1, 2020

What changes were proposed in this pull request?

This is a backport of #30177.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use ContextAwareIterator to stop consuming after the task ends.

Why are the changes needed?

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash.

E.g.,:

spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()

This is because, the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause segmentation fault which crashes the executor.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added tests, and manually.

…g after the task ends

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash.

E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because, the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause segmentation fault which crashes the executor.

No.

Added tests, and manually.

Closes apache#30177 from ueshin/issues/SPARK-33277/python_pandas_udf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@SparkQA
Copy link

SparkQA commented Nov 1, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35104/

@SparkQA
Copy link

SparkQA commented Nov 1, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35104/

@HyukjinKwon
Copy link
Member

Merged to branch-3.0

HyukjinKwon pushed a commit that referenced this pull request Nov 2, 2020
…suming after the task ends

### What changes were proposed in this pull request?

This is a backport of #30177.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash.

E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because, the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30217 from ueshin/issues/SPARK-33277/3.0/python_pandas_udf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon closed this Nov 2, 2020
@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Test build #130500 has finished for PR 30217 at commit a2680b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants