Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where SparkRapidsAdaptor could be shut down but not through RmmSpark #984

Merged
merged 1 commit into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static long getCurrentThreadId() {
*/
public static void associateThreadWithTask(long threadId, long taskId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.associateThreadWithTask(threadId, taskId);
}
}
Expand All @@ -140,7 +140,7 @@ public static void associateCurrentThreadWithTask(long taskId) {
*/
public static void associateThreadWithShuffle(long threadId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.associateThreadWithShuffle(threadId);
}
}
Expand All @@ -160,7 +160,7 @@ public static void associateCurrentThreadWithShuffle() {
*/
public static void removeThreadAssociation(long threadId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.removeThreadAssociation(threadId);
}
}
Expand All @@ -180,7 +180,7 @@ public static void removeCurrentThreadAssociation() {
*/
public static void taskDone(long taskId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.taskDone(taskId);
}
}
Expand All @@ -192,7 +192,7 @@ public static void taskDone(long taskId) {
*/
public static void threadCouldBlockOnShuffle(long threadId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.threadCouldBlockOnShuffle(threadId);
}
}
Expand All @@ -211,7 +211,7 @@ public static void threadCouldBlockOnShuffle() {
*/
public static void threadDoneWithShuffle(long threadId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.threadDoneWithShuffle(threadId);
}
}
Expand Down Expand Up @@ -247,7 +247,7 @@ public static void blockThreadUntilReady() {
// Technically there is a race here, but because this can block we cannot hold the Rmm
// lock while doing this, or we can deadlock. So we are going to rely on Rmm shutting down
// or being reconfigured to be rare.
if (local != null) {
if (local != null && local.isOpen()) {
local.blockThreadUntilReady();
}
}
Expand All @@ -267,7 +267,7 @@ public static void forceRetryOOM(long threadId) {
*/
public static void forceRetryOOM(long threadId, int numOOMs) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.forceRetryOOM(threadId, numOOMs);
} else {
throw new IllegalStateException("RMM has not been configured for OOM injection");
Expand All @@ -290,7 +290,7 @@ public static void forceSplitAndRetryOOM(long threadId) {
*/
public static void forceSplitAndRetryOOM(long threadId, int numOOMs) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.forceSplitAndRetryOOM(threadId, numOOMs);
} else {
throw new IllegalStateException("RMM has not been configured for OOM injection");
Expand All @@ -315,7 +315,7 @@ public static void forceCudfException(long threadId) {
*/
public static void forceCudfException(long threadId, int numTimes) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
sra.forceCudfException(threadId, numTimes);
} else {
throw new IllegalStateException("RMM has not been configured for OOM injection");
Expand All @@ -325,7 +325,7 @@ public static void forceCudfException(long threadId, int numTimes) {

public static RmmSparkThreadState getStateOf(long threadId) {
synchronized (Rmm.class) {
if (sra != null) {
if (sra != null && sra.isOpen()) {
return sra.getStateOf(threadId);
} else {
// sra is not set so the thread is by definition unknown to it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public void close() {
super.close();
}


/**
 * Report whether this adaptor is still usable, judged solely by its {@code handle} field.
 * NOTE(review): presumably {@code close()} resets {@code handle} to 0 (via the superclass);
 * that reset is not visible here, so confirm before relying on it.
 * @return true if {@code handle} is non-zero, false if it is 0.
 */
public boolean isOpen() {
return handle != 0;
}

/**
* Associate a thread with a given task id.
* @param threadId the thread ID to use (not java thread id)
Expand Down