[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends. #30177
Conversation
I already reviewed this. LGTM
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
cc @BryanCutler for another look if you find some time.
Kubernetes integration test status success
Is this only for Apache Spark 3.1 and 3.0, @ueshin and @HyukjinKwon?
I believe this is a long-standing bug even for 2.4 releases.
Thanks. Could you update the affected versions of SPARK-33277 accordingly?
class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {

  override def hasNext: Boolean =
    !context.isCompleted() && !context.isInterrupted() && iter.hasNext
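For context, a minimal sketch of how this wrapping is intended to be used in an exec node such as EvalPythonExec (variable names here are assumptions for illustration, not the exact Spark code):

```scala
// Hedged sketch (assumed names): wrap the parent iterator so the consuming
// side stops pulling rows once the task has completed or been interrupted.
val context = TaskContext.get()
val contextAwareIterator = new ContextAwareIterator(inputIterator, context)
// contextAwareIterator is then handed to the Python runner instead of inputIterator.
```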
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, one thing I would like to note is that this is not a complete fix.
This is rather a band-aid fix, because the consumption in the iterator happens asynchronously from the main task thread. So the close can still happen at any point in the upstream, e.g. in the middle of hasNext, and it can still cause the same issue.
To completely fix this, IMHO, we would have to make the two threads fully synchronous, and then there would be no point in having a separate thread to process Python UDFs at all.
I think the cause is basically similar to the input_file_name issue, due to the lack of synchronization between this thread and the main thread (see #24958 (comment)).
If there's a better option, that would be great, but I think this fix is good enough (given that I see a similar approach in ContinuousQueuedDataReader).
Let me know if I missed something here.
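To make the remaining window concrete, here is the `hasNext` check from above annotated with where the race can still occur (the comments are added for illustration; the code itself is unchanged):

```scala
override def hasNext: Boolean =
  !context.isCompleted() && !context.isInterrupted() &&  // the task can complete right after these checks pass...
    iter.hasNext  // ...so this call may still touch resources that the task-completion path has already freed
```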
Test build #130388 has finished for PR 30177 at commit
Test build #130389 has finished for PR 30177 at commit
I am pretty sure this is the best effort we can make with a minimized change for now. I am merging it in, but please let me know if you guys have a different thought. Merged to master. @ueshin seems like it has a conflict. Would you mind opening PRs for branch-3.0 and 2.4?
…g after the task ends

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash. E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)
spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())
spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

No.

Added tests, and manually.

Closes apache#30177 from ueshin/issues/SPARK-33277/python_pandas_udf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…suming after the task ends

### What changes were proposed in this pull request?

This is a backport of #30177.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash. E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)
spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())
spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30217 from ueshin/issues/SPARK-33277/3.0/python_pandas_udf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…sumin…

### What changes were proposed in this pull request?

This is a backport of #30177.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash. E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)
spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())
spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30218 from ueshin/issues/SPARK-33277/2.4/python_pandas_udf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@ueshin, it seems the test can be flaky for the reason we discussed at #30177 (comment); see https://github.com/apache/spark/runs/1338412158. Shall we revert the test cases for now? I think the reproducer in the PR description is good enough for now.
Sure, could you revert it for now? I'll take a look later.
Let me monitor a bit more before reverting it. I just saw it once.
@ueshin, for some reason, the tests became pretty flaky, so I will just revert as you said.
Thank you for reverting, @HyukjinKwon!
@HyukjinKwon Thanks, I'll include the reverted changes in #30242.
…g after the task ends

### What changes were proposed in this pull request?

This is a retry of #30177. This is not a complete fix, but it would take a long time to complete (#30242). As discussed offline, at least using `ContextAwareIterator` should be helpful enough for many cases.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

Python/Pandas UDF right after off-heap vectorized reader could cause executor crash. E.g.,:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)
spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())
spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30899 from ueshin/issues/SPARK-33277/context_aware_iterator.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
… task completion listener returns

### What changes were proposed in this pull request?

Python UDFs in Spark SQL are run in a separate Python process. The Python process is fed input by a dedicated thread (`BasePythonRunner.WriterThread`). This writer thread drives the child plan by pulling rows from its output iterator and serializing them across a socket.

When the child exec node is the off-heap vectorized Parquet reader, these rows are backed by off-heap memory. The child node uses a task completion listener to free the off-heap memory at the end of the task, which invalidates the output iterator and any rows it has produced. Since task completion listeners are registered bottom-up and executed in reverse order of registration, this is safe as long as an exec node never accesses its input after its task completion listener has executed.[^1]

The BasePythonRunner task completion listener violates this assumption. It interrupts the writer thread, but does not wait for it to exit. This causes a race condition that can lead to an executor crash:

1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread, but the writer thread does not check it immediately.
4. The child plan's task completion listener frees its off-heap memory, invalidating the row that the Python writer thread is processing.
5. The Python writer thread attempts to access the invalidated row. The use-after-free triggers a segfault that crashes the executor.

This PR fixes the bug by making the BasePythonRunner task completion listener wait for the writer thread to exit before returning. This prevents its input from being invalidated while the thread is running. The sequence of events is now as follows:

1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread and waits for the writer thread to exit.
4. The child plan's task completion listener can safely free its off-heap memory without invalidating live rows.

TaskContextImpl previously held a lock while invoking the task completion listeners. This would now cause a deadlock because the writer thread's exception handler calls `TaskContextImpl#isCompleted()`, which needs to acquire the same lock. To avoid deadlock, this PR modifies TaskContextImpl to release the lock before invoking the listeners, while still maintaining sequential execution of listeners.

[^1]: This guarantee was not historically recognized, leading to similar bugs as far back as 2014 ([SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019?focusedCommentId=13953661&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13953661)). The root cause was the lack of a reliably-ordered mechanism for operators to free resources at the end of a task. Such a mechanism (task completion listeners) was added and gradually refined, and we can now make this guarantee explicit. (An alternative approach is to use closeable iterators everywhere, but this would be a major change.)

### Why are the changes needed?

Without this PR, attempting to use Python UDFs while the off-heap vectorized Parquet reader is enabled (`spark.sql.columnVector.offheap.enabled true`) can cause executors to segfault.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A [previous PR](#30177) reduced the likelihood of encountering this race condition, but did not eliminate it. The accompanying tests were therefore flaky and had to be disabled. This PR eliminates the race condition, allowing us to re-enable these tests. One of the tests, `test_pandas_udf_scalar`, previously failed 30/1000 times and now always succeeds.

An internal workload previously failed with a segfault about 40% of the time when run with `spark.sql.columnVector.offheap.enabled true`, and now succeeds 100% of the time.

Closes #34245 from ankurdave/SPARK-33277-thread-join.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
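A minimal sketch of the listener behavior described above, under stated assumptions: the helper name and structure are invented for illustration and this is not the actual BasePythonRunner code.

```scala
import org.apache.spark.TaskContext

object WriterCleanup {
  // Hedged sketch: on task completion, interrupt the writer thread and wait for
  // it to exit before returning, so that listeners registered earlier (e.g. the
  // one freeing off-heap column vectors) only run after the writer is gone.
  def register(context: TaskContext, writerThread: Thread): Unit = {
    context.addTaskCompletionListener[Unit] { _ =>
      writerThread.interrupt()
      writerThread.join()
    }
  }
}
```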
### What changes were proposed in this pull request?

PythonRunner, a utility that executes Python UDFs in Spark, uses two threads in a producer-consumer model today. This multi-threading model is problematic and confusing, as Spark's execution model within a task is commonly understood to be single-threaded. More importantly, this departure into double-threaded execution resulted in a series of customer issues involving [race conditions](https://issues.apache.org/jira/browse/SPARK-33277) and [deadlocks](https://issues.apache.org/jira/browse/SPARK-38677) between threads, as the code was hard to reason about. There have been multiple attempts to rein in these issues, viz., [fix 1](https://issues.apache.org/jira/browse/SPARK-22535), [fix 2](#30177), [fix 3](243c321). Moreover, the fixes have made the code base somewhat abstruse by introducing multiple daemon [monitor threads](https://github.com/apache/spark/blob/a3a32912be04d3760cb34eb4b79d6d481bbec502/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L579) to detect deadlocks. This PR makes PythonRunner single-threaded, making it easier to reason about and improving code health.

#### Current Execution Model in Spark for Python UDFs

For queries containing Python UDFs, the main Java task thread spins up a new writer thread to pipe data from the child Spark plan into the Python worker evaluating the UDF. The writer thread runs in a tight loop: it evaluates the child Spark plan and feeds the resulting output to the Python worker. The main task thread simultaneously consumes the Python UDF’s output and evaluates the parent Spark plan to produce the final result.

The I/O to/from the Python worker uses blocking Java Sockets, necessitating the use of two threads, one responsible for input to the Python worker and the other for output. Without two threads, it is easy to run into a deadlock. For example, the task can block forever waiting for the output from the Python worker. The output will never arrive until the input is supplied to the Python worker, which is not possible as the task thread is blocked while waiting on output.

#### Proposed Fix

The proposed fix is to move to the standard single-threaded execution model within a task, i.e., to do away with the writer thread. In addition to mitigating the crashes, the fix reduces the complexity of the existing code by doing away with many safety checks in place to track deadlocks in the double-threaded execution model.

In the new model, the main task thread alternates between consuming/feeding data to the Python worker using asynchronous I/O through Java’s [SocketChannel](https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html). See the `read()` method in the code below for approximately how this is achieved.

```
case class PythonUDFRunner {

  private var nextRow: Row = _
  private var endOfStream = false
  private var childHasNext = true
  private var buffer: ByteBuffer = _

  def hasNext(): Boolean = nextRow != null || {
    if (!endOfStream) {
      read(buffer)
      nextRow = deserialize(buffer)
      hasNext
    } else {
      false
    }
  }

  def next(): Row = {
    if (hasNext) {
      val outputRow = nextRow
      nextRow = null
      outputRow
    } else {
      null
    }
  }

  def read(buf: Array[Byte]): Row = {
    var n = 0
    while (n == 0) {
      // Alternate between reading/writing to the Python worker using async I/O
      if (pythonWorker.isReadable) {
        n = pythonWorker.read(buf)
      }
      if (pythonWorker.isWritable) {
        consumeChildPlanAndWriteDataToPythonWorker()
      }
    }
  }

  def consumeChildPlanAndWriteDataToPythonWorker(): Unit = {
    // Tracks whether the connection to the Python worker can be written to.
    var socketAcceptsInput = true
    while (socketAcceptsInput && (childHasNext || buffer.hasRemaining)) {
      if (!buffer.hasRemaining && childHasNext) {
        // Consume data from the child and buffer it.
        writeToBuffer(childPlan.next(), buffer)
        childHasNext = childPlan.hasNext()
        if (!childHasNext) {
          // Exhausted child plan’s output. Write a keyword to the Python worker signaling the end of data input.
          writeToBuffer(endOfStream)
        }
      }
      // Try to write as much buffered data as possible to the Python worker.
      while (buffer.hasRemaining && socketAcceptsInput) {
        val n = writeToPythonWorker(buffer)
        // `writeToPythonWorker()` returns 0 when the socket cannot accept more data right now.
        socketAcceptsInput = n > 0
      }
    }
  }
}
```

### Why are the changes needed?

This PR makes PythonRunner single-threaded, making it easier to reason about and improving code health.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #42385 from utkarsh39/SPARK-44705.

Authored-by: Utkarsh <utkarsh.agarwal@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.
Why are the changes needed?
A Python/Pandas UDF right after the off-heap vectorized reader could cause an executor crash.
E.g.:
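The reproducer, as given in the commit message above:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)
spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())
spark.read.parquet(path).select(fUdf('id')).head()
```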
This is because the Python evaluation consumes the parent iterator in a separate thread, and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added tests, and manually.