
[SPARK-22340][PYTHON] Add a mode to pin Python thread into JVM's #24898

Closed · 6 commits

Conversation

@HyukjinKwon (Member) commented Jun 18, 2019

What changes were proposed in this pull request?

This PR proposes to add an experimental mode that uses Py4J's single threading model design (pinned thread model) to sync threads between the PVM and JVM. See https://www.py4j.org/advanced_topics.html#using-single-threading-model-pinned-thread

Multi threading model

Currently, PySpark uses this model. Threads on the PVM and JVM are independent. For instance, callbacks are received and the relevant Python code is executed in a different Python thread. JVM threads are reused when possible.

Py4J creates a new thread every time a command is received and no thread is available. See the current model we're using: https://www.py4j.org/advanced_topics.html#the-multi-threading-model

One problem in this model is that we can't sync threads on the PVM and JVM out of the box. This leads to problems, in particular in code paths related to threading on the JVM side. See:

```scala
protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
```

Because JVM threads are reused, job groups set from Python threads cannot be scoped to each thread, as described in the JIRA.
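This mismatch can be illustrated with a toy model (plain Python, an assumption for illustration, not Spark code): the JVM keeps local properties in a per-thread structure akin to `InheritableThreadLocal`, so a property set while one pooled thread serves a command is invisible when a different pooled thread serves the next command.

```python
import threading

# Toy stand-in for the JVM's InheritableThreadLocal-backed local properties.
jvm_local = threading.local()

def handle_set_job_group(group):
    # The first Py4J command happens to be served by one JVM thread...
    jvm_local.group = group

results = {}
def handle_get_job_group():
    # ...but the next command may be served by a different JVM thread,
    # which has its own (empty) thread-local storage.
    results["group"] = getattr(jvm_local, "group", None)

t1 = threading.Thread(target=handle_set_job_group, args=("my-group",))
t1.start(); t1.join()
t2 = threading.Thread(target=handle_get_job_group)
t2.start(); t2.join()

print(results["group"])  # None: the job group did not follow the caller
```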

Single threading model design (pinned thread model)

This mode pins and syncs the threads on the PVM and JVM to work around the problem above. For instance, callbacks are received and the relevant Python code is executed in the same Python thread. See https://www.py4j.org/advanced_topics.html#the-single-threading-model

Even though this mode can sync threads on the PVM and JVM for other thread-related code paths, it might cause another problem: local properties are seemingly not inherited, as shown below. (Assuming the multi threading mode still creates new threads when existing threads are busy, I suspect this issue already exists in that mode when multiple jobs are submitted; however, it can always be reproduced in the single threading mode.)

```bash
$ PYSPARK_PIN_THREAD=true ./bin/pyspark
```

```python
import threading

spark.sparkContext.setLocalProperty("a", "hi")
def print_prop():
    print(spark.sparkContext.getLocalProperty("a"))

threading.Thread(target=print_prop).start()
```

```
None
```

Unlike the Scala side:

```scala
spark.sparkContext.setLocalProperty("a", "hi")
new Thread(new Runnable {
  def run() = println(spark.sparkContext.getLocalProperty("a"))
}).start()
```

```
hi
```

This behaviour could potentially cause weird issues, but this PR does not target to fix it for now since this mode is experimental.

How does this PR fix it?

Basically, there are two types of Py4J servers: GatewayServer and ClientServer. The former is for the multi threading model and the latter is for the single threading model. This PR adds a switch to use the latter.

On the Scala side:
The logic to select a server is encapsulated in Py4JServer, which is used at PythonRunner for spark-submit and at PythonGatewayServer for the Spark shell. Each uses ClientServer when PYSPARK_PIN_THREAD is true and GatewayServer otherwise.

On the Python side:
Simply an if-else switches which server to talk to. It uses ClientServer when PYSPARK_PIN_THREAD is true and GatewayServer otherwise.
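A minimal sketch of that Python-side switch (the helper name and string return values are hypothetical; the real code instantiates `py4j.clientserver.ClientServer` or connects a `JavaGateway` to a `GatewayServer` instead of returning a name):

```python
import os

def select_server_kind(env=None):
    # Hypothetical helper mirroring the if-else described above.
    env = os.environ if env is None else env
    if env.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        return "ClientServer"   # single threading model (pinned thread)
    return "GatewayServer"      # multi threading model (default)
```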

This is disabled by default for now.

How was this patch tested?

Manually tested. This can be tested via:

```bash
PYSPARK_PIN_THREAD=true ./bin/pyspark
```

and/or

```bash
cd python
./run-tests --python-executables=python --testnames "pyspark.tests.test_pin_thread"
```

Also, ran the Jenkins tests with PYSPARK_PIN_THREAD enabled.

@HyukjinKwon force-pushed the pinned-thread branch 2 times, most recently from 863eb58 to 201eb4a on June 19, 2019 06:09
@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-22340][PYTHON] Add a mode to pin Python thread into JVM's [SPARK-22340][PYTHON] Add a mode to pin Python thread into JVM's Jun 19, 2019

@SparkQA commented Nov 1, 2019

Test build #113066 has finished for PR 24898 at commit f72a38d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author):

retest this please

@SparkQA commented Nov 4, 2019

Test build #113191 has finished for PR 24898 at commit f72a38d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author):

retest this please

@SparkQA commented Nov 4, 2019

Test build #113203 has finished for PR 24898 at commit f72a38d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment:
lgtm other than some very minor things

```python
# When the thread is pinned, the job group should be set in each thread for now.
# Local properties do not seem to be inherited as they are on the Scala side.
if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
    sc.setJobGroup('test_progress_api', '', True)
```
Contributor:
Actually, this test probably isn't reliable outside of pinned mode, right? The Java side could arbitrarily decide to switch threads at any point.

Anyway, just something to keep in mind if we notice flakiness in this test in the future.

Member Author:
Yeah, I think so, though at least this test hasn't been detected as flaky yet. I was actually thinking of removing this test altogether, but let me leave that out of this PR's scope for now.

```python
    is_job_cancelled[index] = False
except Exception:
    # Assume that exception means job cancellation.
    is_job_cancelled[index] = True
```
Contributor:
I have always been confused about the guarantees of python around mutating a variable like this from multiple threads -- I can't find anything which makes it clear that this mutation is visible to other threads. The section on the GIL says they'll be atomic (https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe) but that isn't quite the same.

I guess this is OK? Again, something to be aware of if we see flakiness.
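For what it's worth, the pattern under discussion can be exercised directly; in CPython, `Thread.join` provides the synchronization that makes the mutation reliably visible afterwards (a sketch illustrating the test's pattern, not a language-spec guarantee):

```python
import threading

is_job_cancelled = [False] * 4

def worker(index):
    # Mutate a list slot from another thread, as in the test above.
    is_job_cancelled[index] = True

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # join() synchronizes: the mutations are visible after this

print(is_job_cancelled)  # [True, True, True, True]
```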

Member Author:
Ah, yeah, such a pattern is considered safe in my experience. I think D[x] = y covers this case ..? I think it's fine anyway.

Member Author:
BTW, there's the dis module to check Python's opcodes (e.g., import dis; func = lambda: 1 + 1; dis.dis(func)). Seems assignment is a single atomic instruction in Python, so it looks fine.
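To illustrate, `dis` shows that a subscript assignment like the one in the test compiles to a single `STORE_SUBSCR` opcode (bytecode details vary across CPython versions, so treat this as illustrative):

```python
import dis
import io

def set_flag(flags, index):
    flags[index] = True  # the pattern used in the test

# Capture the disassembly and look for the opcode.
buf = io.StringIO()
dis.dis(set_flag, file=buf)
print("STORE_SUBSCR" in buf.getvalue())  # True
```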

Contributor:
That link above says it'll be atomic, but that's not exactly the same as knowing the change is visible -- there will be some per-core cache which isn't always flushed. Or at least, it's not in lower-level languages, but maybe it really is in Python? I guess it must be (or flushed every time the GIL changes threads); otherwise this would have to be discussed somewhere in the Python docs.

Member Author:
Ah, I see. Yeah, we were talking about visibility. I think it must be ...

@HyukjinKwon (Member, Author):
Thanks @squito for the thorough review. @WeichenXu123, do you have any comments on this? Otherwise, looks like we're good to go.

@SparkQA commented Nov 7, 2019

Test build #113380 has finished for PR 24898 at commit 9e2d832.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author):
I think we will go in this direction. I am merging this given the sign-off, and I'm pretty confident in this change.

But please still let me know if you have any concern or issue. We can still consider reverting this and going in another direction if we find that's better.

@HyukjinKwon (Member, Author):
This will actually fix many potential issues.

cc @brkyvz FYI since we talked about threads in PySpark before.

@HyukjinKwon (Member, Author):
Merged to master.

@HyukjinKwon HyukjinKwon deleted the pinned-thread branch March 3, 2020 01:17
HyukjinKwon added a commit that referenced this pull request Jul 30, 2020
…s and fixing a thread leak issue in pinned thread mode

### What changes were proposed in this pull request?

This PR proposes:

1. To introduce an `InheritableThread` class that works identically to `threading.Thread` but can inherit the inheritable attributes of a JVM thread, such as `InheritableThreadLocal`.

    This was a problem from the pinned thread mode, see also #24898. Now it works as below:

    ```python
    import pyspark

    spark.sparkContext.setLocalProperty("a", "hi")
    def print_prop():
        print(spark.sparkContext.getLocalProperty("a"))

    pyspark.InheritableThread(target=print_prop).start()
    ```

    ```
    hi
    ```

2. Also, it adds the resource leak fix into `InheritableThread`. Py4J leaks the thread and does not close the connection from Python to the JVM. `InheritableThread` manually closes the connections when PVM garbage collection happens, so JVM threads finish safely. I manually verified this by profiling, but there's also another easy way to verify:

    ```bash
    PYSPARK_PIN_THREAD=true ./bin/pyspark
    ```

    ```python
    >>> from threading import Thread
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> spark._jvm._gateway_client.deque
    deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
    >>> Thread(target=lambda: spark.range(1000).collect()).start()
    >>> spark._jvm._gateway_client.deque
    deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc08d0>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
    ```

    This issue is fixed now.

3. Because we now have a fix for the issue here, it also proposes to deprecate `collectWithJobGroup`, which was a temporary workaround added to avoid this leak issue.

### Why are the changes needed?

To support pinned thread mode properly without a resource leak, and a proper inheritable local properties.

### Does this PR introduce _any_ user-facing change?

Yes, it adds an API `InheritableThread` class for pinned thread mode.

### How was this patch tested?

Manually tested as described above, and unit test was added as well.

Closes #28968 from HyukjinKwon/SPARK-32010.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request Jun 18, 2021
### What changes were proposed in this pull request?

PySpark added pinned thread mode in #24898 to sync Python threads to JVM threads. Previously, one JVM thread could be reused, which ends up with a messed-up inheritance hierarchy (such as thread locals), especially when multiple jobs run in parallel. To completely fix this, we should enable this mode by default.

### Why are the changes needed?

To correctly support parallel job submission and management.

### Does this PR introduce _any_ user-facing change?

Yes, Python threads are now mapped to JVM threads one to one.

### How was this patch tested?

Existing tests should cover it.

Closes #32429 from HyukjinKwon/SPARK-35303.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit to py4j/py4j that referenced this pull request Mar 14, 2022
…d mode (#471)

## What is the problem?

Currently, there is a resource leak when using the pinned thread mode (see also apache/spark#24898).

For example, if you repeat the code below multiple times to create Py4J connections in multiple threads,

```python
# PySpark application
import threading

def print_prop():
    # Py4J connection is used under the hood.
    print(spark.sparkContext.getLocalProperty("a"))

threading.Thread(target=print_prop).start()
```

the number of leftover connections grows:

```python
spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x7fdc60170940>, <py4j.clientserver.ClientServerConnection object at 0x7fdca011e760>, <py4j.clientserver.ClientServerConnection object at 0x7fdcb01acdc0>, <py4j.clientserver.ClientServerConnection object at 0x7fdc60170100>, <py4j.clientserver.ClientServerConnection object at 0x7fdcb0232d30>])
```

In an environment where multiple threads are used without a pool, this easily causes "Too many open files" due to the lack of file descriptors (as they are all occupied by unclosed sockets).

## How do you fix it?

This PR adds another variable to the thread-local storage that cleans up the connection right before the thread finishes. We need it as a separate thread-local slot because `connection` itself is NOT cleaned up, since a reference to it is held in `JavaClient.deque`.

See also 50fe45e for more details.
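A rough sketch of the mechanism described above (names are hypothetical; see the actual commit for the real implementation): a small holder object is stored only in a `threading.local` slot, so it becomes garbage when the thread finishes, and its finalizer closes the connection even though the deque still references the connection itself.

```python
import gc
import threading
import time

class _CleanupHolder:
    # Hypothetical sketch: referenced only by the thread's local storage,
    # so it is collected once the thread dies, closing the connection.
    def __init__(self, connection):
        self.connection = connection

    def __del__(self):
        try:
            self.connection.close()
        except Exception:
            pass

_local = threading.local()

def register_cleanup(connection):
    # Stored separately from the connection so the deque's reference
    # to the connection does not keep the holder alive.
    _local.cleanup = _CleanupHolder(connection)

# Demo with a fake connection object standing in for a Py4J connection.
closed = []
class FakeConnection:
    def close(self):
        closed.append(True)

t = threading.Thread(target=lambda: register_cleanup(FakeConnection()))
t.start()
t.join()
time.sleep(0.1)  # give the interpreter a moment to tear down thread state
gc.collect()     # the dead thread's locals are reclaimed, closing the connection
```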