Reduce log flood in Python PostCommit flink task #23635
Conversation
Force-pushed from ce9a0a0 to 3c40cda.
Codecov Report
@@ Coverage Diff @@
## master #23635 +/- ##
==========================================
- Coverage 73.35% 73.35% -0.01%
==========================================
Files 719 719
Lines 95799 95807 +8
==========================================
+ Hits 70276 70281 +5
- Misses 24211 24214 +3
Partials 1312 1312
@@ -73,7 +74,7 @@ public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipeline
  static ExecutionEnvironment createBatchExecutionEnvironment(
      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {

-    LOG.info("Creating a Batch Execution Environment.");
+    LOG.info("Creating a Batch Execution Environment with config {}.", confDir);
Most of the log spam comes from org.apache.flink.runtime during Flink minicluster initialization. I tested that assigning confDir to a directory whose config sets org.apache.flink.runtime to WARN works. However, this parameter is currently always null (it seems Beam is not respecting the passed-in flinkConfDir).
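To make the idea concrete, here is a hypothetical example of what a file in such a confDir could look like, assuming Flink's classic log4j.properties conventions (the file contents below are an illustration, not taken from this PR):

```properties
# Hypothetical log4j.properties placed in the directory passed as confDir.
# Raises the org.apache.flink.runtime namespace to WARN so that INFO-level
# minicluster initialization messages are suppressed.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d %-5p %c - %m%n
log4j.logger.org.apache.flink.runtime=WARN
```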
Force-pushed from 234abe6 to 4135f1d.
Force-pushed from 1f55236 to d6eb53f.
Force-pushed from 7523c2b to 29e396b.
Run Python 3.8 PostCommit
…nner
* Flink runner support log_level_overrides
* FlinkPortableRunner respect flinkConfDir pipeline option
* Enable backslashed quotes in integration test pipeline options
* Decrease Flink Runner Log Spam for Python PostCommit
Force-pushed from 29e396b to f15c3e2.
Run Python 3.8 PostCommit
Logs reduced from ~60 MB to 10 MB: https://ci-beam.apache.org/job/beam_PostCommit_Python38_PR/653/console
Run Java PreCommit
Run Typescript PreCommit
Run SQL_Java17 PreCommit
Run Python PreCommit
Run Java_Kafka_IO_Direct PreCommit
R: @tvalentyn
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Run Java_Spark3_Versions PreCommit
 * Configure log manager's default log level and log level overrides from the sdk harness options,
 * and return the list of configured loggers.
 */
public static List<java.util.logging.Logger> getConfiguredLoggerFromOptions(
I was trying to find the best place to put this piece of code that sets log levels. It is now used by both :sdks:java:harness and :runners:flink. This module seems like a good place, since other public static methods related to environments already live here.
Is SdkHarnessOptions.java accessible from :runners:flink?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you already import that file in FlinkPipelineRunner.java, so it must be.
Have you considered it as a home for this helper?
SdkHarnessOptions.java is in sdks:java:core, and yes, it is accessible. I considered it, but I was not sure it is a good pattern to include code with side effects on the environment in the options class. If that sounds reasonable, I could move it in.
Took another look; I agree that it looks somewhat unnatural to modify the global context, especially the root logger. We can ask for a second opinion from someone who does more work on the Java SDK.
@kennknowles might have good advice.
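A minimal sketch of the technique under discussion, assuming java.util.logging (JUL). The class and method names below are hypothetical, not the actual Beam helper; the point is why the configured loggers must be returned and held:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogLevelConfigSketch {

  /**
   * Apply per-logger level overrides and return the configured loggers.
   * Returning them matters: java.util.logging only keeps weak references to
   * loggers, so a logger configured here could otherwise be garbage collected
   * and silently lose its level.
   */
  public static List<Logger> configureLoggers(Level defaultLevel, Map<String, Level> overrides) {
    List<Logger> configured = new ArrayList<>();
    if (defaultLevel != null) {
      // The root logger is configured in place here; per the review comment,
      // a caller could equally well set this outside the helper.
      Logger.getLogger("").setLevel(defaultLevel);
    }
    for (Map.Entry<String, Level> e : overrides.entrySet()) {
      Logger logger = Logger.getLogger(e.getKey());
      logger.setLevel(e.getValue());
      configured.add(logger);
    }
    return configured;
  }

  public static void main(String[] args) {
    // Caller holds the returned list so the overrides stay alive.
    List<Logger> keepAlive =
        configureLoggers(Level.INFO, Map.of("org.apache.flink.runtime", Level.WARNING));
    System.out.println(keepAlive.get(0).getName() + "=" + keepAlive.get(0).getLevel());
    // prints "org.apache.flink.runtime=WARNING"
  }
}
```

This also illustrates the reviewer's alternative: moving the `Logger.getLogger("").setLevel(...)` call to the call site would keep the helper free of root-logger side effects.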
  this.filesToStage = filesToStage;
}

@Override
public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) throws Exception {
  MetricsEnvironment.setMetricsSupported(false);

  // Apply log levels settings at the beginning of pipeline run
do we need a similar change in https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java ? Can be a separate PR.
Yeah, I haven't put it in yet because I haven't tested it on the Spark runner. Entered #23713.
Force-pushed from 5593f5c to 3edff6a.
Run Java_Spark3_Versions PreCommit
// Use the passed in logging options to configure the various logger levels.
if (loggingOptions.getDefaultSdkHarnessLogLevel() != null) {
  rootLogger.setLevel(
From reading this method it's not clear why rootLogger is not included in configuredLoggers. Should we modify the root logger separately, for example where this method is called?
@@ -67,14 +68,18 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
  public FlinkPipelineRunner(
      FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
    this.pipelineOptions = pipelineOptions;
    this.confDir = confDir;
    // confDir takes precedence over pipelineOptions.getFlinkConfDir
Put this in the javadoc. A user needs to know.
Thanks, done. This is a fix; originally pipelineOptions's flinkConfDir did not take effect at all.
Shouldn't the pipeline option take precedence, though?
At first glance I understood the confDir parameter as an override to pipelineOptions.getFlinkConfDir in FlinkPipelineRunner's constructor. One use case is that the command line arguments of FlinkPipelineRunner.main (runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java, line 160 at 3edff6a) can be passed to FlinkPipelineRunnerConfiguration and finally appear as the confDir parameter here. According to the comment at line 213 of the same file ("These properties will be set to all jobs submitted to Flink and take precedence ..."), I understand that if confDir is set it should take precedence.
Found origins of this flag:
From: https://issues.apache.org/jira/browse/BEAM-14492
Sometimes it is necessary to be able to set any flink option via PipelineOptions to the runner - mostly when we submit job from vanilla Java, not being run via flink run
I think the intent is consistent with that usage: if a user specifies this via pipeline options, it should take precedence over predetermined runner configuration.
cc'ing @je-ik just in case and FYI, since you mentioned that originally pipelineOptions's flinkConfDir did not take effect at all.
Thanks! That makes sense to me. confDir is per server and pipelineOptions is per pipeline. Will change.
Thanks, please update the Javadoc as well.
I observed that the option does not take effect when invoking a Python pipeline, and I targeted the code here. It may be related to what @kennknowles mentioned below: there is a "non-portable mode" and a "portable mode" with different entry points for environment settings.
Running Python pipelines on the Flink runner should always use portable mode.
I see; then the issue is that in portable mode the confDir setting in pipeline options did not take effect.
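The precedence rule the thread settles on can be sketched in a few lines. This is a hypothetical helper written for illustration, not the actual Beam code: the per-pipeline flinkConfDir option wins over the per-server confDir handed to the runner's constructor.

```java
public class ConfDirPrecedenceSketch {

  /**
   * Hypothetical resolution of the two sources: the per-pipeline option
   * (pipelineOptions.getFlinkConfDir) takes precedence; the per-server
   * constructor argument is only a fallback.
   */
  static String resolveConfDir(String flinkConfDirOption, String serverConfDir) {
    return flinkConfDirOption != null ? flinkConfDirOption : serverConfDir;
  }

  public static void main(String[] args) {
    // Pipeline option set: it wins.
    System.out.println(resolveConfDir("/per/pipeline/conf", "/per/server/conf"));
    // Pipeline option absent: fall back to the server-wide directory.
    System.out.println(resolveConfDir(null, "/per/server/conf"));
  }
}
```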
@@ -304,6 +304,13 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
    "--environment_type=LOOPBACK",
    "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
    "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
    '--sdk_harness_log_level_overrides=' +
        // suppress info level flink.runtime log flood
        '{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
This basically looks funny to me because the SDK harness is only actually used in portable mode, where there is no org.apache.flink namespace.
So the thing we are changing is when it is run in non-portable mode, and there is not actually any SDK harness.
I don't know a good solution. It is just a naming thing. Probably good to have a single flag that works now and also later. Is there not already a --log_level_overrides? I guess then it is ambiguous whether you are applying it to the runner or to the SDK harness.
I have no solution. This is basically OK with me, but something about it is not perfectly clean.
Maybe @lukecwik has an opinion about which flags should control which log levels. I know that today we have
- Flag for the Dataflow worker
- Flag for the SDK harness
So unfortunately neither are a good choice for sharing with other runners.
I don't know if any other runner has any flag at all. My issue is that "SDK harness" is a pretty weird name for a concept that most users really don't or shouldn't care about or know exists most of the time.
Yes, we have two flags for log level overrides; the other one is in DataflowWorkerLoggingOptions, but that options class is deprecated. So I just use the existing sdk_harness_log_level_overrides, unless we want to create another option flag. I agree --log_level_overrides would make generic sense.
If we decide to create new options for log overrides, I think that would be under another issue.
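The backslash-escaped quotes in that Gradle string can be sanity-checked outside the build. A hypothetical sketch of the round trip, assuming the integration test framework strips exactly one escaping layer before handing the value to the pipeline:

```shell
# The Gradle string carries backslash-escaped quotes; after one escaping
# layer is stripped (simulated here with sed), valid JSON remains.
escaped='{\"org.apache.flink.runtime\":\"WARN\"}'
printf '%s\n' "$escaped" | sed 's/\\"/"/g'
# → {"org.apache.flink.runtime":"WARN"}
```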
Force-pushed from cb81467 to afe5330.
Python PreCommit fails with a broken pipe on the newly added Py310 test.
Java PreCommit fails with known flakes.
It's being worked on: #23734
Force-pushed from 248173b to 2a8077b.
Wow, all Jenkins tests passed at once. Rare.
Is the PreCommit failure on Python 3.10 flaky?
It should be considered flaky, since nothing was fixed between my second-to-last push (failure) and the last push (succeeded).
Thanks, @Abacn!
Fixes #23631
Flink runner support log_level_overrides
FlinkPortableRunner respect flinkConfDir pipeline option
Decrease Flink Runner Log Spam for Python PostCommit