Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32010][PYTHON][CORE] Add InheritableThread for local properties and fixing a thread leak issue in pinned thread mode #28968

Closed
wants to merge 1 commit into from

Conversation

HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Jul 1, 2020

What changes were proposed in this pull request?

This PR proposes:

  1. To introduce InheritableThread class, that works identically with threading.Thread but it can inherit the inheritable attributes of a JVM thread such as InheritableThreadLocal.

    This was a problem from the pinned thread mode, see also [SPARK-22340][PYTHON] Add a mode to pin Python thread into JVM's #24898. Now it works as below:

    import pyspark
    
    spark.sparkContext.setLocalProperty("a", "hi")
    def print_prop():
        print(spark.sparkContext.getLocalProperty("a"))
    
    pyspark.InheritableThread(target=print_prop).start()
    hi
    
  2. Also, it adds the resource leak fix into InheritableThread. Py4J leaks the thread and does not close the connection from Python to JVM. In InheritableThread, it manually closes the connections when PVM garbage collection happens. So, JVM threads finish safely. I manually verified by profiling but there's also another easy way to verify:

    PYSPARK_PIN_THREAD=true ./bin/pyspark
    >>> from threading import Thread
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> spark._jvm._gateway_client.deque
    deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> spark._jvm._gateway_client.deque
    deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc08d0>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])

    This issue is fixed now.

  3. Because now we have a fix for the issue here, it also proposes to deprecate collectWithJobGroup which was a temporary workaround added to avoid this leak issue.

Why are the changes needed?

To support pinned thread mode properly without a resource leak, and a proper inheritable local properties.

Does this PR introduce any user-facing change?

Yes, it adds an API InheritableThread class for pinned thread mode.

How was this patch tested?

Manually tested as described above, and unit test was added as well.

@HyukjinKwon
Copy link
Member Author

HyukjinKwon commented Jul 1, 2020

@HyukjinKwon HyukjinKwon changed the title [SPARK-32010][PYTHON] Add InheritableThread for local properties and fixing a thread leak issue in pinned thread mode [SPARK-32010][PYTHON][CORE] Add InheritableThread for local properties and fixing a thread leak issue in pinned thread mode Jul 1, 2020
@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jul 1, 2020

Just a question: do we need [CORE] tag for this kind of Python-code-specific change? Although this touches PySpark RDD, we usually use [CORE] for core module literally.

@SparkQA

This comment has been minimized.

@HyukjinKwon
Copy link
Member Author

I think that's what people usually do. In particular, ML side often. I think it's better to classify it more explicitly.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@dongjoon-hyun
Copy link
Member

Oh, got it. It seems that I misunderstand the standard. Thanks. :)

I think that's what people usually do. In particular, ML side often. I think it's better to classify it more explicitly.

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA

This comment has been minimized.

@HyukjinKwon
Copy link
Member Author

@dongjoon-hyun no problem :-). I think this is more about preference things .. If it causes any problem or confusion, I will change my way.

@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 2, 2020

Test build #124839 has finished for PR 28968 at commit f369666.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

gentle ping for a review :-).

@SparkQA
Copy link

SparkQA commented Jul 14, 2020

Test build #125802 has finished for PR 28968 at commit a78fd43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InheritableThread(threading.Thread):

@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 21, 2020

Test build #126222 has finished for PR 28968 at commit a78fd43.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InheritableThread(threading.Thread):

@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 21, 2020

Test build #126231 has finished for PR 28968 at commit a78fd43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InheritableThread(threading.Thread):

@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 25, 2020

Test build #126528 has finished for PR 28968 at commit a78fd43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InheritableThread(threading.Thread):

@HyukjinKwon HyukjinKwon deleted the SPARK-32010 branch July 27, 2020 07:43
@HyukjinKwon HyukjinKwon restored the SPARK-32010 branch July 27, 2020 07:46
@HyukjinKwon HyukjinKwon reopened this Jul 27, 2020
@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 27, 2020

Test build #126640 has finished for PR 28968 at commit a78fd43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InheritableThread(threading.Thread):

@WeichenXu123
Copy link
Contributor

sync with @HyukjinKwon offline, LGTM except one concern:
if we add this thread class instead of fixing it in py4j, can we enable pin thread mode by default in future release ?
if enable pin thread mode by default, many user existing code which use plain thread may meet memory leak issue.

But fixing it in py4j seems to be difficult, py4j do not know which thread is about to be GCed except thread notifying py4j initiatively

@HyukjinKwon
Copy link
Member Author

Thanks @WeichenXu123. I will leave it open few more days before merging it.

@HyukjinKwon
Copy link
Member Author

Merged to master.

each thread with its own local properties. To work around this, you should manually copy and set the
local properties from the parent thread to the child thread when you create another thread in PVM.
to `true`. This pinned thread mode allows one PVM thread has one corresponding JVM thread. With this mode,
`pyspark.InheritableThread` is recommanded to use together for a PVM thread to inherit the interitable attributes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: interitable -> inheritable


if isinstance(sc._gateway, ClientServer):
# Here's when the pinned-thread mode (PYSPARK_PIN_THREAD) is on.
properties = sc._jsc.sc().getLocalProperties().clone()
Copy link
Member

@viirya viirya Aug 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to clone? Doesn't sc.localProperties get clone in childValue already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we're mimicking that behaviour here because the thread in JVM does not respect the inheritance here since the thread is always sepearately created via the JVM gateway whereas Scala Java side we can keep the inheritance by creating a thread within a thread.

@viirya
Copy link
Member

viirya commented Aug 29, 2020

I found I missed this and looked at now. LGTM. I'm just wondering we should use InheritableThread in the PR description to verify the fix?

>>> from threading import Thread
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc08d0>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])

@HyukjinKwon
Copy link
Member Author

Oh yeah we should use InheritableThread instead of Thread to verify this PR.

@HyukjinKwon
Copy link
Member Author

HyukjinKwon commented Aug 29, 2020

Thank you for taking a look @viirya.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants