[SPARK-30481][CORE] Integrate event log compactor into Spark History Server #27208
Conversation
19ad5be to 85b3bba (force-push)
Only the last commit belongs to this PR. I will remove the WIP label once #27164 is merged.
Test build #116745 has finished for PR 27208.
Test build #116746 has finished for PR 27208.
Test build #116748 has finished for PR 27208.
85b3bba to 05e5074 (force-push)
try {
  val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
  if (info.lastIndexToRunCompaction.isEmpty ||
    info.lastIndexToRunCompaction.get < lastIndex) {
indent more
 * Returns a tuple containing two values. Each element means:
 *  - 1st (Boolean): true if the list of event log files are changed, false otherwise.
 *  - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
 *    value indicates the last event log index to try compaction. None otherwise.
what does it mean to "try compaction"? Does it mean that when this method returns, no compaction was actually done, it was just tried?
The tuple being returned sounds a bit confusing. Instead, why not return just an Option[Long]
telling you both whether compaction ran, and what's the index of the first non-compacted file (or last compacted file, not sure what's being tracked really)?
Uh, I tried to differentiate between the "compacted log index" and "the log index Spark tries to compact", but the wording doesn't seem sufficient or appropriate. (I admit it sounds like bad naming and maybe also a bad explanation, but I can't find anything better.) It refers to the latter.
So the reason we store the log index in LogInfo is to avoid calling compact if possible, since it's a heavy operation. How?
Given how compaction works (in particular, it excludes the log file at the last index, since that file may still be changing), the result of compaction is idempotent if we provide the same list of event log files.
In other words, once we have tried compaction for a certain set of event log files, we don't need to try again. For example, assume 2.compact, 3, 4 are in the list of event log files. If we tried compaction with that list once, regardless of the result (success, low score, not enough files), we don't need to try again unless we see 5 in the list of event log files.
In fact this is a bit simplified and there are some exceptional cases, like an exception occurring while compacting, or configurations changing across a restart of the SHS. The former case is simple: we will fail to store the index into LogInfo anyway, so compaction is retried on the next check of the logs. The latter case actually prevents us from leveraging this fact, but I'd ignore it as a trade-off to gain performance. If we address caching of state in the compactor or filter, then it may not be a big deal to just call compact, but until then I guess we need this.
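The skip condition described above can be sketched as a tiny standalone model (hypothetical simplification mirroring the `lastIndexToRunCompaction` field in the quoted code, not the actual Spark implementation):

```scala
// Simplified model (not the actual Spark code) of the skip check described
// above: compaction is attempted only when a higher log index has appeared
// since the last attempt, because re-running over the same file list is
// idempotent.
case class LogInfo(lastIndexToRunCompaction: Option[Long])

def shouldTryCompaction(info: LogInfo, lastIndex: Long): Boolean =
  info.lastIndexToRunCompaction.forall(_ < lastIndex)

// Files 2.compact, 3, 4 were already evaluated (last index 4): skip.
assert(!shouldTryCompaction(LogInfo(Some(4L)), 4L))
// A new file with index 5 appeared: try again.
assert(shouldTryCompaction(LogInfo(Some(4L)), 5L))
// Never evaluated before: try.
assert(shouldTryCompaction(LogInfo(None), 4L))
```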
try {
  val (shouldReload, lastCompactionIndex) = compact(reader)
Is there any way to do compaction as a separate task? Otherwise it seems like this could slow down the creation of the app listing, especially if you're not using disk caching or it's the first time the SHS is creating the listing db.
I totally agree it would be ideal to do so, but it made the problem too complicated for me, because compaction "modifies" the event log files which "app listing" and "app rebuild" are reading, or are remembering as a list of files.
Actually I had to deal with a similar thing in loadDiskStore and createInMemoryStore (there's a retry mechanism), but I'm not sure it wouldn't get more complicated if we also take app listing into account.
I was thinking that instead of doing this inline here, you could just submit a task to replayExecutor after the listing data is updated, so that you give other tasks fetching listing data a chance to run before you try compaction. (You'd call endProcessing() at the end of that separate task.)
Any reason why that would not work?
I guess what you're suggesting is separating the two tasks and applying the processing lock to each task - in particular, submitting all listing tasks first and all compaction tasks later.
That would work, but we may also want to consider the difference between cleaning logs and compaction: cleaning logs has its own interval and is triggered independently, but compaction is triggered conditionally, only when shouldReload returns true. That means we may want to always run compaction in the compaction task, regardless of the processing status - we wouldn't want to skip it, but it's also not ideal to make the task block waiting on processing. Maybe resubmit at the end of the task if it cannot proceed due to processing?
Addressed with my own approach in ec3ffd9 - please take a look and suggest an alternative if you have a better idea.
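The shape being discussed can be sketched roughly as follows (hypothetical simplification: `processing`, `endProcessing`, and `replayExecutor` mirror names in FsHistoryProvider, but this is not the merged implementation - the compaction work runs as a separate executor task and releases the per-log mark when it finishes):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Rough sketch (hypothetical, not FsHistoryProvider itself) of submitting
// compaction as a separate task: the per-log "processing" mark is released
// only when that task completes.
object CompactionTaskSketch {
  private val processing = ConcurrentHashMap.newKeySet[String]()
  private val replayExecutor = Executors.newFixedThreadPool(2)

  def endProcessing(rootPath: String): Unit = processing.remove(rootPath)

  def submitCompaction(rootPath: String)(compact: () => Unit): Unit = {
    replayExecutor.submit(new Runnable {
      override def run(): Unit = {
        try compact()
        finally endProcessing(rootPath) // release the mark at end of the task
      }
    })
  }

  def main(args: Array[String]): Unit = {
    processing.add("app-1")             // taken while updating listing data
    submitCompaction("app-1")(() => ()) // actual compaction work would go here
    replayExecutor.shutdown()
    replayExecutor.awaitTermination(5, TimeUnit.SECONDS)
    assert(!processing.contains("app-1")) // mark released by the task
  }
}
```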
@@ -197,8 +197,6 @@ package object config {

private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
  ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")
Hmm... this is kind of a history server config, right? Or even more specifically, a FsHistoryProvider config.
Might be better to have it grouped with other configs in History.scala, and make it a constructor argument in EventLogFileCompactor.
(In fact the same applies to EVENT_LOG_COMPACTION_SCORE_THRESHOLD. Should that be not internal() anymore, also?)
Hmm... this is kind of a history server config, right? Or even more specifically, a FsHistoryProvider config.
Might be better to have it grouped with other configs in History.scala, and make it a constructor argument in EventLogFileCompactor.
Yeah... the name and placement were chosen when I wasn't fully sure where to put it, so I just put it near a related configuration. But I think it makes sense to move it to History, and also add a history prefix to the configuration name to indicate to end users that it won't run on the driver side. Making it a constructor argument would also be good.
(In fact the same applies to EVENT_LOG_COMPACTION_SCORE_THRESHOLD. Should that be not internal() anymore, also?)
I'm not 100% sure who would tune the score manually. The original intention was to make it flexible to ease testing, so I put it in a configuration and marked it "internal". If we want to expose this configuration, we should maybe also provide information on the score and the statistics so that end users can see and tune the threshold. WDYT?
Makes sense to keep the score threshold internal. We can make it public later if there's a good reason for it.
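The suggested move could look roughly like this (a sketch only - the builder chain mirrors Spark's internal ConfigBuilder API, and the config name matches the renamed `spark.history.fs.eventLog.rolling.maxFilesToRetain` that appears later in this PR; treat exact method order and doc text as illustrative):

```scala
// In org.apache.spark.internal.config.History (sketch, not the merged code):
private[spark] val MAX_LOG_FILES_TO_RETAIN =
  ConfigBuilder("spark.history.fs.eventLog.rolling.maxFilesToRetain")
    .doc("The maximum number of event log files which will be retained as " +
      "non-compacted. By default, all event log files will be retained.")
    .intConf
    .checkValue(_ >= 1, "Max event log files to retain should be at least 1.")
    .createWithDefault(Int.MaxValue)
```

EventLogFileCompactor would then receive the resolved value as a constructor argument instead of reading the conf itself.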
updateAndCheck(provider) { _ =>
  verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
  val info = provider.listing.read(classOf[LogInfo], writer.logPath)
  assert(info.lastIndexToRunCompaction === Some(2))
I'm a little confused by this check. 2 was not compacted, right? Only 1 was. So why is lastIndexToRunCompaction 2?
Please refer to the comment #27208 (comment) to see what it means. It's intentionally 2, as the last index on which compaction was attempted is 2.
From that comment it seems that what you're saying is that, in this case, the log with index "2" isn't yet compacted, but it's been processed and found that it's not yet worth it to compact it. Is that right?
I think the variable name is confusing because it mixes "last" (which to me evokes something that has happened already) and "to run" (which evokes a future action). So I can't really parse the meaning of the name since it seems to contradict itself.
If my understanding is correct then perhaps a better name is lastEvaluatedForCompaction (didn't add "index" because the name is already pretty long as it is).
From that comment it seems that what you're saying is that, in this case, the log with index "2" isn't yet compacted, but it's been processed and found that it's not yet worth it to compact it. Is that right?
Yes, correct, it applies regardless of the result of compaction.
If my understanding is correct then perhaps a better name is lastEvaluatedForCompaction (didn't add "index" because the name is already pretty long as it is).
That's a better name. Thanks! Will address.
docs/configuration.md
Outdated
<td>Int.MaxValue</td>
<td>
  The maximum number of event log files which will be retained as non-compacted.
  By default, all event log files will be retained. Please set the configuration and
Might be good to clarify where to configure things, since this config is for the history server, but spark.eventLog.rolling.maxFileSize needs to be set in the applications.
Test build #116798 has finished for PR 27208.
Test build #116804 has finished for PR 27208.
// to take any more submissions at this time
case e: Exception =>
  logError(s"Exception while submitting task for compaction", e)
  endProcessing(rootPath)
I think checkLog seems to have a bug - if replayExecutor.submit fails and throws an exception, the log is not removed from processing, which would block processing for that log in both checkLogs and cleanLogs.
I'm OK to fix it here as well, or fix it in another PR to avoid mixing things up.
if replayExecutor.submit fails
That should be extremely rare, but seems simple to handle.
Ah yes, that's now handled - I extracted a method and addressed it there.
Test build #117004 has finished for PR 27208.
Test build #117031 has finished for PR 27208.
try {
  val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
  if (info.lastEvaluatedForCompaction.isEmpty ||
    info.lastEvaluatedForCompaction.get < lastIndex) {
indent more
I have no idea why it reverted to the previous indentation. Will fix.
docs/configuration.md
Outdated
<td><code>spark.history.fs.eventLog.rolling.maxFilesToRetain</code></td>
<td>Int.MaxValue</td>
<td>
  The maximum number of event log files which will be retained as non-compacted. By default,
Spark currently doesn't have a "release notes" document in the docs dir, but I'm wondering if we should mention somewhere that this is a new feature and may not be completely stable (i.e. use with caution, it may delete more data than you expect, cause some UI issues we haven't thought about, etc). There are migration guides, but that doesn't seem like the right place...
I think we use the JIRA issue to write release note content if necessary - labeled as "release-notes", with the content in "Docs Text".
https://issues.apache.org/jira/browse/SPARK-25016
I'd much appreciate it if you could write the release note content for this issue, as a JIRA issue is not a good place to review and iterate on changes.
if (info.lastEvaluatedForCompaction.isEmpty ||
    info.lastEvaluatedForCompaction.get < lastIndex) {
  // haven't tried compaction for this index, do compaction
  fileCompactor.compact(reader.listEventLogFiles)
So one thing that feels a tiny bit odd is that when deciding whether to compact, you're actually considering the last log file, which you won't consider during actual compaction, right?
Wouldn't that cause unnecessary (or too aggressive) compaction at the end of the application, when potentially a bunch of jobs finish and "release" lots of tasks, inflating the compaction score?
So one thing that feels a tiny bit odd is that when deciding whether to compact, you're actually considering the last log file, which you won't consider during actual compaction, right?
Wouldn't that cause unnecessary (or too aggressive) compaction at the end of the application, when potentially a bunch of jobs finish and "release" lots of tasks, inflating the compaction score?
That's intentional: callers of the compactor don't need to care how many files are actually affected. They just need to know that the same list of log files will bring the same result, unless compaction fails and throws an exception. How many files are excluded from compaction is just configuration, and the fact that the last log file must be excluded is an implementation detail. (We prevent it in both the configuration and the compactor by having 1 as the minimum value for the max retained log files.)
The compactor will ignore the last log file anyway, as configured, so except in the rare case where the log is rolled just before the app finishes, this won't happen. And most likely end users will avoid setting the value to 1 if they read the doc and understand how it works.
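The invariant being described - with `maxFilesToRetain >= 1` the newest (possibly still-changing) file is never compacted - can be shown with a toy split function (hypothetical, not the actual EventLogFileCompactor):

```scala
// Toy illustration of the retain invariant: the newest maxFilesToRetain
// files are always excluded from compaction, so the last (possibly live)
// log file is never touched.
def splitForCompaction(
    files: Seq[String],
    maxFilesToRetain: Int): (Seq[String], Seq[String]) = {
  require(maxFilesToRetain >= 1, "at least the last file must be retained")
  files.splitAt(math.max(0, files.length - maxFilesToRetain))
}

val (toCompact, retained) = splitForCompaction(Seq("1", "2", "3", "4"), 2)
assert(toCompact == Seq("1", "2"))
assert(retained == Seq("3", "4"))
```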
Test build #117259 has finished for PR 27208.
Couple of small things. Regarding the release notes, perhaps it would be better to have more explicit documentation about this feature in monitoring.md? A separate section in that document would be easier to follow than a lot of text in the config parameters table.
(Feel free to do that as a separate task.)
@@ -1175,6 +1237,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  }
  deleted
}

/** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
Hmm, perhaps this method could take care of calling endProcessing too (e.g. by wrapping the task)? Should be just a small adjustment at the call site.
The finally statement in mergeApplicationListing makes it complicated, because we would have to handle re-entrance of the lock. (We're submitting another task from inside the task.)
If we move endProcessing from the finally statement in mergeApplicationListing to the end of the task here, marking processing from the compaction task would happen earlier than the call to endProcessing from the listing task. Marking the lock from the compaction task would succeed but be effectively a no-op, and releasing the lock from the listing task would remove the mark for the compaction task as well, which makes the compaction task run without holding the lock.
So either we need to make the lock much smarter, or document the requirement on the caller side. I feel the former is more complicated than the latter.
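The hazard can be demonstrated with a toy non-reentrant mark (hypothetical, not Spark code - the set-based mark stands in for the `processing` set in FsHistoryProvider):

```scala
import scala.collection.mutable

// Toy demonstration of the re-entrance hazard: the mark is not re-entrant,
// so a nested acquire is a no-op and the outer release also clears the
// protection the nested task relies on.
object ReentranceHazard {
  private val processing = mutable.Set[String]()

  // Returns true only if the path was not already marked.
  def beginProcessing(path: String): Boolean = processing.add(path)
  def endProcessing(path: String): Unit = processing.remove(path)

  def main(args: Array[String]): Unit = {
    assert(beginProcessing("app-1"))  // listing task takes the mark
    assert(!beginProcessing("app-1")) // nested compaction task: no-op
    endProcessing("app-1")            // listing task releases the mark...
    // ...and the compaction task would now run with no mark at all:
    assert(!processing.contains("app-1"))
  }
}
```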
docs/configuration.md
Outdated
@@ -1023,6 +1023,26 @@ Apart from these, the following properties are also available, and may be useful
  The max size of event log file before it's rolled over.
</td>
</tr>
<tr>
<td><code>spark.history.fs.eventLog.rolling.maxFilesToRetain</code></td>
This actually should be with the other SHS configs in monitoring.md.
Yeah, I agree that adding all the explanation about the feature into the description of one configuration isn't ideal. As we may have a couple of months (at least a month) to address docs/minor items after the feature freeze, I'd like to deal with it in a separate PR. Thanks for the suggestion!
Test build #117419 has finished for PR 27208.
Small comment nit, otherwise looks ok.
FsHistoryProvider.scala is getting pretty large and hard to follow; at some point it might be good to break it down and move things like UI replay logic, cleaning and compaction out of it...
assert(appInfo.id === "app")
assert(appInfo.name === "app")

// all events in retained file should be available, even they're related to finished jobs
all execs? (also, should be "all live execs"?)
Uh, even though the latter verification code technically checks events for executors (as it's relatively easier to check with AppStatusStore), the comment is still correct - we don't filter out any events in the "retained" file(s). I'm refining the comment a bit to clarify that these events will include removed executors as well.
Totally agreed. It would also be good for FsHistoryProviderSuite to follow the changes in FsHistoryProvider, which may be a chance to reduce its size as well.
Test build #117460 has finished for PR 27208.
Alright, merging to master.
Finally! Thanks again @vanzin for shepherding the feature and reviewing thoughtfully. BTW I might start handling the follow-up TODOs in a couple of days (maybe next week?) - stuck with some work at the moment.
…section of monitoring.md

### What changes were proposed in this pull request?
This is a FOLLOW-UP PR for a review comment on #27208: #27208 (review)
This PR documents the new feature `Eventlog Compaction` in a new section of `monitoring.md`, as it only has one configuration on the SHS side and it's hard to explain everything in the description of a single configuration.

### Why are the changes needed?
Event log compaction lacks documentation for what it is and how it helps. This PR explains it.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Built docs via jekyll.

> change on the new section
<img width="951" alt="Screen Shot 2020-02-16 at 2 23 18 PM" src="https://user-images.githubusercontent.com/1317309/74599587-eb9efa80-50c7-11ea-942c-f7744268e40b.png">

> change on the table
<img width="1126" alt="Screen Shot 2020-01-30 at 5 08 12 PM" src="https://user-images.githubusercontent.com/1317309/73431190-2e9c6680-4383-11ea-8ce0-815f10917ddd.png">

Closes #27398 from HeartSaVioR/SPARK-30481-FOLLOWUP-document-new-feature.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
This patch addresses remaining functionality on event log compaction: integrate compaction into FsHistoryProvider.
This patch is the next task after SPARK-30479 (#27164); please refer to the description of PR #27085 for the overall rationalization of this patch.
Why are the changes needed?
One of the major goals of SPARK-28594 is to prevent event logs from becoming too huge, and SPARK-29779 achieves that goal. We had another approach previously, but the old approach required models in both KVStore and live entities to guarantee compatibility, while they're not designed to do so.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added UT.