Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20642][core] Store FsHistoryProvider listing data in a KVStore. #18887

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,32 @@ public long count(Class<?> type, String index, Object indexedValue) throws Excep

@Override
public void close() throws IOException {
DB _db = this._db.getAndSet(null);
if (_db == null) {
return;
synchronized (this._db) {
DB _db = this._db.getAndSet(null);
if (_db == null) {
return;
}

try {
_db.close();
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
}
}
}

try {
_db.close();
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
/**
* Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator it) throws IOException {
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
it.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ public synchronized void close() throws IOException {
}
}

/**
* Because it's tricky to expose closeable iterators through many internal APIs, especially
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
@Override
protected void finalize() throws Throwable {
db.closeIterator(this);
}

private byte[] loadNext() {
if (count >= max) {
return null;
Expand Down
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>spark-launcher_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kvstore_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_${scala.binary.version}</artifactId>
Expand Down
Loading