[SPARK-29779][CORE] Compact old event log files and cleanup #27085
Conversation
Please note that this is part of #26416; related review comments have been addressed. cc @vanzin @gaborgsomogyi @Ngone51 as reviewers of #26416.
5fd8f27 to 704c6ca
Test build #116068 has finished for PR 27085 at commit
Test build #116078 has finished for PR 27085 at commit
Test build #116075 has finished for PR 27085 at commit
Hmm... it seems hard to filter events accurately if we assume events can arrive arbitrarily out of order. If there's no way to make it accurate, we may want to choose the safer approach, which filters out fewer events when they are out of order, since filtered-out events are lost once we compact and there's no way to restore them. It may leave some unmatched events which would have been filtered out if they had been in order, but (SQL)AppStatusListener has been tolerating out-of-order events, so that seems acceptable.
Test build #116088 has finished for PR 27085 at commit
Still a lot of code, but more manageable.
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
  val stageId = stageSubmitted.stageInfo.stageId
  if (_stageToRDDs.get(stageId).isEmpty) {
    // job start event is not received yet
Have you seen this situation? I was under the impression you'd never see a stage submission without the parent job starting first.
I admit it's just a guess that events can arrive out of order in arbitrary ways; a previous review comment pointed out that I should consider out-of-order events, but I have no details about which orderings can actually occur. It's also non-trivial to work out which cases can happen just from looking at the AppStatusListener code.
Why not enumerate the cases we've observed in AppStatusListener as a code comment? There's no value if only part of the group keeps that knowledge in their heads; it looks like a "truck number" (bus factor) risk.
Btw, have we tried to find out why these events can be out of order, and to fix it?
The order of job/stage/task events is defined by the DAGScheduler. Adding comments in AppStatusListener would just be trying to document the scheduler behavior, so if you want to track that, it would be better to document the scheduler itself. When you add other subsystems (like SQL), then you also add other things that may arrive out of order (see how the SQL listener needs to handle job / execution events).
On the scheduler side, I think most of the out-of-order issues are related to tasks. I'm somewhat confident that job and stage events generally arrive in order, but tasks can be very off. e.g. see this comment in handleBeginEvent:
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
I guess my reply here applies more to my onTaskStart comment below; here we can assume that the events are in order. But there, it would be better to ignore events if they arrive at the wrong time.
Ah OK, my bad. I thought there was a case where the message bus doesn't guarantee event order (so I felt we had to rely on history), but it looks like that's not the case. Then it would be enough to have sufficient comments on DAGScheduler.
Thanks for the correction. I may need to spend some time reading through DAGScheduler, but from the information in your comment, it looks like the main point is discarding the task event if the stage is not active. I'll deal with that for now.
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  totalTasks += 1
  val curTasks = _stageToTasks.getOrElseUpdate(taskStart.stageId,
I still think this should not be getOrElseUpdate. Add the empty set to the map in onStageSubmitted, clean it up in onJobEnd, and only update it here if it's still in the map.
Also, maybe something for the future: if a large stage is recomputed, you could potentially filter out a lot of data by filtering the failed attempt's tasks or a subset of them (e.g. only keep the failed ones).
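To make the suggested structure concrete, here is a minimal sketch (the listener and field names follow the diff above; `jobToStages` is a hypothetical helper map for the cleanup in `onJobEnd`, not necessarily what the PR uses):

```scala
import scala.collection.mutable

import org.apache.spark.scheduler._

// Illustrative only: register stages eagerly, update tasks only for tracked stages,
// and drop everything for a job once it ends.
class JobEventTrackingSketch extends SparkListener {
  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
  private val jobToStages = new mutable.HashMap[Int, Seq[Int]]   // hypothetical helper map

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobToStages(jobStart.jobId) = jobStart.stageIds
  }

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    // Add the empty set up front, so onTaskStart only needs a plain lookup.
    _stageToTasks.getOrElseUpdate(stageSubmitted.stageInfo.stageId, mutable.HashSet[Long]())
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    // Only update if the stage is still in the map; otherwise the event is ignored.
    _stageToTasks.get(taskStart.stageId).foreach(_ += taskStart.taskInfo.taskId)
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // Clean up the stages belonging to the finished job.
    jobToStages.remove(jobEnd.jobId).foreach(_.foreach(stageId => _stageToTasks.remove(stageId)))
  }
}
```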
Same here; this assumed events could arrive arbitrarily out of order. If we are sure the stage submission event must happen before the task start event, this code doesn't make sense. But I would also like to make sure we document these cases as a reference; it's not easy to understand the intention without the background knowledge, and I couldn't find it written down anywhere.
case e: SparkListenerUnpersistRDD =>
  liveRDDs.contains(e.rddId)
case e: SparkListenerExecutorMetricsUpdate =>
  e.accumUpdates.exists { case (_, stageId, _, _) => liveStages.contains(stageId) }
Should this also check the live executors? (Stage may still be running but the executor is gone, so maybe data about it is not interesting anymore?)
Or maybe that particular check should be in BasicEventFilter... not sure, I'm slightly confused about the role of each class here.
In AppStatusListener, SparkListenerExecutorMetricsUpdate updates metrics not only for the executor but also for the stage and task, so ideally the event should be accepted if either the task or the stage is live. (I'll add the check for liveTasks as well.)
If checking the executor is needed in any case, we may want to move this to BasicEventFilter.
Hmm, ok, makes sense to try to keep the stage metrics accurate.
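For reference, a hedged sketch of the acceptance rule being discussed; the `liveTasks`/`liveStages` sets stand in for whatever live-entity tracking the filter builder keeps, not the exact fields in the PR:

```scala
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate

object MetricsUpdateFilterSketch {
  // Keep the metrics update if any of its accumulator updates belongs to a live task
  // or a live stage, so stage/task metrics stay accurate after compaction.
  def accept(
      e: SparkListenerExecutorMetricsUpdate,
      liveTasks: Set[Long],
      liveStages: Set[Int]): Boolean = {
    e.accumUpdates.exists { case (taskId, stageId, _, _) =>
      liveTasks.contains(taskId) || liveStages.contains(stageId)
    }
  }
}
```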
case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId)
case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId)
case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId)
There's also SparkListenerBlockManagerAdded / SparkListenerBlockManagerRemoved. That has an extra complication: the driver is a block manager and generates an add event, and that one should never be filtered out.
The intention is not to reject these kinds of events, since no EventFilter implementation knows how to determine whether they should be accepted or not. But if there are cases where an event should never be rejected, it would be better to explicitly accept those cases for safety.
Thanks for pointing out the case of the driver being a block manager. Looking into the AppStatusListener code, there seems to be no information to determine whether a block manager is the driver or not, so it seems safer to accept all of them.
We shouldn't be accepting block manager events for executors that are dead. That basically means that even after you compact the log files, you'll get all the executors in the UI. It defeats the filtering of the executor added / removed events.
The driver is different because it does not generate an executor added event, just a block manager added event. So when filtering block manager events you need to check whether the block manager ID refers either to the driver or a live executor.
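A hedged sketch of what that check could look like; `liveExecutors` mirrors the set used in the surrounding filter code, and the literal "driver" is the executor id Spark uses for the driver's block manager:

```scala
import org.apache.spark.scheduler.{SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerEvent}

object BlockManagerEventFilterSketch {
  // "driver" corresponds to Spark's driver block manager id; never filter out its events.
  private def driverOrLive(executorId: String, liveExecutors: Set[String]): Boolean =
    executorId == "driver" || liveExecutors.contains(executorId)

  // Some(true/false) = explicit decision for block manager events; None = no opinion.
  def accept(event: SparkListenerEvent, liveExecutors: Set[String]): Option[Boolean] =
    event match {
      case e: SparkListenerBlockManagerAdded =>
        Some(driverOrLive(e.blockManagerId.executorId, liveExecutors))
      case e: SparkListenerBlockManagerRemoved =>
        Some(driverOrLive(e.blockManagerId.executorId, liveExecutors))
      case _ => None
    }
}
```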
 * heuristic; task events tend to take most size in event log.
 *
 * This class assumes caller will provide the sorted list of files which are sorted by the index of
 * event log file, with "at most" one compact file placed first if it exists. Caller should keep in
no quotes around "at most"
assertNone(acceptFn(SparkListenerLogStart("testVersion")))
}
private def assertFilterJobEvents( |
This is called in a single place. Inline.
originalFiles: Seq[FileStatus],
compactRet: (CompactionResult.Value, Option[Long]),
expectedCompactRet: CompactionResult.Value): Unit = {
assert(compactRet._1 === expectedCompactRet)
indentation
writer.stop()

val filters = Seq(new TestEventFilter1, new TestEventFilter2)
These filters are only used here (even in your other PRs, AFAICT). It would be easier to follow if they were also defined here, since the comments above explaining the expectation reflect the behavior of these filters.
Good finding. They might have been used in other tests as well but were removed afterwards. Will move them.
linesLength += 1
assert(expectedLines.contains(line))
}
assert(linesLength === expectedLines.length, "The number of lines for rewritten file " +
The assert already generates pretty much that same error message.
stageToTaskIds: Map[Int, Seq[Long]],
stageToRddIds: Map[Int, Seq[Int]])

def pushJobEventsWithoutJobEnd(
Seems like this is only used in a later PR, might be good to add it there.
OK, I'll remove it here and restore it in the later PR.
Also I'd reword the PR title a little, especially in the subsequent PRs.
Thanks for the detailed reviews, as always! I've addressed the review comments or replied to them. Please take a look again. Thanks in advance!
Regarding the title, it seems OK to drop "part" since follow-up PRs can have their own meaningful titles; this PR seems better off with the original title, although it doesn't integrate the new feature into SHS. Btw, would it be better to file a new issue for each follow-up PR, or is it OK to put all of them under SPARK-29779?
Test build #116126 has finished for PR 27085 at commit
Test failure on build 116126 is below:
Doesn't look to be related, though I'm seeing this failure for the first time.
retest this, please
Test build #116132 has finished for PR 27085 at commit
retest this, please
Test build #116148 has finished for PR 27085 at commit
Test failure on build 116148 is below:
Also doesn't look to be related.
retest this, please
Test build #116159 has finished for PR 27085 at commit
Test failure on build 116159 is below:
There's a PR #27010 to address the above two test failures. For the last test failure, we may have to file a new JIRA issue if one doesn't exist yet.
retest this, please
Test build #116171 has finished for PR 27085 at commit
Test build #116211 has finished for PR 27085 at commit
I'd create new tasks for those.
A few minor things but I think this is looking ok now.
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  _stageToTasks.get(taskStart.stageId) match {
use foreach instead of match
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  _stageToTasks.get(taskStart.stageId) match {
    case Some(tasks) =>
      totalTasks += 1
Shouldn't you increment totalTasks always? Since this task event will be filtered (and thus it counts towards the statistics about how much compaction will save).
You're right, my bad. I felt we wanted to ignore the event, but yes, it should be counted either way.
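Putting the two comments together, the method would presumably end up roughly like this (a sketch reusing the fields from the diff above, not the exact code committed):

```scala
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  // Always count the task: it contributes to the statistics about how much
  // compaction would save, even if the stage is no longer tracked.
  totalTasks += 1
  // Record the task only if its stage is still in the map (foreach instead of match).
  _stageToTasks.get(taskStart.stageId).foreach(_ += taskStart.taskInfo.taskId)
}
```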
}

/**
 * This class rewrites the event log files into one compact file: the compact file will only
s/class/method
 * @param compactIndex The index of compact file if the compaction is successful.
 *                     Otherwise it will be None.
 */
case class CompactionResult(code: CompactionResultCode.Value, compactIndex: Option[Long])
can probably be private
This is used as the return type of compact, hence it has to follow the scope of the method - it's public.
As we marked the scope of EventFilter as private[spark], I'll change EventLogFileCompactor to private[spark] and change CompactionResult/CompactionResultCode to private[spark] as well.
Maybe we could make it tighter (private[history] or private[deploy]) - please let me know if that's preferred. Thanks!
Hmm. I guess this package is not considered a public API (as nothing under deploy is), so in those cases I've been leaning towards just leaving the class as public. private makes sense if the class doesn't exit the scope of the package, which is not the case here.
OK, I'll leave them as public. Thanks.
 */
case class CompactionResult(code: CompactionResultCode.Value, compactIndex: Option[Long])

object CompactionResultCode extends Enumeration {
can probably be private
// filterApplicationEnd: Some(true) & Some(true) => filter in
expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0))

// filterBlockManagerAdded: Some(true) & Some(false) => filter out
OOC, can you give an example of this behavior in a real log file?
It seems a bit weird to filter out an event when at least one filter has decided the event should be kept (maybe because it's needed for something?).
Yeah... I agree that's somewhat unintuitive and doesn't work the same way as the other filter chains we've seen.
It's due to the difference in how the CORE event filter and the SQL event filter accept jobs. In the SQL event filter, we don't drop a finished job if the job relates to a SQL execution and that SQL execution is live.
In BasicEventFilter there's no way to recognize SQL execution events, so it rejects all finished jobs, but SQLEventFilter works a bit differently as I described. So there will be some jobs which BasicEventFilter decides to reject whereas SQLEventFilter decides to accept - we should accept these jobs in that case.
That was the main reason I had to introduce None along with true/false, so that an event filter can say "I don't know about this event and don't care; I'll defer to the other event filters."
> there will be some jobs which BasicEventFilter decides to reject whereas SQLEventFilter decides to accept - we should accept these jobs in this case

So in that case the basic filter will say Some(false), and SQL will say Some(true), and the event should be accepted, right? Isn't that the opposite of what this comment is saying?
Oh wait you're right. Sorry for the confusion. Looks like something was messed up when introducing None. Let me fix it immediately.
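For clarity, a sketch of the intended combination semantics for the `Some(true)` / `Some(false)` / `None` verdicts discussed above. It mirrors the behavior described in the thread under the assumption that an event nobody decides on is kept for safety; it is not the exact code in the PR:

```scala
object FilterChainSketch {
  // Combine per-filter verdicts (Some(true) = accept, Some(false) = reject, None = don't care).
  def accept(verdicts: Seq[Option[Boolean]]): Boolean = {
    val decisions = verdicts.flatten
    if (decisions.isEmpty) true       // no filter cares about the event: keep it for safety
    else decisions.contains(true)     // one explicit accept wins over any reject
  }
}

// Example from the discussion: BasicEventFilter rejects a finished job, but SQLEventFilter
// accepts it because a live SQL execution still refers to it -> the event is kept.
// FilterChainSketch.accept(Seq(Some(false), Some(true))) == true
// FilterChainSketch.accept(Seq(Some(false), None))       == false
// FilterChainSketch.accept(Seq(None, None))               == true
```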
Test build #116328 has finished for PR 27085 at commit
Test build #116330 has finished for PR 27085 at commit
I don't think I have more comments; will leave this here a little bit to see if anyone shows up. Only thing that crossed my mind is that maybe it might be useful to separate compaction from cleaning; you can have compaction to speed up generation of the history UI, but keep all the log files if you need to debug something in detail.
Thanks for the reviews, as always. Given that we decided to break this down into multiple incremental PRs for faster reviewing, I'd also like to kindly ask about classifying items into "mandatory" and "good to have", and dealing with the "mandatory" items in fast iterations. The main goal for me is getting all of the parts merged before the feature freeze date. I guess the tentative date is Jan 31, but Lunar New Year falls later this month, which is one of the biggest holidays in South Korea (and China as well), so I may need to exclude around a week (I'll try to stick with the schedule though), and then there's not much time left. Hopefully the remaining PRs are relatively small to review, but the integration part may also need enough review time, since the feature only takes effect after that. Details/improvements can be addressed afterwards once the feature is in; then the feature is no longer tied to only me, and anyone should be able to deal with these items.
I enumerated, in the JIRA issue, the items which were mentioned but not addressed since they weren't considered mandatory. We can add items whenever we find more, and create a sub-task if we feel something is mandatory or someone wants to collaborate.
Ok, merging this one to master. On to the next.
Thanks for reviewing and merging! I'll rebase the next PR and ping you.
### What changes were proposed in this pull request?
This patch addresses adding event filter to handle SQL related events. This patch is next task of SPARK-29779 (#27085), please refer the description of PR #27085 to see overall rationalization of this patch. Below functionalities will be addressed in later parts:
* integrate compaction into FsHistoryProvider
* documentation about new configuration

### Why are the changes needed?
One of major goal of SPARK-28594 is to prevent the event logs to become too huge, and SPARK-29779 achieves the goal. We've got another approach in prior, but the old approach required models in both KVStore and live entities to guarantee compatibility, while they're not designed to do so.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added UTs.

Closes #27164 from HeartSaVioR/SPARK-30479.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…Server

### What changes were proposed in this pull request?
This patch addresses remaining functionality on event log compaction: integrate compaction into FsHistoryProvider. This patch is next task of SPARK-30479 (#27164), please refer the description of PR #27085 to see overall rationalization of this patch.

### Why are the changes needed?
One of major goal of SPARK-28594 is to prevent the event logs to become too huge, and SPARK-29779 achieves the goal. We've got another approach in prior, but the old approach required models in both KVStore and live entities to guarantee compatibility, while they're not designed to do so.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added UT.

Closes #27208 from HeartSaVioR/SPARK-30481.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@apache.org>
@@ -0,0 +1 @@
org.apache.spark.deploy.history.BasicEventFilterBuilder
EventFilterBuilder is private in Spark. Do you mind if I ask why we use the service loader?
The core module cannot refer to the SQL/core module, which also has an EventFilterBuilder implementation. So this is the way to discover and load the implementations of EventFilterBuilder(s).
I see. I think that's possible by simply using reflection, which I think makes the code easier to read. I think we're already doing this in a few places such as FileCommitProtocol.instantiate. It seems a bit odd to use the service loader for internal classes.
spark/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
Lines 1226 to 1228 in a5c7090
private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
  ServiceLoader.load(classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
}
That is how FsHistoryProvider already deals with loading a class from the SQL/core module.
Okay, thanks. At least it's consistent.
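For completeness, the analogous discovery call for the new builder would presumably look like this sketch, following the `loadPlugins()` pattern quoted above. The method name is hypothetical, and `EventFilterBuilder` is assumed to be visible from the enclosing class (e.g. the same `org.apache.spark.deploy.history` package):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.util.Utils

// Discover EventFilterBuilder implementations (e.g. the SQL one) registered via
// META-INF/services, so the core module never references the sql/core module directly.
private def loadEventFilterBuilders(): Iterable[EventFilterBuilder] = {
  ServiceLoader.load(classOf[EventFilterBuilder], Utils.getContextOrSparkClassLoader).asScala
}
```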
private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
  ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold")
    .internal()
I think we should have added some docs here too.
It was added to make testing compaction easier, but after review it no longer needs to be a configuration.
I'll just remove it and convert it to a constant unless we find a need to adjust it manually.
Just realized there's a test that modifies the value to ensure compaction isn't skipped with a low value. I'll retain the configuration and add some doc.
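If the config is retained, the doc could be added along these lines. This is only a sketch: the doc wording, the `doubleConf` type, and the default value are placeholders for illustration rather than the values actually committed:

```scala
private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
  ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold")
    .internal()
    .doc("The threshold score to determine whether it's good to do the compaction or not. " +
      "Compaction will proceed only when the score is higher than the threshold value.")
    .doubleConf
    .createWithDefault(0.7)  // placeholder default for illustration
```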
"the overall size of event log files.") | ||
.intConf | ||
.checkValue(_ > 0, "Max event log files to retain should be higher than 0.") | ||
.createWithDefault(Integer.MAX_VALUE) |
Why didn't we make it optional, or default it to -1 to express "all event log files will be retained"?
I think this default value "naturally" fits the semantics of the configuration, even without designating Integer.MAX_VALUE as a special value. There's no need to handle the configuration specially, and this approach is already used for a couple of other configurations.
…ts on post-hoc review

### What changes were proposed in this pull request?
This PR reflects review comments on post-hoc review among PRs for SPARK-29779 (#27085), SPARK-30479 (#27164). The list of review comments this PR addresses are below:
* #27085 (comment)
* #27164 (comment)
* #27164 (comment)
* #27164 (comment)

I also applied review comments to the CORE module (BasicEventFilterBuilder.scala) as well, as the review comments for SQL/core module (SQLEventFilterBuilder.scala) can be applied there as well.

### Why are the changes needed?
There're post-hoc reviews on PRs for such issues, like links in above section.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing UTs.

Closes #27414 from HeartSaVioR/SPARK-28869-SPARK-29779-SPARK-30479-FOLLOWUP-posthoc-reviews.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
This patch proposes to compact old event log files when end users enable rolling event logs, and to clean up these files after compaction.
Here "compaction" really means filtering out listener events for finished/removed things - like jobs, which take up most of the space in an event log file apart from SQL related events. To achieve this, the compactor reads the files in two phases: 1) tracking the live jobs (and more to add), 2) filtering events by leveraging the information about live things and rewriting them to the "compacted" file.
This approach retains compatibility of the event log file format and adds the possibility of reducing the overall size of event logs. There's a downside here as well: executor metrics for tasks may be inaccurate, as the compactor will filter out the task events whose job is finished, but I don't consider it a blocker.
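At a high level the two-phase idea could be sketched like this. It is a deliberately abstract, self-contained sketch with plain types; the real compactor works on event log files, SparkListenerEvent instances and the EventFilter/EventFilterBuilder API:

```scala
object TwoPhaseCompactionSketch {
  type Event = String

  // Phase 1: replay every event once to build an "is this event still relevant?" predicate
  // (in the PR this is the EventFilterBuilder pass tracking live jobs/stages/tasks).
  // Phase 2: replay again and keep only accepted events; the survivors would then be
  // rewritten into the single "compacted" file, and the old files cleaned up.
  def compact(files: Seq[Seq[Event]], buildFilter: Seq[Event] => (Event => Boolean)): Seq[Event] = {
    val allEvents = files.flatten
    val accept = buildFilter(allEvents)
    allEvents.filter(accept)
  }
}
```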
Please note that SPARK-29779 leaves the below functionalities for future JIRA issues, as the patch for SPARK-29779 is too huge and we decided to break it down:
Why are the changes needed?
One of the major goals of SPARK-28594 is to prevent event logs from becoming too huge, and SPARK-29779 achieves that goal. We had another approach previously, but the old approach required the models in both KVStore and live entities to guarantee compatibility, while they're not designed to do so.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added UTs.