
[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends. #30899

Closed

Conversation

@ueshin ueshin (Member) commented Dec 23, 2020

### What changes were proposed in this pull request?

This is a retry of #30177.

This is not a complete fix; a complete fix would take a long time (#30242).
As discussed offline, at least using `ContextAwareIterator` should be helpful enough for many cases.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.
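
A minimal sketch of the idea (the signature matches the one this PR ends up with, shown later in this thread): the wrapper reports exhaustion as soon as the task context is completed or interrupted.

```scala
import org.apache.spark.TaskContext

// Sketch: stop yielding as soon as the task is completed or interrupted,
// even if the delegate iterator still has rows, so the consuming thread
// never reads from a parent that has already been closed.
class ContextAwareIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean =
    !context.isCompleted() && !context.isInterrupted() && delegate.hasNext

  override def next(): T = delegate.next()
}
```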

### Why are the changes needed?

A Python/Pandas UDF placed right after the off-heap vectorized reader could crash the executor.

E.g.:

```py
# Imports added to make the reproducer self-contained; `path` is assumed
# to point to a writable location such as a temporary directory.
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread, and it can consume more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, this can cause a segmentation fault that crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and tested manually.

@HyukjinKwon HyukjinKwon (Member) left a comment

LGTM @cloud-fan fyi

* A TaskContext aware iterator.
*
* As the Python evaluation consumes the parent iterator in a separate thread,
* it could consume more data from the parent even after the task ends and the parent is closed.
Member

Maybe add "If an off-heap column vector exists in the parent iterator, it could cause segmentation fault which crashes the executor." here too?

The current phrase does not make it clear why it is bad to read from a closed parent.

Member Author

I added the sentence. Thanks!

@viirya viirya (Member) left a comment

One minor comment.

@dongjoon-hyun dongjoon-hyun (Member) left a comment

Is this targeting all release branches, from master to branch-2.4?

@HyukjinKwon (Member)

Yes, @dongjoon-hyun. This is a partial fix, but it still band-aids the problem.

* it could consume more data from the parent even after the task ends and the parent is closed.
* Thus, we should use ContextAwareIterator to stop consuming after the task ends.
*/
class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
@dongjoon-hyun dongjoon-hyun (Member) Dec 23, 2020

This looks like a general class. Can we put this into a more general package, instead of the org.apache.spark.sql.execution.python package, as a separate file?

@ueshin ueshin (Member Author) Dec 23, 2020

I moved the class to the org.apache.spark.util package. cc @gatorsmile

@dongjoon-hyun dongjoon-hyun (Member) left a comment

I left a suggestion about the package location of the new ContextAwareIterator class. It would be great if we could reuse this for other languages or parts in the future.

@dongjoon-hyun (Member)

cc @gatorsmile for the above package location discussion.

SparkQA commented Dec 23, 2020

Test build #133261 has finished for PR 30899 at commit 03c7aac.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN]

@github-actions github-actions bot added the CORE label Dec 23, 2020
* limitations under the License.
*/

package org.apache.spark.util
Member

Shall we put this into org.apache.spark because we have InterruptibleIterator there?

Also, maybe we had better add the @DeveloperApi annotation, like InterruptibleIterator.

Member Author

Sure, updated. Thanks!
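
For reference, InterruptibleIterator in org.apache.spark follows this shape; the sketch below is from memory and may differ in detail from the actual source.

```scala
package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

// Existing wrapper in org.apache.spark: checks for a pending task interrupt
// (and kills the task if one is set) before delegating to the parent iterator.
@DeveloperApi
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}
```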

@@ -28,10 +29,12 @@ import org.apache.spark.TaskContext
* which crashes the executor.
* Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
*/
class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
@DeveloperApi
class ContextAwareIterator[+T](val context: TaskContext, val delegate: Iterator[T])
Member

Nice! Thank you for revising this consistently with InterruptibleIterator. Now, it looks much better.
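
To show how the revised class gets wired in, here is a hedged sketch of a call site; `PythonEvalWiring`, `wrapForPythonEval`, and `inputIter` are illustrative names, not the actual Spark source.

```scala
import org.apache.spark.{ContextAwareIterator, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow

object PythonEvalWiring {
  // Hypothetical helper: wrap the parent iterator before handing it to the
  // thread that feeds rows to the Python worker, so that thread stops pulling
  // rows (and touching possibly freed off-heap vectors) once the task ends.
  def wrapForPythonEval(inputIter: Iterator[InternalRow]): Iterator[InternalRow] = {
    val context = TaskContext.get()
    new ContextAwareIterator(context, inputIter)
  }
}
```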

@dongjoon-hyun dongjoon-hyun (Member) left a comment

+1, LGTM (pending CIs). Thanks, @ueshin, @HyukjinKwon, @viirya!

SparkQA commented Dec 23, 2020

Test build #133320 has finished for PR 30899 at commit 3606a4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN]

SparkQA commented Dec 23, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37914/

SparkQA commented Dec 23, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37914/

dongjoon-hyun pushed a commit that referenced this pull request Dec 23, 2020
…g after the task ends


Closes #30899 from ueshin/issues/SPARK-33277/context_aware_iterator.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 5c9b421)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun pushed a commit that referenced this pull request Dec 23, 2020
…g after the task ends


Closes #30899 from ueshin/issues/SPARK-33277/context_aware_iterator.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 5c9b421)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun (Member)

Merged to master/3.1/3.0.

Could you make a backport PR for branch-2.4, @ueshin?

@dongjoon-hyun (Member)

Thank you, @ueshin, @HyukjinKwon, @viirya.

SparkQA commented Dec 24, 2020

Test build #133321 has finished for PR 30899 at commit 36176f7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon pushed a commit that referenced this pull request Dec 24, 2020
…suming after the task ends

This is a backport of #30899.

Closes #30913 from ueshin/issues/SPARK-33277/2.4/context_aware_iterator.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>