
[SPARK-20642][core] Store FsHistoryProvider listing data in a KVStore. #18887

Closed
wants to merge 14 commits

Conversation


@vanzin vanzin commented Aug 8, 2017

The application listing is still generated from event logs, but is now stored
in a KVStore instance. By default an in-memory store is used, but a new config
allows setting a local disk path to store the data, in which case a LevelDB
store will be created.

The provider stores things internally using the public REST API types; I believe
this is better going forward since it will make it easier to get rid of the
internal history server API which is mostly redundant at this point.

I also added a finalizer to LevelDBIterator, to make sure that resources are
eventually released. This helps when code iterates but does not exhaust the
iterator, thus not triggering the auto-close code.

HistoryServerSuite was modified to not re-start the history server unnecessarily;
this makes the json validation tests run more quickly.
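
As a quick sketch of that in-memory-vs-disk choice (the helper and file names here are illustrative, not the PR's exact code):

    import java.io.File

    import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, LevelDB}

    // In-memory store by default (the old behavior); a LevelDB-backed store
    // when spark.history.store.path points at a local directory.
    def createListingStore(storePath: Option[String]): KVStore = storePath match {
      case Some(dir) => new LevelDB(new File(dir, "listing.ldb")) // file name is hypothetical
      case None => new InMemoryStore()
    }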


vanzin commented Aug 8, 2017

For context:

@@ -86,7 +86,7 @@ This file is divided into 3 sections:
 </check>

 <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
-  <parameters><parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters>
+  <parameters><parameter name="regex"><![CDATA[(config|[A-Z][A-Za-z]*)]]></parameter></parameters>
vanzin (Contributor Author):
I'm adding this so that we can have an object named config; this seems cleaner than the current approach of having package object config, which requires every constant to be tagged as private, instead of just marking the whole object as private[spark].
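
As a sketch, the pattern being enabled looks like this (the entries shown are illustrative; the point is the single object-level modifier):

    package org.apache.spark.deploy.history

    import org.apache.spark.internal.config.ConfigBuilder

    // One access modifier on the object, instead of one per constant as the
    // package object approach requires.
    private[spark] object config {

      val DEFAULT_LOG_DIR = "file:/tmp/spark-events"

      val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
        .stringConf
        .createWithDefault(DEFAULT_LOG_DIR)
    }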

Contributor:

Agree it makes more sense to make the whole thing private[spark], but couldn't you just turn the rule off around this object, like we do for println?

(Also I'm confused why package object config doesn't violate the rule).

vanzin (Contributor Author):

That could be done, but I want this to be the "official" pattern going forward, and having to disable scalastyle every time it's used is kinda ugly and somewhat detracts from the official-ness of it.

Contributor:

OK, makes sense. I wasn't paying enough attention to the actual change at first and thought you were just turning the rule off entirely, but I agree this change makes sense.

btw, I was curious why package object config was OK -- it's just a hard-coded special case: https://github.com/scalastyle/scalastyle/blob/master/src/main/scala/org/scalastyle/scalariform/ClassNamesChecker.scala#L74

Make sure the db is still open before trying to close the iterator,
otherwise it may cause a JVM crash.
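
A sketch of that guard in Scala (the real LevelDBIterator is Java, and the member names below are assumptions for illustration):

    // Finalizer as a safety net for iterators that are never exhausted or
    // closed explicitly; the open-check avoids touching native state after
    // the DB handle has been freed, which is what crashes the JVM.
    class DbIterator(db: Db) extends AutoCloseable {
      @volatile private var closed = false

      override def close(): Unit = if (!closed) {
        closed = true
        if (db.isOpen) db.releaseIterator(this) // skip if the DB was already closed
      }

      override protected def finalize(): Unit = {
        try close() finally super.finalize()
      }
    }

    trait Db { def isOpen: Boolean; def releaseIterator(it: DbIterator): Unit }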

SparkQA commented Aug 9, 2017

Test build #80420 has finished for PR 18887 at commit 842589d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class KVStoreMetadata(
  • case class LogInfo(


SparkQA commented Aug 9, 2017

Test build #80422 has finished for PR 18887 at commit 1f08bd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 9, 2017

Test build #80431 has finished for PR 18887 at commit 1ec1a67.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


vanzin commented Aug 11, 2017

hmm... @squito @ajbozarth @jerryshao

@ajbozarth ajbozarth left a comment

A few questions and some feedback, but the SHS stuff looks good. I'm not well versed in the KVStore/DB side of things so I can't vouch for those, though the code looks fine.

@@ -76,6 +76,14 @@ private[history] case class LoadedAppUI(
private[history] abstract class ApplicationHistoryProvider {

/**
* The number of applications available for listing. Separate method in case it's cheaper
* to get a count than to calculate the whole listing.
ajbozarth (Member):

I'm not sure I follow this reasoning: if the previous way of getting the count was getListing().size, then how does splitting it into its own method speed things up? I don't mind adding a helper function like this, I just don't follow the reasoning in your comment.

vanzin (Contributor Author):

This is an interface, so this was added to allow implementations to override this method if that makes sense.

It just looks like I lost the override in one of my rebases, so let me add that back.

vanzin (Contributor Author):

Actually it doesn't seem like this is used anymore and I can remove it...

new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

// Iterate descending over all applications whose oldest attempt is older than the maxAge.
ajbozarth (Member):

maxAge -> maxTime


val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++
List(attempt)
val oldestAttempt = attempts.map(_.info.lastUpdated.getTime()).min
ajbozarth (Member):

Is this val used anywhere?

@@ -742,53 +698,145 @@ private[history] object FsHistoryProvider {
private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""

private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""

private val CURRENT_VERSION = 1L
ajbozarth (Member):

Current version of?

val fileSize: Long)

private[history] class AttemptInfoWrapper(
val info: v1.ApplicationAttemptInfo,
ajbozarth (Member):

v1?

vanzin (Contributor Author):

Yes, I'm using this syntax because in many places there are conflicting type names in the API package and in other packages.

@@ -31,6 +33,9 @@ class ApplicationInfo private[spark](
val memoryPerExecutorMB: Option[Int],
val attempts: Seq[ApplicationAttemptInfo])

@JsonIgnoreProperties(
value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"),
ajbozarth (Member):

Will this exclude the Epoch values from the API? If I remember correctly, we added those specifically for the API.

vanzin (Contributor Author):

No, this just avoids trying to deserialize them, which would cause an error because these properties have no setter.
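
A sketch of the idea (the class shown and the allowGetters flag are my assumptions, following standard Jackson behavior):

    import java.util.Date

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties

    // The epoch values are derived getters with no setters: keep writing them
    // to JSON, but ignore them when mapping JSON back onto the class.
    @JsonIgnoreProperties(
      value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"),
      allowGetters = true)
    class AttemptTimes(val startTime: Date, val endTime: Date, val lastUpdated: Date) {
      def getStartTimeEpoch: Long = startTime.getTime
      def getEndTimeEpoch: Long = endTime.getTime
      def getLastUpdatedEpoch: Long = lastUpdated.getTime
    }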


jerryshao commented Aug 14, 2017

This looks a little too big to understand quickly; I need to download the patch and play with it first :).


SparkQA commented Aug 14, 2017

Test build #80639 has finished for PR 18887 at commit b696f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao jerryshao left a comment

LGTM overall, just some minor comments.

One question about FsHistoryProvider: it looks like we removed some synchronization here. I'm not sure whether our underlying kvstore supports concurrency well, or whether this is no longer an issue?

import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._

/**
* A class that provides application history from event logs stored in the file system.
jerryshao (Contributor):

I see the comment here refers to a removed data structure (though it is folded here); would you please fix the comment?

db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir.toString()))
db
} else if (meta.version != CURRENT_LISTING_VERSION ||
!logDir.toString().equals(meta.logDir)) {
jerryshao (Contributor):

Minor: `logDir` here is a String; I think there's no need to call `toString()` here and above.

@@ -220,6 +220,13 @@ The history server can be configured as follows:
Number of threads that will be used by history server to process event logs.
</td>
</tr>
<tr>
<td>spark.history.store.path</td>
jerryshao (Contributor):

Checking the code, it looks like we don't have a default path for the local history store; do we need to add a default value here in the doc?

val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
    .stringConf
    .createOptional

vanzin (Contributor Author):

I'm using this as a trigger for whether to use the disk store or not; if you set a local store directory, you're using the disk store, otherwise you're using the memory store. I do need to update it in the documentation, though.
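
For illustration, opting into the disk store is then a single setting (the path below is hypothetical):

    import org.apache.spark.SparkConf

    // Unset: in-memory listing store (the default). Set: LevelDB under this path.
    val conf = new SparkConf()
      .set("spark.history.store.path", "/var/lib/spark/shs-listing")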


SparkQA commented Aug 16, 2017

Test build #80701 has finished for PR 18887 at commit 519dab0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito squito left a comment

I have one big question -- do you think this should replace the old FsHistoryProvider in the initial version? I feel like we should have one version where the old code is still available, controlled by a feature flag.

Other than that, mostly minor things on the code.

@@ -301,6 +334,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

override def stop(): Unit = {
listing.close()
squito (Contributor):

If this throws an exception, should we still try to clean up initThread?


-  // List of application logs to be deleted by event log cleaner.
-  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+  private val listing = storePath.map { path =>
squito (Contributor):

given the large initializer, could you add an explicit type annotation to listing to help the reader?

-      prevFileSize < entry.getLen() &&
-      SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+      SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
+      recordedFileSize(entry.getPath()) < entry.getLen()
       }
       .flatMap { entry => Some(entry) }
squito (Contributor):

I realize this isn't your change, but what is the point of this? Isn't it a no-op?

var coresPerExecutor: Option[Int] = None
var memoryPerExecutorMB: Option[Int] = None

def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = {
squito (Contributor):

looks like this is only ever called with one AttemptInfoWrapper, so simpler if you remove the List.


vanzin commented Aug 16, 2017

I feel like we should have one version where the old code is still available, controlled by a feature flag.

I'm not sure exactly what you're suggesting. The default behavior is still, as much as possible, the old one: everything is kept in memory. Keeping the exact old code in place would mean forking FsHistoryProvider, which is not something I see as desirable.

Also, forgot to reply to an earlier comment by @jerryshao :

looks like we removed some synchronizations here, I'm not sure if

KVStore instances are thread safe, so a lot of the old synchronization in this class does not apply. There was code before that used non-thread-safe data structures (such as attemptsToClean), which has been replaced with storing things in the KVStore instance instead.


squito commented Aug 16, 2017

I feel like we should have one version where the old code is still available, controlled by a feature flag.

I'm not sure exactly what you're suggesting. The default behavior is still, as much as possible, the old one: everything is kept in memory. Keeping the exact old code in place would mean forking FsHistoryProvider, which is not something I see as desirable.

Long-term, definitely not desirable. My concern is that there is a lot of new code taking effect here, on some important functionality -- we don't want some bug to prevent adoption of 2.3. For one version, you could leave the old one available, rename it to something else, and put a big disclaimer in the code that it's obsolete; as long as things are smooth for 2.3, delete it entirely for 2.4. The HS isn't as core or tricky as the network module, but I'm thinking of this like netty vs. nio.


vanzin commented Aug 16, 2017

There's quite a lot of unit tests that cover this code; forking would mean also making those unit tests run against both versions of the code so that no one breaks anything, and potentially fixing bugs in two different places if they're found.

I'd rather avoid the overhead. The bulk of FsHistoryProvider, which is the part that actually monitors the file system and makes decisions about when to parse things, is mostly unchanged.

And the rest of this project is way more disruptive than this one change.


SparkQA commented Aug 16, 2017

Test build #80750 has finished for PR 18887 at commit dc642bd.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


vanzin commented Aug 17, 2017

retest this please


SparkQA commented Aug 17, 2017

Test build #80757 has finished for PR 18887 at commit dc642bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


squito commented Aug 17, 2017

Just to keep others in the loop, Marcelo and I talked about this some offline. I think this PR itself is fine, but to me this is an important point in the larger history server project he's doing, so I'm going to take a look at the rest of the changes as well before I feel comfortable merging. Also, he explained why it's extremely complicated to keep both the old & new versions available, which makes sense to me, though I may poke at a version myself just to see how complex it is.


squito commented Aug 21, 2017

I futzed around for a while with trying to keep the old stuff around, and I realized it really would be quite a mess. The biggest problem is that the old REST API is just way too tied into the UI, e.g. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala#L26 -- probably my fault in that implementation of the REST APIs. You could keep the old code around, but it would involve so much moving and refactoring that it seems pointless.

I'm still looking at the other changes in the larger project, and I'd encourage other reviewers to do the same.


gatorsmile commented Sep 9, 2017

I have the same concern as @squito said above in #18887 (review). Refactoring to keep the old code does not seem pointless to me.

To evaluate the impact, I have a few questions:

  • What is the migration proposal/guide?
  • What is the proposal when users are running multiple different versions of Spark in their production environment, or during the migration procedure?
  • What should users do when they hit serious bugs that we are unable to find at this stage?
  • What are the external behavior changes between the new and the old ones?


vanzin commented Sep 11, 2017

What is the migration proposal/guides?

Not sure what you mean. There's no change in behavior by default, so there's no migration needed.

What should users do when they hit serious bugs that we are unable to find at this stage?

This is always a question when you introduce a change. This particular project makes it a bit worse since it's a bigger change. But the answer is the same: file a bug, we fix it, next release has the fix; if it's really important for you, you can patch your own Spark, or revert to an older release, as many people do when they find blockers.

The changes in this project do not influence whether your application will fail or not; they're isolated to the UI, so at worst it will make things harder to debug until the issues are fixed.

On the other hand, a lot can be mitigated by testing; there's already a lot of test coverage for parts of this code, but the UI is kinda the exception there. So the longer these changes stay in review instead of being committed, the less coverage they'll have in people's day to day testing, actually increasing the risk that some bug might be missed.

What are the external behavior changes between the new and the old ones?

None by default.

@gatorsmile gatorsmile left a comment

I quickly went over the changes and left a few comments. I might need more time to understand the potential impact.


val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()
private val storePath = conf.get(LOCAL_STORE_DIR)
gatorsmile (Member):

We need a description of storePath or LOCAL_STORE_DIR here, even though we have one in monitoring.md.

.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("7d")

val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
gatorsmile (Member):

Just want to confirm: except for this, there are no changes to the other parameters, right?

gatorsmile (Member):

It'd be better to document that the default is an in-memory store.

vanzin (Contributor Author):

Correct, other parameters are the same.

} else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) {
logInfo("Detected mismatched config in existing DB, deleting...")
db.close()
Utils.deleteRecursively(dbPath)
gatorsmile (Member):

If the version does not match, we delete the files?

vanzin (Contributor Author):

Yes; the code will re-create the data from the event logs when that happens.

@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""

private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""

/** Current version of the data written to the listing database. */
private val CURRENT_LISTING_VERSION = 1L
gatorsmile (Member):

I tried to find the definition and usage of CURRENT_LISTING_VERSION, but it seems like this is not discussed. What does this really mean? When will we change this value? Do we have a complete story for this flag?

vanzin (Contributor Author) commented Sep 12, 2017:

I'll add a more verbose comment; but this is basically me punting proper versioning to the next Spark release after this code is added. This version number is a "nuclear option"; if we break the data that is serialized to disk, we increase this number, and the new SHS will delete all old data and re-generate it from event logs.

I'm punting because there is no versioning issue in the first version; there's no existing data that the SHS might try to read.

I plan to take a closer look at versioning after all the initial PRs go in, but leaving this here gives us a choice in case there's a more urgent need to break things.
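
A sketch of how that option plays out at startup, built from the metadata checks visible elsewhere in this diff (the method name is illustrative; KVStoreMetadata and CURRENT_LISTING_VERSION are as defined in the PR):

    import java.io.File

    import org.apache.spark.util.Utils
    import org.apache.spark.util.kvstore.{KVStore, LevelDB}

    def openOrRecreate(dbPath: File, logDir: String): KVStore = {
      var db = new LevelDB(dbPath)
      val meta = db.getMetadata(classOf[KVStoreMetadata])
      if (meta == null) {
        db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir))
      } else if (meta.version != CURRENT_LISTING_VERSION || meta.logDir != logDir) {
        // Incompatible data: wipe it and let the listing be rebuilt from event logs.
        db.close()
        Utils.deleteRecursively(dbPath)
        db = new LevelDB(dbPath)
        db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir))
      }
      db
    }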


private[history] case class KVStoreMetadata(
val version: Long,
val logDir: String)
gatorsmile (Member):

The two `val` keywords above are redundant.


attempt.attemptId = event.appAttemptId
attempt.startTime = new Date(event.time)
attempt.lastUpdated = new Date(clock.getTimeMillis())
gatorsmile (Member):

clock.getTimeMillis() and event.time are always in the same time zone?

vanzin (Contributor Author):

Both are basically System.currentTimeMillis(), so yes.

.reverse()
.iterator()
.asScala
.map(_.toAppHistoryInfo)
gatorsmile (Member):

Nit: toAppHistoryInfo()

@@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

-  override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator
+  override def getListing(): Iterator[ApplicationHistoryInfo] = {
gatorsmile (Member):

The returned order is descending, right? This is not obvious from the code; please add a comment.
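
For reference, a sketch of how a descending read looks with the KVStore view API (the index name is an assumption for illustration):

    import scala.collection.JavaConverters._

    import org.apache.spark.util.kvstore.KVStore

    // Views iterate in index order; reverse() flips that, so the listing
    // comes back newest-first.
    def descendingListing(listing: KVStore): Iterator[ApplicationInfoWrapper] = {
      listing.view(classOf[ApplicationInfoWrapper])
        .index("endTime")
        .reverse()
        .iterator()
        .asScala
    }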

initThread.join()
}
} finally {
listing.close()
gatorsmile (Member):

What might happen if LevelDB is not properly closed?

vanzin (Contributor Author):

It might leave JNI handles open (a.k.a. a memory leak).

Also, this doesn't apply to this change, but later when the UI info is also written to disk, it could prevent the UI db from being replaced with an updated one, since its files would be opened.

-      prevFileSize < entry.getLen() &&
-      SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+      SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
+      recordedFileSize(entry.getPath()) < entry.getLen()
gatorsmile (Member):

Can we add a comment to explain what recordedFileSize(entry.getPath()) returns? In the original code the variable name was self-descriptive; the new change no longer has it.

@KVIndexParam logPath: String,
fileSize: Long)

private[history] class AttemptInfoWrapper(
cloud-fan (Contributor) commented Sep 26, 2017:

why this one doesn't have a natural index?

vanzin (Contributor Author):

Because it's not directly written to the store.

.stringConf
.createWithDefault(DEFAULT_LOG_DIR)

val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
Contributor:

what does the ending S mean? seconds?

vanzin (Contributor Author):

Yes. This pattern is used in a bunch of other places to indicate the unit of time of the config.
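
In a sketch, the convention ties the suffix to the unit the config is read back in (this entry appears elsewhere in this diff):

    import java.util.concurrent.TimeUnit

    import org.apache.spark.internal.config.ConfigBuilder

    // The _S suffix records that reads yield seconds, whatever unit the user wrote.
    val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("7d") // "7d" is parsed and surfaced as 604800 seconds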

@@ -496,7 +517,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

     var provider: FsHistoryProvider = null
     try {
-      provider = new FsHistoryProvider(conf)
+      provider = newProvider(conf)
cloud-fan (Contributor) commented Sep 26, 2017:

It seems worse to have a newProvider method and create a lot of diff.

vanzin (Contributor Author):

This was part of some code I only partially reverted. Will revert these.

} catch {
case e: FileNotFoundException => None
Contributor:

we will never hit FileNotFoundException?

vanzin (Contributor Author):

I guess it still can until I remove the current replay code in a later change.

-      appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
+      val appInfo = load(appId)
+      appInfo.attempts
+        .find { attempt => attempt.info.attemptId == attemptId }
Contributor:

nit: _.info.attemptId == attemptId

-          Some(attempt.lastUpdated), attempt.startTime)
+        SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name,
+          HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+          Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime())
Contributor:

Shall we make load return ApplicationHistoryInfo instead of ApplicationInfoWrapper? Then we can reduce a lot of unnecessary diff.

vanzin (Contributor Author):

AttemptInfoWrapper stores information that is not available in the public API and is used by the provider. Later changes also add more fields to ApplicationInfoWrapper that are needed by the provider.

None
}

ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
Contributor:

add an assert(appListener.appId.isDefined)?

vanzin (Contributor Author):

If the listing exists it's unlikely that this would ever trigger, but sure.

vanzin (Contributor Author):

(This code will also go away in subsequent changes.)

     // scan for modified applications, replay and merge them
-    val logInfos: Seq[FileStatus] = statusList
+    val logInfos = statusList
cloud-fan (Contributor) commented Sep 26, 2017:

Can we just do `val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)`?

app
}

def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = {
Contributor:

Can we add an isStartedEarlierThan(other) method to AttemptInfoWrapper?

vanzin (Contributor Author):

This is only used here, I'd rather keep the logic local.

val attempt = app.attempts.head

val oldApp = try {
listing.read(classOf[ApplicationInfoWrapper], app.id)
Contributor:

you can call load here

}
}

   /**
-   * Replay the log files in the list and merge the list of old applications with new ones
+   * Replay the given log file, saving the application in the listing db.
cloud-fan (Contributor) commented Sep 26, 2017:

why update the comment? We don't merge anymore?

vanzin (Contributor Author):

No. If you look at the old code, it did a "merge sort" kind of thing to create an updated listing. KVStore sorts things internally, so there's no need for that code anymore -- you just write something to it, and it comes back sorted.
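
A sketch of the write semantics that make the merge unnecessary:

    import org.apache.spark.util.kvstore.KVStore

    // write() inserts or replaces by natural key, and indexed reads come back
    // sorted, so updating the listing is one call with no manual merging.
    def upsertApp(listing: KVStore, app: ApplicationInfoWrapper): Unit = {
      listing.write(app)
    }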

iterator.get.asScala.foreach { app =>
// Applications may have multiple attempts, some of which may not need to be deleted yet.
val (remaining, toDelete) = app.attempts.partition { attempt =>
attempt.info.lastUpdated.getTime() >= maxTime
vanzin (Contributor Author) commented Sep 26, 2017:

This is comparing different things.

The old code compared how much time had passed since the last update.

This code compares the log's last update against the absolute expiration time.

If I make your change, the unit test stops passing (and it hasn't changed from before).
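
In a small sketch, the two formulations are the same predicate rearranged, which is why the behavior is preserved:

    // Old: the elapsed time since the last update exceeds the max age.
    // New: the last update falls before an absolute cutoff (maxTime).
    def isExpired(lastUpdatedMs: Long, nowMs: Long, maxAgeMs: Long): Boolean = {
      val maxTime = nowMs - maxAgeMs
      lastUpdatedMs < maxTime // equivalent to: nowMs - lastUpdatedMs > maxAgeMs
    }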

       appInfo.attempts.find(_.attemptId == attemptId)
     }
+    val count = listing.count(classOf[ApplicationInfoWrapper])
+    s"""|FsHistoryProvider{logdir=$logDir,
Contributor:

one space after |

vanzin (Contributor Author):

That would make the string look weird.


SparkQA commented Sep 26, 2017

Test build #82206 has finished for PR 18887 at commit 5eff2c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor):

LGTM, merging to master!

@asfgit asfgit closed this in 74daf62 Sep 27, 2017
mgaido91 (Contributor):

Sorry, this is causing a checkstyle error for me while building the project, due to the presence of a finalizer method. I can't understand how this was able to pass the Jenkins tests. Does anyone have a clue about that? Thanks.

cloud-fan (Contributor):

hmmm weird, @srowen any ideas?


srowen commented Sep 27, 2017

Oh, is it because SBT doesn't run checkstyle but Maven does, and SBT runs the PR builder? This could be my fault, for recently re-enabling checkstyle; that may be why. For now, I'd at least submit a hotfix to suppress this checkstyle rule.

cloud-fan (Contributor):

Yeah, the PR build uses SBT. But I thought SBT also ran checkstyle before?


srowen commented Sep 27, 2017

I don't think so, because checkstyle is for Java only and SBT won't compile the Java code.
I'd kind of love to stop mixing SBT and Maven, but that's quite a separate thing.

I'll put up a HOTFIX that both fixes the style violation and disables checkstyle again. Now I remember this was the reason it was disabled -- it wasn't because we used to compile Java code separately from the Scala code.

@vanzin vanzin deleted the SPARK-20642 branch September 27, 2017 18:27
ghost pushed a commit to dbtsai/spark that referenced this pull request Sep 27, 2017
## What changes were proposed in this pull request?

Fix finalizer checkstyle violation by just turning it off; re-disable checkstyle as it won't be run by SBT PR builder. See apache#18887 (comment)

## How was this patch tested?

`./dev/lint-java` runs successfully

Author: Sean Owen <sowen@cloudera.com>

Closes apache#19371 from srowen/HotfixFinalizerCheckstlye.