Skip to content

Commit

Permalink
SHS-NG M2: Store FsHistoryProvider listing data in a KVStore.
Browse files Browse the repository at this point in the history
The application listing is still generated from event logs, but is now stored
in a KVStore instance. By default an in-memory store is used, but a new config
allows setting a local disk path to store the data, in which case a LevelDB
store will be created.

The provider stores things internally using the public REST API types; I believe
this is better going forward since it will make it easier to get rid of the
internal history server API which is mostly redundant at this point.

I also added a finalizer to LevelDBIterator, to make sure that resources are
eventually released. This helps when code iterates but does not exhaust the
iterator, thus not triggering the auto-close code.

HistoryServerSuite was modified to not re-start the history server unnecessarily;
this makes the json validation tests run more quickly.
  • Loading branch information
Marcelo Vanzin committed Aug 8, 2017
1 parent 2c1bfb4 commit 842589d
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ public synchronized void close() throws IOException {
}
}

/**
* Because it's tricky to expose closeable iterators through many internal APIs, especially
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
@Override
protected void finalize() throws Throwable {
try {
close();
} catch (Exception e) {
// Ignore error here, db may have been closed already.
}
}

private byte[] loadNext() {
if (count >= max) {
return null;
Expand Down
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>spark-launcher_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kvstore_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ private[history] case class LoadedAppUI(

private[history] abstract class ApplicationHistoryProvider {

/**
* The number of applications available for listing. Separate method in case it's cheaper
* to get a count than to calculate the whole listing.
*
* @return The number of available applications.
*/
def getAppCount(): Int = getListing().size

/**
* Returns the count of application event logs that the provider is currently still processing.
* History Server UI can use this to indicate to a user that the application listing on the UI
Expand Down
Loading

0 comments on commit 842589d

Please sign in to comment.