
[SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) #24958

Closed
wants to merge 2 commits into from

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Jun 25, 2019

What changes were proposed in this pull request?

This PR proposes to use AtomicReference so that parent and child threads can access the same file block holder.

Python UDF expressions are turned into a plan, which launches a separate thread to consume the input iterator. In that separate child thread, the iterator calls InputFileBlockHolder.set before the parent does, and the parent thread is then unable to read the value later:

  1. In this separate child thread, if it happens to call InputFileBlockHolder.set first, before the parent's thread local is initialized (which happens on the first call of ThreadLocal.get()), the child thread ends up calling its own initialValue to initialize.

  2. After that, the parent calls its own initialValue to initialize at its first call of ThreadLocal.get().

  3. The two threads now hold two different references, so an update in the child isn't reflected in the parent.
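The race described in steps 1-3 can be reproduced outside Spark with a minimal Java sketch (class and value names are hypothetical; this mirrors the thread-local pattern, not Spark's actual code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class RaceDemo {
    static final InheritableThreadLocal<AtomicReference<String>> holder =
        new InheritableThreadLocal<AtomicReference<String>>() {
            @Override protected AtomicReference<String> initialValue() {
                return new AtomicReference<>("unset");
            }
        };

    public static void main(String[] args) throws Exception {
        // The child is created BEFORE the parent ever calls get(): there is
        // no parent value to inherit, so each thread lazily creates its OWN
        // AtomicReference via initialValue.
        Thread child = new Thread(() -> holder.get().set("file:/tmp/part-0000"));
        child.start();
        child.join();

        // The parent's first get() triggers its own initialValue; the child's
        // update is lost, mirroring the input_file_name() symptom.
        System.out.println(holder.get().get()); // prints "unset"
    }
}
```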

This PR fixes it by initializing the parent's thread local with an AtomicReference holding the file status, so that the same reference is used within each task and the child thread's update is reflected in the parent.
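A minimal Java sketch of the fixed pattern, under the assumption that the parent's inheritable thread local is initialized before the child thread is created (names are hypothetical, not Spark's actual code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class HolderDemo {
    static final InheritableThreadLocal<AtomicReference<String>> holder =
        new InheritableThreadLocal<AtomicReference<String>>() {
            @Override protected AtomicReference<String> initialValue() {
                return new AtomicReference<>("unset");
            }
        };

    public static void main(String[] args) throws InterruptedException {
        // The parent initializes its thread local BEFORE spawning the child,
        // so the child inherits the very same AtomicReference instead of
        // lazily creating its own.
        AtomicReference<String> ref = holder.get();

        Thread child = new Thread(() -> holder.get().set("file:/tmp/part-0000"));
        child.start();
        child.join();

        // The child's update is now visible through the shared container.
        System.out.println(ref.get()); // prints "file:/tmp/part-0000"
    }
}
```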

I also tried to explain this a bit more at #24958 (comment).

How was this patch tested?

Manually tested, and a unit test was added.

@HyukjinKwon
Member Author

cc @cloud-fan, @viirya, @brkyvz

@HyukjinKwon
Member Author

BTW, this seems to be a long-standing issue. I could reproduce it on 2.3 and 2.4.

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106862 has finished for PR 24958 at commit 3874dfc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon HyukjinKwon changed the title [SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [WIP][SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Jun 25, 2019
@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK-28153][PYTHON] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Jun 25, 2019
@SparkQA

SparkQA commented Jun 25, 2019

Test build #106878 has finished for PR 24958 at commit 2c74273.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106884 has finished for PR 24958 at commit a098db5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Jun 25, 2019

Am I right that the fix here isn't so much the 'atomic'-ness but just the fact that there's a container object whose referent can be updated? Like, you could use a 1-element array too? That's all fine either way, just for my understanding.

@HyukjinKwon
Member Author

Ah, virtually yes. But using an atomic will at least guard thread safety between the child and the parent thread, for instance in the case where a UDF launches another thread.
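A hedged illustration of this point: the thread local only needs to hold some mutable container, so a 1-element array would indeed work, but AtomicReference additionally gives volatile read/write semantics across threads (sketch, not Spark code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ContainerDemo {
    public static void main(String[] args) throws Exception {
        // A 1-element array is also a mutable container whose contents a
        // child thread can update in place; join() establishes the
        // happens-before edge that makes the write visible here.
        String[] arrayHolder = {"unset"};
        Thread t = new Thread(() -> arrayHolder[0] = "part-0000");
        t.start();
        t.join();
        System.out.println(arrayHolder[0]); // prints "part-0000"

        // AtomicReference makes every set() a volatile write, so visibility
        // is guaranteed even without an explicit join()/synchronization.
        AtomicReference<String> atomicHolder = new AtomicReference<>("unset");
        atomicHolder.set("part-0001");
        System.out.println(atomicHolder.get()); // prints "part-0001"
    }
}
```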

@brkyvz
Contributor

brkyvz commented Jun 25, 2019

While this fix seems fine, I'm worried it could actually lead to correctness issues. Py4J may reuse the same threads for different tasks, so I'm worried that we could return the wrong reference.

A more robust solution would be:

  1. InputFileBlocks are stored in a ConcurrentHashMap, from taskAttemptId (unique in a Spark cluster) -> InputFileBlock.
  2. In Scala we can just look up these values
  3. In Python and R, we already get the TaskContext as part of the protocol (in worker.py) it seems. We need to do a look up by the TaskContext.taskAttemptId.

What do you think?
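Roughly, the proposed lookup could be sketched like this (hypothetical names such as BlockRegistry; not Spark's actual API):

```java
import java.util.concurrent.ConcurrentHashMap;

public class BlockRegistry {
    // File blocks keyed by the cluster-unique taskAttemptId instead of by
    // thread, so reused threads can never observe another task's block.
    static final class InputFileBlock {
        final String filePath; final long start; final long length;
        InputFileBlock(String filePath, long start, long length) {
            this.filePath = filePath; this.start = start; this.length = length;
        }
    }

    private static final ConcurrentHashMap<Long, InputFileBlock> blocks =
        new ConcurrentHashMap<>();

    static void set(long taskAttemptId, String path, long start, long length) {
        blocks.put(taskAttemptId, new InputFileBlock(path, start, length));
    }

    static InputFileBlock get(long taskAttemptId) {
        // Unknown task: return an empty block rather than null.
        return blocks.getOrDefault(taskAttemptId, new InputFileBlock("", -1L, -1L));
    }

    static void unset(long taskAttemptId) {
        blocks.remove(taskAttemptId); // avoid leaking entries across tasks
    }

    public static void main(String[] args) {
        set(42L, "file:/tmp/part-0000", 0L, 128L);
        System.out.println(get(42L).filePath); // prints "file:/tmp/part-0000"
        unset(42L);
        System.out.println(get(42L).filePath.isEmpty()); // prints "true"
    }
}
```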

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106897 has finished for PR 24958 at commit 71da1c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

HyukjinKwon commented Jun 25, 2019

I think Py4J is only used on the driver side, so we're safe from this concern. InputFileBlockHolder.getXXX is used within related expressions (e.g., input_file_name):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala:    InputFileBlockHolder.getInputFilePath
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala:    val className = InputFileBlockHolder.getClass.getName.stripSuffix("$")
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala:    InputFileBlockHolder.getStartOffset
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala:    val className = InputFileBlockHolder.getClass.getName.stripSuffix("$")
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala:    InputFileBlockHolder.getLength
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/inputFileBlock.scala:    val className = InputFileBlockHolder.getClass.getName.stripSuffix("$")

and InputFileBlockHolder.set happens in the iterators, for HadoopRDD, NewHadoopRDD, DSv1 and DSv2:

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:          InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala:          InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala:          InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala:    InputFileBlockHolder.set(file.filePath, file.start, file.length)

So they are set and read on the executor side.

Even if there are some spots I missed: Py4J reuses the same threads for different tasks, but job execution calls happen one at a time due to the GIL, and Py4J launches another thread if one thread is busy on the JVM side. So it won't happen that one JVM thread somehow launches multiple jobs at the same time on the same thread.

Moreover, I opened a PR to pin threads between PVM and JVM - #24898 - which might be the more correct behaviour (?). If we could switch to that mode, it would permanently get rid of this concern.

@HyukjinKwon
Member Author

Cc @jose-torres and @jiangxb1987 as well. I found you guys worked on a related issue before.

@HyukjinKwon
Member Author

gentle ping .. :-) ..

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jul 5, 2019

Test build #107269 has finished for PR 24958 at commit 71da1c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

While reading the discussion here, I can hardly understand how we implement the input_file() function with InheritableThreadLocal and AtomicReference and make it work for PySpark. On the other hand, the idea proposed by @brkyvz is really easy to understand.

@HyukjinKwon
Member Author

The thing is, it might be easy to understand, but from my understanding it's not necessary. It's not only the input_file() case. To me it looks like overkill.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Jul 5, 2019

Test build #107287 has finished for PR 24958 at commit 71da1c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz
Contributor

brkyvz commented Jul 5, 2019

@HyukjinKwon I can guarantee you, with this solution we may hit a separate corner case. We should avoid using thread locals (especially inheritable thread locals) as much as possible.

@HyukjinKwon
Member Author

HyukjinKwon commented Jul 6, 2019

@brkyvz and @cloud-fan, this PR targets fixing SPARK-28153 with a minimised change. If the suggestion can commonly fix other corner cases I am not aware of, we should fix everywhere else that uses a thread local like that, for instance,

object EpochTracker {
  // The current epoch. Note that this is a shared reference; ContinuousWriteRDD.compute() will
  // update the underlying AtomicLong as it finishes epochs. Other code should only read the value.
  private val currentEpoch: ThreadLocal[AtomicLong] = new ThreadLocal[AtomicLong] {
    override def initialValue() = new AtomicLong(-1)
  }
}

Currently I am not sure which cases the new suggestion will fix and cover. I would like to avoid refactoring here to fix a bug based on a vague concern, without knowing which other corner cases we would fix.

If this fix is correct, can we target the new suggestion in a different ticket, if you guys think it should be done?

@cloud-fan
Contributor

Since Spark already uses thread locals in many places, I'm OK with this surgical fix. But I do like the proposal from @brkyvz. Can we create a new JIRA ticket for it, and add a TODO somewhere? Then I think this PR is ready to go.

@brkyvz
Contributor

brkyvz commented Jul 9, 2019

We didn't get it right in SPARK-27711, which is the reason I would prefer a complete and robust solution this time. I'm just afraid more things would pop up. If you would like to merge this and delete it in a month, that's fine with me, but I don't see the value in rushing this fix.

@cloud-fan
Contributor

This PR targets master/2.4/2.3, and I think it's safer to only implement the new idea in master. What do you think @brkyvz ?

@HyukjinKwon
Member Author

@brkyvz, we're not rushing - we're not ignoring any actually-found issue or hole, or merging it without discussion. Also, using a thread local isn't a horrible approach, although it might be less preferred case by case - it lets us avoid having one place that multiple tasks access, and instead run them in parallel separately.

I get that the suggestion makes sense too, but adding a new mechanism isn't necessarily safer. There are always new holes that can pop up. The conservative way is usually to keep the code with fewer changes (you know, for instance, bug compatibility).

In addition, it's rather a general design issue, not specific only to this code path. For instance, the code below:

object EpochTracker {
  // The current epoch. Note that this is a shared reference; ContinuousWriteRDD.compute() will
  // update the underlying AtomicLong as it finishes epochs. Other code should only read the value.
  private val currentEpoch: ThreadLocal[AtomicLong] = new ThreadLocal[AtomicLong] {
    override def initialValue() = new AtomicLong(-1)
  }
}

has almost the same issue as SPARK-28153 - the parent thread updates the current epoch but the child thread (the Python write thread) cannot read it. Ideally we should identify where to fix that as well.
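The EpochTracker symptom can be sketched in plain Java with the analogous ThreadLocal/AtomicLong pattern (hypothetical demo, not Spark code):

```java
import java.util.concurrent.atomic.AtomicLong;

public class EpochDemo {
    // Mirrors the EpochTracker pattern quoted above: a plain
    // (non-inheritable) ThreadLocal wrapping an AtomicLong.
    static final ThreadLocal<AtomicLong> currentEpoch =
        ThreadLocal.withInitial(() -> new AtomicLong(-1));

    public static void main(String[] args) throws Exception {
        currentEpoch.get().set(7); // the parent advances the epoch

        Thread child = new Thread(() ->
            // A plain ThreadLocal never shares values across threads, so the
            // child sees its own freshly-initialized AtomicLong, still -1.
            System.out.println(currentEpoch.get().get()));
        child.start();
        child.join();
    }
}
```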

@SparkQA

SparkQA commented Jul 23, 2019

Test build #108027 has finished for PR 24958 at commit cb4cfde.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@HyukjinKwon
Member Author

cc @zsxwing as well.

Okie, then shall we proceed with this PR and discuss the alternative separately later?

@SparkQA

SparkQA commented Jul 31, 2019

Test build #108424 has finished for PR 24958 at commit cb4cfde.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

After reading the PR description closely, it seems we only need to initialize InputFileBlockHolder at the beginning of a task? Why do we need AtomicReference?

@HyukjinKwon
Member Author

We don't actually need it. I just used it to match the SS side - #24946. I guess it won't affect perf, and it's safer anyway because there are multiple threads that access the variable.

@HyukjinKwon
Member Author

FYI, @srowen pointed it out too in #24958 (comment). We can use a 1-element array too, for instance.

@cloud-fan
Contributor

cloud-fan commented Jul 31, 2019

makes sense, merging to master!

@cloud-fan cloud-fan closed this in b8e13b0 Jul 31, 2019
@cloud-fan
Contributor

Actually, I failed to do the backport due to a bad network. @HyukjinKwon can you do the backport yourself? Thanks!

@dongjoon-hyun
Member

I'll do that, @cloud-fan .

@dongjoon-hyun
Member

Ur, python/pyspark/sql/tests/test_functions.py doesn't exist in branch-2.4.
To keep the test coverage, we need a backporting PR with a new test case, @cloud-fan and @HyukjinKwon .

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 31, 2019

Hi, @HyukjinKwon , @cloud-fan , @brkyvz .

It's irrelevant to this PR, but the PR build seems to skip many Python UTs. Is it safe to proceed with Python PRs like this?

Skipped tests in pyspark.sql.tests.test_arrow with python2.7:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_fallback_disabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_fallback_enabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_respect_session_timezone (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_toggle (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_array_type (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_int_col_names (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_names (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_schema (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_single_data_type (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_filtered_frame (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_no_partition_frame (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_null_conversion (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_round_trip (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_propagates_spark_exception (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_schema_conversion_roundtrip (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_timestamp_dst (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_timestamp_nat (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_arrow_toggle (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_batch_order (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_fallback_disabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_fallback_enabled (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_respect_session_timezone (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_with_array_type (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_fallback_disabled (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_fallback_enabled (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_respect_session_timezone (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_toggle (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_array_type (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_int_col_names (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_names (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_schema (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_createDataFrame_with_single_data_type (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_filtered_frame (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_no_partition_frame (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_null_conversion (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_round_trip (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_propagates_spark_exception (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_schema_conversion_roundtrip (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_timestamp_dst (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_timestamp_nat (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_arrow_toggle (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_batch_order (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_fallback_disabled (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_fallback_enabled (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_respect_session_timezone (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_toPandas_with_array_type (pyspark.sql.tests.test_arrow.EncryptionArrowTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

Skipped tests in pyspark.sql.tests.test_dataframe with python2.7:
    test_create_dataframe_from_pandas_with_dst (pyspark.sql.tests.test_dataframe.DataFrameTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_create_dataframe_from_pandas_with_timestamp (pyspark.sql.tests.test_dataframe.DataFrameTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_to_pandas (pyspark.sql.tests.test_dataframe.DataFrameTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_to_pandas_avoid_astype (pyspark.sql.tests.test_dataframe.DataFrameTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.test_dataframe.QueryExecutionListenerTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

Skipped tests in pyspark.sql.tests.test_pandas_udf with python2.7:
    test_pandas_udf_arrow_overflow (pyspark.sql.tests.test_pandas_udf.PandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_udf_basic (pyspark.sql.tests.test_pandas_udf.PandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_udf_decorator (pyspark.sql.tests.test_pandas_udf.PandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_udf_detect_unsafe_type_conversion (pyspark.sql.tests.test_pandas_udf.PandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_stopiteration_in_udf (pyspark.sql.tests.test_pandas_udf.PandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_udf_wrong_arg (pyspark.sql.tests.test_pandas_udf.PandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

Skipped tests in pyspark.sql.tests.test_pandas_udf_grouped_agg with python2.7:
    test_alias (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_array_type (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_basic (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_complex_expressions (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_complex_groupby (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_grouped_with_empty_partition (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_invalid_args (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_manual (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_sql (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_udfs (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_multiple_udfs (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_register_vectorized_udf_basic (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_retain_group_columns (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_unsupported_types (pyspark.sql.tests.test_pandas_udf_grouped_agg.GroupedAggPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

Skipped tests in pyspark.sql.tests.test_pandas_udf_grouped_map with python2.7:
    test_array_type_correct (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_coerce (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_column_order (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_complex_groupby (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_datatype_string (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_decorator (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_empty_groupby (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_grouped_with_empty_partition (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_scalar_udfs_followed_by_grouby_apply (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_positional_assignment_conf (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_register_grouped_map_udf (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_self_join_with_pandas (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_supported_types (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_timestamp_dst (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_udf_with_key (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_unsupported_types (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_wrong_args (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_wrong_return_type (pyspark.sql.tests.test_pandas_udf_grouped_map.GroupedMapPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

Skipped tests in pyspark.sql.tests.test_pandas_udf_scalar with python2.7:
    test_datasource_with_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_udf_and_sql (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_nondeterministic_vectorized_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_nondeterministic_vectorized_udf_in_aggregate (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_udf_nested_arrays (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_pandas_udf_tokenize (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_register_nondeterministic_vectorized_udf_basic (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_register_vectorized_udf_basic (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_scalar_iter_udf_close (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_scalar_iter_udf_close_early (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_scalar_iter_udf_init (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_timestamp_dst (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_type_annotation (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_array_type (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_basic (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_chained (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_chained_struct_type (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_check_config (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_complex (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_datatype_string (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_dates (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_decorator (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_empty_partition (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_exception (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_invalid_length (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_nested_struct (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_array (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_binary (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_boolean (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_byte (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_decimal (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_double (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_float (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_int (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_long (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_short (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_null_string (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_return_scalar (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_return_timestamp_tz (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_string_in_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_struct_complex (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_struct_type (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_struct_with_empty_partition (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_timestamps (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_timestamps_respect_session_timezone (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_unsupported_types (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_varargs (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_vectorized_udf_wrong_return_type (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

Skipped tests in pyspark.sql.tests.test_pandas_udf_window with python2.7:
    test_array_type (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_bounded_mixed (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_bounded_simple (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_growing_window (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_invalid_args (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_sql (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_sql_and_udf (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_mixed_udf (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_multiple_udfs (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_replace_existing (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_shrinking_window (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_simple (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_sliding_window (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'
    test_without_partitionBy (pyspark.sql.tests.test_pandas_udf_window.WindowPandasUDFTests) ... skipped u'Pandas >= 0.23.2 must be installed; however, your version was 0.19.2.'

@dongjoon-hyun
Member

Oh, never mind. It seems to be covered by Python 3.6 at least.

Skipped tests in pyspark.sql.tests.test_dataframe with python3.6:
      test_create_dataframe_required_pandas_not_found (pyspark.sql.tests.test_dataframe.DataFrameTests) ... SKIP (0.000s)
      test_to_pandas_required_pandas_not_found (pyspark.sql.tests.test_dataframe.DataFrameTests) ... SKIP (0.000s)

Skipped tests in pyspark.streaming.tests.test_kinesis with python3.6:
      test_kinesis_stream (pyspark.streaming.tests.test_kinesis.KinesisStreamTests) ... SKIP (0.000s)
      test_kinesis_stream_api (pyspark.streaming.tests.test_kinesis.KinesisStreamTests) ... SKIP (0.000s)

Skipped tests in pyspark.tests.test_readwrite with python3.6:
      test_sequencefiles (pyspark.tests.test_readwrite.InputFormatTests) ... SKIP (0.000s)
      test_newhadoop_with_array (pyspark.tests.test_readwrite.OutputFormatTests) ... SKIP (0.000s)
      test_sequencefiles (pyspark.tests.test_readwrite.OutputFormatTests) ... SKIP (0.000s)

@HyukjinKwon
Member Author

Most of the skipped tests are because Python 2 doesn't have the required pandas and pyarrow installed in our Jenkins. I talked with Shane before, but he seems pretty busy.

For this test case, it's safe to backport. Let me open a PR to backport today.

Due to the minimum version differences, we cannot simply backport pandas/pyarrow related PRs anyway.

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Aug 1, 2019
… support input_file_name with Python UDF)

This PR proposes to use `AtomicReference` so that parent and child threads can access the same file block holder.

Python UDF expressions are turned into a plan, which then launches a separate thread to consume the input iterator. In that separate child thread, the iterator calls `InputFileBlockHolder.set` before the parent does, and the parent thread is then unable to read the value:

1. If the separate child thread happens to call `InputFileBlockHolder.set` first, before the parent's thread local has been initialized (initialization happens when `ThreadLocal.get()` is first called), the child thread calls its own `initialValue` to initialize it.

2. After that, the parent calls its own `initialValue` to initialize its copy at its first call of `ThreadLocal.get()`.

3. The two threads now hold two different references, so an update in the child is not reflected in the parent.

This PR fixes it by initializing the parent's thread local with an `AtomicReference` for the file status so that the same reference is shared within each task and the child thread's updates are visible to the parent.

I also tried to explain this a bit more at apache#24958 (comment).

Manually tested, and a unit test was added.

Closes apache#24958 from HyukjinKwon/SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
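The initialization ordering described in the commit message can be sketched in plain Java (a minimal standalone sketch, not Spark's actual `InputFileBlockHolder`; the class name, method names, and the file path value are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the fix: the thread local holds a mutable AtomicReference
// container rather than the value itself, and the parent initializes it
// BEFORE spawning the child thread, so parent and child share one container.
public class SharedHolder {
    private static final InheritableThreadLocal<AtomicReference<String>> holder =
        new InheritableThreadLocal<AtomicReference<String>>() {
            @Override
            protected AtomicReference<String> initialValue() {
                // Called at most once per thread, on that thread's first get().
                return new AtomicReference<>("unset");
            }
        };

    static String runTask() throws InterruptedException {
        holder.get(); // eager init in the parent, before the child is created
        // The child thread inherits the parent's AtomicReference at
        // construction time, so its set() is visible to the parent. Without
        // the eager init, each thread would lazily create its own
        // independent value, reproducing the bug in the description.
        Thread child = new Thread(() -> holder.get().set("file:/tmp/part-00000"));
        child.start();
        child.join();
        return holder.get().get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTask()); // prints the child's update
    }
}
```

If the parent skipped the eager `holder.get()` and the child ran first, each thread's `initialValue` would create a separate `AtomicReference`, and `runTask()` would return `"unset"` instead of the child's update.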
@HyukjinKwon
Member Author

opened #25321

HyukjinKwon added a commit that referenced this pull request Aug 1, 2019
…ckHolder (to support input_file_name with Python UDF)

## What changes were proposed in this pull request?

This PR backports #24958 to branch-2.4.

This PR proposes to use `AtomicReference` so that parent and child threads can access the same file block holder.

Python UDF expressions are turned into a plan, which then launches a separate thread to consume the input iterator. In that separate child thread, the iterator calls `InputFileBlockHolder.set` before the parent does, and the parent thread is then unable to read the value:

1. If the separate child thread happens to call `InputFileBlockHolder.set` first, before the parent's thread local has been initialized (initialization happens when `ThreadLocal.get()` is first called), the child thread calls its own `initialValue` to initialize it.

2. After that, the parent calls its own `initialValue` to initialize its copy at its first call of `ThreadLocal.get()`.

3. The two threads now hold two different references, so an update in the child is not reflected in the parent.

This PR fixes it by initializing the parent's thread local with an `AtomicReference` for the file status so that the same reference is shared within each task and the child thread's updates are visible to the parent.

I also tried to explain this a bit more at #24958 (comment).

## How was this patch tested?

Manually tested, and a unit test was added.

Closes #25321 from HyukjinKwon/backport-SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
…ckHolder (to support input_file_name with Python UDF)
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
…ckHolder (to support input_file_name with Python UDF)
@HyukjinKwon HyukjinKwon deleted the SPARK-28153 branch March 3, 2020 01:19