-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-42204][CORE] Add option to disable redundant logging of TaskMetrics internal accumulators in event logs #39763
Conversation
This is marked as "WIP" because I'm still deciding whether this merits a note in a migration guide / release note. It's possible that I'm being overly cautious regarding backwards-compatibility for theoretical non-Spark consumers of these event logs, but I'm trying to avoid a situation where an impacted user has no straightforward short-term mitigation. |
* We use this instead of passing SparkConf directly because it lets us avoid | ||
* repeated re-parsing of configuration values on each read. | ||
*/ | ||
private[spark] class JsonProtocolOptions(conf: SparkConf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern is a bit weird, so I'd like to explain it in greater detail here:
To date, JsonProtocol has just been a big object with no configuration options of its own. All other event logging configurations take effect at higher levels, such as EventLoggingListener. We cannot implement this particular change at those levels, though: we shouldn't mutate the mutable TaskInfo or StageInfo objects referenced by the listener events and adding copy()
methods to them would be expensive.
JsonProtocol is currently implemented as a global object, so there's no straightforward way to make it easily configurable and testable. If I did something like "get the active SparkContext's configuration and read the configuration value" on each call then this would destroy performance because configuration reading is not cheap.
One approach would be to refactor this so that JsonProtocol is no longer a singleton: we could give it a proper constructor which accepts a SparkConf. That's a much larger refactoring than I want to undertake right now, though, and it would involve significant updates in a bunch of test code.
The approach taken here is to thread through an JsonProtocolOptions
class which holds the configuration values. Right now it only holds a single value, but this is done as future-proofing in case we add additional configurability. The lone caller in EventLoggingListener constructs the JsonProtocolOptions
using SparkConf, so the configuration is only read from the conf once (avoiding conf reader performance overheads).
This is a more C-style approach than the usual OOP approach that we generally take, but I think it's okay in this limited internal context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me.
val stageInfoWithoutTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 500L) | ||
stageInfoWithoutTaskMetricsAccums.accumulables.clear() | ||
|
||
// Test events which should be impacted by the config. Due to SPARK-42205 we need to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See https://issues.apache.org/jira/browse/SPARK-42205 for more details. Once that issue is fixed, we can drop the testcases for TaskStart
, JobStart
, and StageSubmitted
here.
@@ -60,13 +71,19 @@ private[spark] object JsonProtocol { | |||
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) | |||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) | |||
|
|||
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false)) | |||
|
|||
/** ------------------------------------------------- * | |||
* JSON serialization methods for SparkListenerEvents | | |||
* -------------------------------------------------- */ | |||
|
|||
def sparkEventToJsonString(event: SparkListenerEvent): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method using default configurations is retained for the purposes of avoiding changes in a bunch of test code across several suites.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just couple of minor comments, looks good to me.
Thanks for fixing this @JoshRosen !
@@ -60,13 +71,19 @@ private[spark] object JsonProtocol { | |||
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) | |||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) | |||
|
|||
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false)) | |||
|
|||
/** ------------------------------------------------- * | |||
* JSON serialization methods for SparkListenerEvents | | |||
* -------------------------------------------------- */ | |||
|
|||
def sparkEventToJsonString(event: SparkListenerEvent): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a note for this sparkEventToJsonString
that it is only for tests ?
I have seen usages of this unfortunately - so having the flag would help. |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
…redundant-logging-of-taskmetrics-internal-accumulators-in-jsonprotocol
We should remove the |
I will; since it's been a while since I looked at this code, I want to do another self-review and another round of manual testing before I merge. I re-opened the PR to fix merge conflicts and let CI run the tests. I'm considering whether we might want to change this default for Spark 4.0; if so, I would propose to do that in a separate followup PR to also update docs + migration guide. |
This looks good to me as well (non-binding). Thanks a lot for reviving this improvement! The |
I pushed an empty commit to retest this. The only failing test is Since I've been so delayed in actually merging this, I took another look to see if any intervening changes might have introduced logical conflicts but I didn't spot any potentially problematic changes. Especially given that this is flagged-off-by-default opt-in functionality, I feel that this is a low-risk change. Therefore I'm going to merge this now (finally!). In separate followups I may end up building on the configuration-plumbing infrastructure introduced here to add other opt-in size reduction optimizations. |
…trics internal accumulators in event logs ### What changes were proposed in this pull request? This PR adds an off-by-default option to JsonProtocol to have it exclude certain redundant accumulator information from Spark event logs in order to save space and processing time. Several event logs types contain both TaskMetrics and Accumulables, but there is redundancy in how the TaskMetrics data is stored: - TaskMetrics are stored in a map called "Task Metrics" which maps from metric names to metric values. - An "Accumulables" field contains information on accumulator updates from the task, but this field includes updates from the TaskMetrics internal accumulators (both the value from the task, plus a running "sum-so-far" from all of the tasks completed up to that point). The redundant task metrics accumulables are not actually used by the Spark History Server: I verified this by reading AppStatusListener and SQLAppStatusListener. I believe that this redundancy was introduced back in [SPARK-10620](https://issues.apache.org/jira/browse/SPARK-10620) when Spark 1.x's separate TaskMetrics implementation was replaced by the current accumulator-based version. In this PR, I add logic to exclude TaskMetrics internal accumulators when writing this field (if a new flag is enabled). The new `spark.eventLog.includeTaskMetricsAccumulators` configuration (default `false`, meaning "keep the redundant information") can be set to `true` to exclude these redundant internal accumulator updates. For now, I am merging this off-by-default, but in a followup PR for Spark 4.0.0 we might consider a change-of-default to `true` (in which case the flag would serve as an "escape-hatch" for users who want to restore the old behavior. Although I think it's somewhat unlikely that third-party non-Spark consumers of the event logs would be relying on this redundant information, this is changing a longstanding user-facing data format and thus needs a flag. ### Why are the changes needed? This change reduces the size of Spark event logs, especially for logs from applications that run many tasks. It should also have slight benefits on event log read and write speed (although I haven't tried to quantify this). ### Does this PR introduce _any_ user-facing change? No user-facing changes in Spark History Server. This flag's effects could be considered a user-facing change from the perspective of third-party code which does its own direct processing of Spark event logs, hence the config. However, in this PR (by itself) the flag is off-by-default. Out-of-the-box user-facing changes will be discussed / proposed in a separate flag-flip PR. ### How was this patch tested? New unit tests in `JsonProtocolSuite`. Manual tests of event log size in `spark-shell` with a job that runs `spark.parallelize(1 to 1000, 1000).count()`. For this toy query, this PR's change shrunk the uncompressed event log size by ~15%. The relative size reduction will be even greater once other issues like https://issues.apache.org/jira/browse/SPARK-42206 or https://issues.apache.org/jira/browse/SPARK-42203 are fixed. The relative reduction will be smaller for tasks with many SQL metrics because those accumulables cannot be excluded. Closes apache#39763 from JoshRosen/SPARK-42204-remove-redundant-logging-of-taskmetrics-internal-accumulators-in-jsonprotocol. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…trics internal accumulators in event logs ### What changes were proposed in this pull request? This PR adds an off-by-default option to JsonProtocol to have it exclude certain redundant accumulator information from Spark event logs in order to save space and processing time. Several event logs types contain both TaskMetrics and Accumulables, but there is redundancy in how the TaskMetrics data is stored: - TaskMetrics are stored in a map called "Task Metrics" which maps from metric names to metric values. - An "Accumulables" field contains information on accumulator updates from the task, but this field includes updates from the TaskMetrics internal accumulators (both the value from the task, plus a running "sum-so-far" from all of the tasks completed up to that point). The redundant task metrics accumulables are not actually used by the Spark History Server: I verified this by reading AppStatusListener and SQLAppStatusListener. I believe that this redundancy was introduced back in [SPARK-10620](https://issues.apache.org/jira/browse/SPARK-10620) when Spark 1.x's separate TaskMetrics implementation was replaced by the current accumulator-based version. In this PR, I add logic to exclude TaskMetrics internal accumulators when writing this field (if a new flag is enabled). The new `spark.eventLog.includeTaskMetricsAccumulators` configuration (default `false`, meaning "keep the redundant information") can be set to `true` to exclude these redundant internal accumulator updates. For now, I am merging this off-by-default, but in a followup PR for Spark 4.0.0 we might consider a change-of-default to `true` (in which case the flag would serve as an "escape-hatch" for users who want to restore the old behavior. Although I think it's somewhat unlikely that third-party non-Spark consumers of the event logs would be relying on this redundant information, this is changing a longstanding user-facing data format and thus needs a flag. ### Why are the changes needed? This change reduces the size of Spark event logs, especially for logs from applications that run many tasks. It should also have slight benefits on event log read and write speed (although I haven't tried to quantify this). ### Does this PR introduce _any_ user-facing change? No user-facing changes in Spark History Server. This flag's effects could be considered a user-facing change from the perspective of third-party code which does its own direct processing of Spark event logs, hence the config. However, in this PR (by itself) the flag is off-by-default. Out-of-the-box user-facing changes will be discussed / proposed in a separate flag-flip PR. ### How was this patch tested? New unit tests in `JsonProtocolSuite`. Manual tests of event log size in `spark-shell` with a job that runs `spark.parallelize(1 to 1000, 1000).count()`. For this toy query, this PR's change shrunk the uncompressed event log size by ~15%. The relative size reduction will be even greater once other issues like https://issues.apache.org/jira/browse/SPARK-42206 or https://issues.apache.org/jira/browse/SPARK-42203 are fixed. The relative reduction will be smaller for tasks with many SQL metrics because those accumulables cannot be excluded. Closes apache#39763 from JoshRosen/SPARK-42204-remove-redundant-logging-of-taskmetrics-internal-accumulators-in-jsonprotocol. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…metrics accumulator logging flag from SPARK-42204 ### What changes were proposed in this pull request? This PR corrects an unintentional default behavior change from #39763 That PR introduced a new configuration, `spark.eventLog.includeTaskMetricsAccumulators`, to provide an ability for users to disable the redundant logging of task metrics information via the Accumulables field in the Spark event log task end logs. I made a mistake in updating that PR description and code from the original version: the description says that the intent is to not change out of the box behavior, but the actual flag default was the opposite. This new PR corrects both the flag default and the flag description to reflect the original intent of not changing default behavior. ### Why are the changes needed? Roll back an unintentional behavior change. ### Does this PR introduce _any_ user-facing change? Yes, it rolls back an unintentional default behavior change. ### How was this patch tested? Existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #48372 from JoshRosen/fix-event-log-accumulable-defaults. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…trics internal accumulators in event logs ### What changes were proposed in this pull request? This PR adds an off-by-default option to JsonProtocol to have it exclude certain redundant accumulator information from Spark event logs in order to save space and processing time. Several event logs types contain both TaskMetrics and Accumulables, but there is redundancy in how the TaskMetrics data is stored: - TaskMetrics are stored in a map called "Task Metrics" which maps from metric names to metric values. - An "Accumulables" field contains information on accumulator updates from the task, but this field includes updates from the TaskMetrics internal accumulators (both the value from the task, plus a running "sum-so-far" from all of the tasks completed up to that point). The redundant task metrics accumulables are not actually used by the Spark History Server: I verified this by reading AppStatusListener and SQLAppStatusListener. I believe that this redundancy was introduced back in [SPARK-10620](https://issues.apache.org/jira/browse/SPARK-10620) when Spark 1.x's separate TaskMetrics implementation was replaced by the current accumulator-based version. In this PR, I add logic to exclude TaskMetrics internal accumulators when writing this field (if a new flag is enabled). The new `spark.eventLog.includeTaskMetricsAccumulators` configuration (default `false`, meaning "keep the redundant information") can be set to `true` to exclude these redundant internal accumulator updates. For now, I am merging this off-by-default, but in a followup PR for Spark 4.0.0 we might consider a change-of-default to `true` (in which case the flag would serve as an "escape-hatch" for users who want to restore the old behavior. Although I think it's somewhat unlikely that third-party non-Spark consumers of the event logs would be relying on this redundant information, this is changing a longstanding user-facing data format and thus needs a flag. ### Why are the changes needed? This change reduces the size of Spark event logs, especially for logs from applications that run many tasks. It should also have slight benefits on event log read and write speed (although I haven't tried to quantify this). ### Does this PR introduce _any_ user-facing change? No user-facing changes in Spark History Server. This flag's effects could be considered a user-facing change from the perspective of third-party code which does its own direct processing of Spark event logs, hence the config. However, in this PR (by itself) the flag is off-by-default. Out-of-the-box user-facing changes will be discussed / proposed in a separate flag-flip PR. ### How was this patch tested? New unit tests in `JsonProtocolSuite`. Manual tests of event log size in `spark-shell` with a job that runs `spark.parallelize(1 to 1000, 1000).count()`. For this toy query, this PR's change shrunk the uncompressed event log size by ~15%. The relative size reduction will be even greater once other issues like https://issues.apache.org/jira/browse/SPARK-42206 or https://issues.apache.org/jira/browse/SPARK-42203 are fixed. The relative reduction will be smaller for tasks with many SQL metrics because those accumulables cannot be excluded. Closes apache#39763 from JoshRosen/SPARK-42204-remove-redundant-logging-of-taskmetrics-internal-accumulators-in-jsonprotocol. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…metrics accumulator logging flag from SPARK-42204 ### What changes were proposed in this pull request? This PR corrects an unintentional default behavior change from apache#39763 That PR introduced a new configuration, `spark.eventLog.includeTaskMetricsAccumulators`, to provide an ability for users to disable the redundant logging of task metrics information via the Accumulables field in the Spark event log task end logs. I made a mistake in updating that PR description and code from the original version: the description says that the intent is to not change out of the box behavior, but the actual flag default was the opposite. This new PR corrects both the flag default and the flag description to reflect the original intent of not changing default behavior. ### Why are the changes needed? Roll back an unintentional behavior change. ### Does this PR introduce _any_ user-facing change? Yes, it rolls back an unintentional default behavior change. ### How was this patch tested? Existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#48372 from JoshRosen/fix-event-log-accumulable-defaults. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR adds an off-by-default option to JsonProtocol to have it exclude certain redundant accumulator information from Spark event logs in order to save space and processing time.
Several event logs types contain both TaskMetrics and Accumulables, but there is redundancy in how the TaskMetrics data is stored:
The redundant task metrics accumulables are not actually used by the Spark History Server: I verified this by reading AppStatusListener and SQLAppStatusListener.
I believe that this redundancy was introduced back in SPARK-10620 when Spark 1.x's separate TaskMetrics implementation was replaced by the current accumulator-based version.
In this PR, I add logic to exclude TaskMetrics internal accumulators when writing this field (if a new flag is enabled).
The new
spark.eventLog.includeTaskMetricsAccumulators
configuration (defaultfalse
, meaning "keep the redundant information") can be set totrue
to exclude these redundant internal accumulator updates.For now, I am merging this off-by-default, but in a followup PR for Spark 4.0.0 we might consider a change-of-default to
true
(in which case the flag would serve as an "escape-hatch" for users who want to restore the old behavior. Although I think it's somewhat unlikely that third-party non-Spark consumers of the event logs would be relying on this redundant information, this is changing a longstanding user-facing data format and thus needs a flag.Why are the changes needed?
This change reduces the size of Spark event logs, especially for logs from applications that run many tasks. It should also have slight benefits on event log read and write speed (although I haven't tried to quantify this).
Does this PR introduce any user-facing change?
No user-facing changes in Spark History Server.
This flag's effects could be considered a user-facing change from the perspective of third-party code which does its own direct processing of Spark event logs, hence the config. However, in this PR (by itself) the flag is off-by-default. Out-of-the-box user-facing changes will be discussed / proposed in a separate flag-flip PR.
How was this patch tested?
New unit tests in
JsonProtocolSuite
.Manual tests of event log size in
spark-shell
with a job that runsspark.parallelize(1 to 1000, 1000).count()
. For this toy query, this PR's change shrunk the uncompressed event log size by ~15%. The relative size reduction will be even greater once other issues like https://issues.apache.org/jira/browse/SPARK-42206 or https://issues.apache.org/jira/browse/SPARK-42203 are fixed. The relative reduction will be smaller for tasks with many SQL metrics because those accumulables cannot be excluded.