From d6a4eb966fbc47277e07b79e7c64939a62eb1d54 Mon Sep 17 00:00:00 2001
From: Roman Khachatryan
Date: Sat, 3 Feb 2024 13:17:36 +0100
Subject: [PATCH] [FLINK-34417] Log Job ID via MDC

---
 .../docs/deployment/advanced/logging.md       |  15 ++
 .../docs/deployment/advanced/logging.md       |  14 ++
 .../apache/flink/util/MdcAwareExecutor.java   |  39 ++++
 .../flink/util/MdcAwareExecutorService.java   | 114 +++++++++
 .../MdcAwareScheduledExecutorService.java     |  61 +++++
 .../java/org/apache/flink/util/MdcUtils.java  | 112 +++++++++
 .../org/apache/flink/util/MdcUtilsTest.java   | 148 ++++++++++++
 .../rpc/pekko/FencedPekkoRpcActor.java        |   7 +-
 .../runtime/rpc/pekko/PekkoRpcActor.java      |  75 +++---
 .../runtime/rpc/pekko/PekkoRpcService.java    |  11 +-
 .../flink/runtime/rpc/FencedRpcEndpoint.java  |  14 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java |  33 ++-
 .../apache/flink/runtime/rpc/RpcService.java  |   7 +-
 .../checkpoint/CheckpointCoordinator.java     |  47 ++--
 ...annelStateWriteRequestExecutorFactory.java |   3 +-
 .../ChannelStateWriteRequestExecutorImpl.java |  60 +++--
 .../flink/runtime/dispatcher/Dispatcher.java  |  41 ++--
 ...bMasterServiceLeadershipRunnerFactory.java |   4 +-
 .../executiongraph/DefaultExecutionGraph.java |  10 +-
 .../flink/runtime/jobmaster/JobMaster.java    |  21 +-
 .../resourcemanager/ResourceManager.java      |  48 ++--
 .../runtime/taskexecutor/TaskExecutor.java    | 183 ++++++++-------
 .../flink/runtime/taskmanager/Task.java       |  28 ++-
 ...nnelStateWriteRequestExecutorImplTest.java |  29 ++-
 .../flink/runtime/rpc/TestingRpcService.java  |   7 +-
 .../streaming/runtime/tasks/StreamTask.java   |  24 +-
 .../logging/LoggerAuditingExtension.java      |  21 +-
 flink-tests/pom.xml                           |   2 +-
 .../OperatorEventSendingCheckpointITCase.java |   6 +-
 .../flink/test/misc/JobIDLoggingITCase.java   | 220 ++++++++++++++++++
 .../src/test/resources/log4j2-test.properties |   2 +-
 31 files changed, 1148 insertions(+), 258 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java

diff --git a/docs/content.zh/docs/deployment/advanced/logging.md b/docs/content.zh/docs/deployment/advanced/logging.md
index abb4b1025f037..432336946de9c 100644
--- a/docs/content.zh/docs/deployment/advanced/logging.md
+++ b/docs/content.zh/docs/deployment/advanced/logging.md
@@ -40,6 +40,30 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实
 
+### Structured logging
+
+Flink adds the following fields to the [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
+- Job ID
+  - key: `flink-job-id`
+  - format: string
+  - length: 32
+
+This is most useful in environments with structured logging, as it allows you to quickly filter the relevant logs.
+
+The MDC is propagated by SLF4J to the logging backend, which usually adds it to the log records automatically (e.g. in the [log4j JSON template layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
+Alternatively, it can be configured explicitly - a [log4j pattern layout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout) that includes the job ID might look like this:
+
+`[%-32X{flink-job-id}] %c{0} %m%n`
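+
+For example, a Log4j 2 properties configuration could attach this pattern to a console appender as sketched below (the appender name is illustrative, and the appender still has to be referenced by a logger):
+
+```properties
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%-32X{flink-job-id}] %c{0} %m%n
+```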
+
 ## 配置 Log4j 2
 
 Log4j 2 是通过 property 配置文件进行配置的。
diff --git a/docs/content/docs/deployment/advanced/logging.md b/docs/content/docs/deployment/advanced/logging.md
index 6c01e1ddff1d6..cc2d0201e17ab 100644
--- a/docs/content/docs/deployment/advanced/logging.md
+++ b/docs/content/docs/deployment/advanced/logging.md
@@ -38,6 +38,29 @@ This allows you to use any logging framework that supports SLF4J, without having
 
 By default, [Log4j 2](https://logging.apache.org/log4j/2.x/index.html) is used as the underlying logging framework.
 
+### Structured logging
+
+Flink adds the following fields to the [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
+- Job ID
+  - key: `flink-job-id`
+  - format: string
+  - length: 32
+
+This is most useful in environments with structured logging, as it allows you to quickly filter the relevant logs.
+
+The MDC is propagated by SLF4J to the logging backend, which usually adds it to the log records automatically (e.g. in the [log4j JSON template layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
+Alternatively, it can be configured explicitly - a [log4j pattern layout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout) that includes the job ID might look like this:
+
+`[%-32X{flink-job-id}] %c{0} %m%n`
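+
+For example, a Log4j 2 properties configuration could attach this pattern to a console appender as sketched below (the appender name is illustrative, and the appender still has to be referenced by a logger):
+
+```properties
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%-32X{flink-job-id}] %c{0} %m%n
+```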
+ */ + +package org.apache.flink.util; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class MdcAwareExecutor implements Executor { + protected final Map contextData; + protected final T delegate; + + protected MdcAwareExecutor(T delegate, Map contextData) { + this.delegate = checkNotNull(delegate); + this.contextData = Collections.unmodifiableMap(checkNotNull(contextData)); + } + + public void execute(Runnable command) { + delegate.execute(MdcUtils.wrapRunnable(contextData, command)); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java new file mode 100644 index 0000000000000..693a247481bd0 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.MdcUtils.wrapCallable; +import static org.apache.flink.util.MdcUtils.wrapRunnable; + +class MdcAwareExecutorService extends MdcAwareExecutor + implements ExecutorService { + + public MdcAwareExecutorService(S delegate, Map contextData) { + super(delegate, contextData); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(wrapCallable(contextData, task)); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(wrapRunnable(contextData, task), result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(wrapRunnable(contextData, task)); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return delegate.invokeAll(wrapCallables(tasks)); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, 
diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
new file mode 100644
index 0000000000000..693a247481bd0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+
+class MdcAwareExecutorService<S extends ExecutorService> extends MdcAwareExecutor<S>
+        implements ExecutorService {
+
+    public MdcAwareExecutorService(S delegate, Map<String, String> contextData) {
+        super(delegate, contextData);
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return delegate.submit(wrapCallable(contextData, task));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return delegate.submit(wrapRunnable(contextData, task), result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return delegate.submit(wrapRunnable(contextData, task));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        return delegate.invokeAll(wrapCallables(tasks));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return delegate.invokeAll(wrapCallables(tasks), timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        return delegate.invokeAny(wrapCallables(tasks));
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate.invokeAny(wrapCallables(tasks), timeout, unit);
+    }
+
+    private <T> List<Callable<T>> wrapCallables(Collection<? extends Callable<T>> tasks) {
+        List<Callable<T>> list = new ArrayList<>(tasks.size());
+        for (Callable<T> task : tasks) {
+            list.add(wrapCallable(contextData, task));
+        }
+        return list;
+    }
+}
+ */ + +package org.apache.flink.util; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.MdcUtils.wrapCallable; +import static org.apache.flink.util.MdcUtils.wrapRunnable; + +class MdcAwareScheduledExecutorService extends MdcAwareExecutorService + implements ScheduledExecutorService { + + public MdcAwareScheduledExecutorService( + ScheduledExecutorService delegate, Map contextData) { + super(delegate, contextData); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(wrapRunnable(contextData, command), delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(wrapCallable(contextData, callable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate( + wrapRunnable(contextData, command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay( + wrapRunnable(contextData, command), initialDelay, delay, unit); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java b/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java new file mode 100644 index 0000000000000..c448c9837dbfd --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.api.common.JobID; + +import org.slf4j.MDC; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */ +public class MdcUtils { + + public static final String JOB_ID = "flink-job-id"; + + /** + * Replace MDC contents with the provided one and return a closeable object that can be used to + * restore the original MDC. 
+ * + * @param context to put into MDC + */ + public static MdcCloseable withContext(Map context) { + final Map orig = MDC.getCopyOfContextMap(); + MDC.setContextMap(context); + return () -> MDC.setContextMap(orig); + } + + /** {@link AutoCloseable } that restores the {@link MDC} contents on close. */ + public interface MdcCloseable extends AutoCloseable { + @Override + void close(); + } + + /** + * Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its + * execution and removed afterward. + */ + public static Runnable wrapRunnable(Map contextData, Runnable command) { + return () -> { + try (MdcCloseable ctx = withContext(contextData)) { + command.run(); + } + }; + } + + /** + * Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its + * execution and removed afterward. + */ + public static Callable wrapCallable( + Map contextData, Callable command) { + return () -> { + try (MdcCloseable ctx = withContext(contextData)) { + return command.call(); + } + }; + } + + /** + * Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes + * any submitted commands and removed afterward. + */ + public static Executor scopeToJob(JobID jobID, Executor executor) { + checkArgument(!(executor instanceof MdcAwareExecutor)); + return new MdcAwareExecutor<>(executor, asContextData(jobID)); + } + + /** + * Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it + * executes any submitted commands and removed afterward. + */ + public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) { + checkArgument(!(delegate instanceof MdcAwareExecutorService)); + return new MdcAwareExecutorService<>(delegate, asContextData(jobID)); + } + + /** + * Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added + * before it executes any submitted commands and removed afterward. + */ + public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) { + checkArgument(!(ses instanceof MdcAwareScheduledExecutorService)); + return new MdcAwareScheduledExecutorService(ses, asContextData(jobID)); + } + + public static Map asContextData(JobID jobID) { + return Collections.singletonMap(JOB_ID, jobID.toHexString()); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java new file mode 100644 index 0000000000000..35917d28c7289 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.util; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; +import org.apache.flink.util.MdcUtils.MdcCloseable; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.apache.logging.log4j.core.LogEvent; +import org.assertj.core.api.AbstractObjectAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.MdcUtils.asContextData; +import static org.apache.flink.util.MdcUtils.wrapCallable; +import static org.apache.flink.util.MdcUtils.wrapRunnable; +import static org.assertj.core.api.Assertions.assertThat; +import static org.slf4j.event.Level.DEBUG; + +/** Tests for the {@link MdcUtils}. */ +class MdcUtilsTest { + private static final Logger LOGGER = LoggerFactory.getLogger(MdcUtilsTest.class); + private static final Runnable LOGGING_RUNNABLE = () -> LOGGER.info("ignore"); + + @RegisterExtension + public final LoggerAuditingExtension loggerExtension = + new LoggerAuditingExtension(MdcUtilsTest.class, DEBUG); + + @Test + public void testJobIDAsContext() { + JobID jobID = new JobID(); + assertThat(MdcUtils.asContextData(jobID)) + .isEqualTo(Collections.singletonMap("flink-job-id", jobID.toHexString())); + } + + @Test + public void testMdcCloseableAddsJobId() throws Exception { + assertJobIDLogged( + jobID -> { + try (MdcCloseable ignored = MdcUtils.withContext(asContextData(jobID))) { + LOGGER.warn("ignore"); + } + }); + } + + @Test + public void testMdcCloseableRemovesJobId() { + JobID jobID = new JobID(); + try (MdcCloseable ignored = MdcUtils.withContext(asContextData(jobID))) { + // ... 
+ } + LOGGER.warn("with-job"); + assertJobIdLogged(null); + } + + @Test + public void testWrapRunnable() throws Exception { + assertJobIDLogged(jobID -> wrapRunnable(asContextData(jobID), LOGGING_RUNNABLE).run()); + } + + @Test + public void testWrapCallable() throws Exception { + assertJobIDLogged( + jobID -> + wrapCallable( + asContextData(jobID), + () -> { + LOGGER.info("ignore"); + return null; + }) + .call()); + } + + @Test + public void testScopeExecutor() throws Exception { + assertJobIDLogged( + jobID -> + MdcUtils.scopeToJob(jobID, Executors.directExecutor()) + .execute(LOGGING_RUNNABLE)); + } + + @Test + public void testScopeExecutorService() throws Exception { + assertJobIDLogged( + jobID -> + MdcUtils.scopeToJob(jobID, Executors.newDirectExecutorService()) + .submit(LOGGING_RUNNABLE) + .get()); + } + + @Test + public void testScopeScheduledExecutorService() throws Exception { + ScheduledExecutorService ses = + java.util.concurrent.Executors.newSingleThreadScheduledExecutor(); + try { + assertJobIDLogged( + jobID -> + MdcUtils.scopeToJob(jobID, ses) + .schedule(LOGGING_RUNNABLE, 1L, TimeUnit.MILLISECONDS) + .get()); + } finally { + ses.shutdownNow(); + } + } + + private void assertJobIDLogged(ThrowingConsumer action) throws Exception { + JobID jobID = new JobID(); + action.accept(jobID); + assertJobIdLogged(jobID); + } + + private void assertJobIdLogged(JobID jobId) { + AbstractObjectAssert extracting = + assertThat(loggerExtension.getEvents()) + .singleElement() + .extracting(LogEvent::getContextData) + .extracting(m -> m.getValue("flink-job-id")); + if (jobId == null) { + extracting.isNull(); + } else { + extracting.isEqualTo(jobId.toHexString()); + } + } +} diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java index 860a263f53976..b2ebf0a732050 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException; import java.io.Serializable; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -47,14 +48,16 @@ public FencedPekkoRpcActor( int version, final long maximumFramesize, final boolean forceSerialization, - ClassLoader flinkClassLoader) { + ClassLoader flinkClassLoader, + final Map loggingContext) { super( rpcEndpoint, terminationFuture, version, maximumFramesize, forceSerialization, - flinkClassLoader); + flinkClassLoader, + loggingContext); } @Override diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java index dc4e342f35a92..a98777968677b 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException; import org.apache.flink.types.Either; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; @@ -52,6 +53,7 @@ import 
java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -102,6 +104,7 @@ class PekkoRpcActor extends AbstractActor { private final AtomicBoolean rpcEndpointStopped; private final boolean forceSerialization; + private final Map loggingContext; private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult; @@ -113,7 +116,9 @@ class PekkoRpcActor extends AbstractActor { final int version, final long maximumFramesize, final boolean forceSerialization, - final ClassLoader flinkClassLoader) { + final ClassLoader flinkClassLoader, + final Map loggingContext) { + this.loggingContext = loggingContext; checkArgument(maximumFramesize > 0, "Maximum framesize must be positive."); this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); @@ -161,30 +166,32 @@ public Receive createReceive() { } private void handleMessage(final Object message) { - if (state.isRunning()) { - mainThreadValidator.enterMainThread(); + try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext)) { + if (state.isRunning()) { + mainThreadValidator.enterMainThread(); + + try { + handleRpcMessage(message); + } finally { + mainThreadValidator.exitMainThread(); + } + } else { + log.info( + "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", + rpcEndpoint.getClass().getName(), + message); - try { - handleRpcMessage(message); - } finally { - mainThreadValidator.exitMainThread(); + sendErrorIfSender( + new EndpointNotStartedException( + String.format( + "Discard message %s, because the rpc endpoint %s has not been started yet.", + message, getSelf().path()))); } - } else { - log.info( - "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", - rpcEndpoint.getClass().getName(), - message); - - sendErrorIfSender( - new EndpointNotStartedException( - String.format( - "Discard message %s, because the rpc endpoint %s has not been started yet.", - message, getSelf().path()))); } } private void handleControlMessage(ControlMessages controlMessage) { - try { + try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext)) { switch (controlMessage) { case START: state = state.start(this, flinkClassLoader); @@ -237,20 +244,22 @@ protected void handleRpcMessage(Object message) { } private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) { - if (!isCompatibleVersion(handshakeMessage.getVersion())) { - sendErrorIfSender( - new HandshakeException( - String.format( - "Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.", - handshakeMessage.getVersion(), getVersion()))); - } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) { - sendErrorIfSender( - new HandshakeException( - String.format( - "The rpc endpoint does not support the gateway %s.", - handshakeMessage.getRpcGateway().getSimpleName()))); - } else { - getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf()); + try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext)) { + if (!isCompatibleVersion(handshakeMessage.getVersion())) { + sendErrorIfSender( + new HandshakeException( + String.format( + "Version mismatch between source (%s) and target (%s) rpc component. 
Please verify that all components have the same version.", + handshakeMessage.getVersion(), getVersion()))); + } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) { + sendErrorIfSender( + new HandshakeException( + String.format( + "The rpc endpoint does not support the gateway %s.", + handshakeMessage.getRpcGateway().getSimpleName()))); + } else { + getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf()); + } } } diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java index c9d276685edcb..ab8bee418228d 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java @@ -260,10 +260,12 @@ public > CompletableFuture } @Override - public RpcServer startServer(C rpcEndpoint) { + public RpcServer startServer( + C rpcEndpoint, Map loggingContext) { checkNotNull(rpcEndpoint, "rpc endpoint"); - final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint); + final SupervisorActor.ActorRegistration actorRegistration = + registerRpcActor(rpcEndpoint, loggingContext); final ActorRef actorRef = actorRegistration.getActorRef(); final CompletableFuture actorTerminationFuture = actorRegistration.getTerminationFuture(); @@ -336,7 +338,7 @@ public RpcServer startServer(C rpcEndpoint) } private SupervisorActor.ActorRegistration registerRpcActor( - C rpcEndpoint) { + C rpcEndpoint, Map loggingContext) { final Class rpcActorType; if (rpcEndpoint instanceof FencedRpcEndpoint) { @@ -359,7 +361,8 @@ private SupervisorActor.ActorRegistration r getVersion(), configuration.getMaximumFramesize(), configuration.isForceRpcInvocationSerialization(), - flinkClassLoader), + flinkClassLoader, + loggingContext), rpcEndpoint.getEndpointId()); final SupervisorActor.ActorRegistration actorRegistration = diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java index e6ed59f737d55..a6fbd2792a4f1 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java @@ -21,6 +21,8 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; +import java.util.Collections; +import java.util.Map; import java.util.UUID; /** @@ -34,8 +36,12 @@ public abstract class FencedRpcEndpoint extends RpcEndpo private final F fencingToken; - protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F fencingToken) { - super(rpcService, endpointId); + protected FencedRpcEndpoint( + RpcService rpcService, + String endpointId, + F fencingToken, + Map loggingContext) { + super(rpcService, endpointId, loggingContext); Preconditions.checkNotNull(fencingToken, "The fence token should be null"); Preconditions.checkNotNull(rpcServer, "The rpc server should be null"); @@ -43,6 +49,10 @@ protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F fencingT this.fencingToken = fencingToken; } + protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F fencingToken) { + this(rpcService, endpointId, fencingToken, Collections.emptyMap()); + } + protected FencedRpcEndpoint(RpcService 
rpcService, F fencingToken) { this(rpcService, UUID.randomUUID().toString(), fencingToken); } diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 47b64d1200eba..b1fda4a044326 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -19,10 +19,12 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter; import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -34,9 +36,12 @@ import java.io.Closeable; import java.io.IOException; import java.time.Duration; +import java.util.Collections; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; @@ -134,11 +139,12 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { * @param rpcService The RPC server that dispatches calls to this RPC endpoint. * @param endpointId Unique identifier for this endpoint */ - protected RpcEndpoint(final RpcService rpcService, final String endpointId) { + protected RpcEndpoint( + RpcService rpcService, String endpointId, Map loggingContext) { this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); - this.rpcServer = rpcService.startServer(this); + this.rpcServer = rpcService.startServer(this, loggingContext); this.resourceRegistry = new CloseableRegistry(); this.mainThreadExecutor = @@ -146,6 +152,16 @@ protected RpcEndpoint(final RpcService rpcService, final String endpointId) { registerResource(this.mainThreadExecutor); } + /** + * Initializes the RPC endpoint. + * + * @param rpcService The RPC server that dispatches calls to this RPC endpoint. + * @param endpointId Unique identifier for this endpoint + */ + protected RpcEndpoint(final RpcService rpcService, final String endpointId) { + this(rpcService, endpointId, Collections.emptyMap()); + } + /** * Initializes the RPC endpoint with a random endpoint id. * @@ -342,6 +358,19 @@ protected MainThreadExecutor getMainThreadExecutor() { return mainThreadExecutor; } + /** + * Gets the main thread execution context. The main thread execution context can be used to + * execute tasks in the main thread of the underlying RPC endpoint. + * + * @param jobID the {@link JobID} to scope the returned {@link ComponentMainThreadExecutor} to, + * i.e. add/remove before/after the invocations using the returned executor + * @return Main thread execution context + */ + protected Executor getMainThreadExecutor(JobID jobID) { + // todo: consider caching + return MdcUtils.scopeToJob(jobID, getMainThreadExecutor()); + } + /** * Gets the endpoint's RPC service. 
* diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 9d60fd0053483..788f2cb93e57a 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -23,6 +23,7 @@ import org.apache.flink.util.concurrent.ScheduledExecutor; import java.io.Serializable; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -93,11 +94,13 @@ > CompletableFuture con /** * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint. * - * @param rpcEndpoint Rpc protocol to dispatch the rpcs to * @param Type of the rpc endpoint + * @param rpcEndpoint Rpc protocol to dispatch the rpcs to + * @param loggingContext * @return Self gateway to dispatch remote procedure calls to oneself */ - RpcServer startServer(C rpcEndpoint); + RpcServer startServer( + C rpcEndpoint, Map loggingContext); /** * Stop the underlying rpc server of the provided self gateway. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index d74f106276740..25afade023935 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.flink.util.clock.Clock; @@ -795,29 +796,31 @@ private void triggerCheckpointRequest( triggerTasks(request, timestamp, checkpoint) .exceptionally( failure -> { - LOG.info( - "Triggering Checkpoint {} for job {} failed due to {}", - checkpoint.getCheckpointID(), - job, - failure); - - final CheckpointException cause; - if (failure instanceof CheckpointException) { - cause = (CheckpointException) failure; - } else { - cause = - new CheckpointException( - CheckpointFailureReason - .TRIGGER_CHECKPOINT_FAILURE, - failure); + try (MdcUtils.MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(job))) { + LOG.info( + "Triggering Checkpoint {} for job {} failed due to {}", + checkpoint.getCheckpointID(), + job, + failure); + final CheckpointException cause; + if (failure instanceof CheckpointException) { + cause = (CheckpointException) failure; + } else { + cause = + new CheckpointException( + CheckpointFailureReason + .TRIGGER_CHECKPOINT_FAILURE, + failure); + } + timer.execute( + () -> { + synchronized (lock) { + abortPendingCheckpoint(checkpoint, cause); + } + }); + return null; } - timer.execute( - () -> { - synchronized (lock) { - abortPendingCheckpoint(checkpoint, cause); - } - }); - return null; }); // It is possible that the tasks has finished diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java index 8854c0976d169..a05e47a191665 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java @@ -70,7 +70,8 @@ checkpointStorage, jobID, new ChannelStateSerializerImpl()), checkState(this.executor == executor); this.executor = null; }, - lock); + lock, + jobID); if (startExecutor) { executor.start(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java index b20388e1be04a..25fa06c9f2d44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.checkpoint.channel; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.function.RunnableWithException; import org.slf4j.Logger; @@ -91,17 +93,21 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx @GuardedBy("registerLock") private final Consumer onRegistered; + private final JobID jobID; + ChannelStateWriteRequestExecutorImpl( ChannelStateWriteRequestDispatcher dispatcher, int maxSubtasksPerChannelStateFile, Consumer onRegistered, - Object registerLock) { + Object registerLock, + JobID jobID) { this( dispatcher, new ArrayDeque<>(), maxSubtasksPerChannelStateFile, registerLock, - onRegistered); + onRegistered, + jobID); } ChannelStateWriteRequestExecutorImpl( @@ -109,44 +115,48 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx Deque deque, int maxSubtasksPerChannelStateFile, Object registerLock, - Consumer onRegistered) { + Consumer onRegistered, + JobID jobID) { this.dispatcher = dispatcher; this.deque = deque; this.maxSubtasksPerChannelStateFile = maxSubtasksPerChannelStateFile; this.registerLock = registerLock; this.onRegistered = onRegistered; - this.thread = new Thread(this::run, "Channel state writer "); + this.thread = new Thread(this::run, "Channel state writer"); this.subtasks = new HashSet<>(); this.thread.setDaemon(true); + this.jobID = jobID; } @VisibleForTesting void run() { - try { - FileSystemSafetyNet.initializeSafetyNetForThread(); - loop(); - } catch (Exception ex) { - thrown = ex; - } finally { + try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) { try { - closeAll( - this::cleanupRequests, - () -> { - Throwable cause; - synchronized (lock) { - cause = thrown == null ? new CancellationException() : thrown; - } - dispatcher.fail(cause); - }); - } catch (Exception e) { - synchronized (lock) { - //noinspection NonAtomicOperationOnVolatileField - thrown = ExceptionUtils.firstOrSuppressed(e, thrown); + FileSystemSafetyNet.initializeSafetyNetForThread(); + loop(); + } catch (Exception ex) { + thrown = ex; + } finally { + try { + closeAll( + this::cleanupRequests, + () -> { + Throwable cause; + synchronized (lock) { + cause = thrown == null ? 
new CancellationException() : thrown; + } + dispatcher.fail(cause); + }); + } catch (Exception e) { + synchronized (lock) { + //noinspection NonAtomicOperationOnVolatileField + thrown = ExceptionUtils.firstOrSuppressed(e, thrown); + } } + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } - FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); + LOG.debug("loop terminated"); } - LOG.debug("loop terminated"); } private void loop() throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index fed987e98197e..5b89203095e41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -96,6 +96,8 @@ import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.MdcUtils.MdcCloseable; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; @@ -403,7 +405,8 @@ private void runRecoveredJob(final JobGraph recoveredJob) { initJobClientExpiredTime(recoveredJob); - try { + try (MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(recoveredJob.getJobID()))) { runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY); } catch (Throwable throwable) { onFatalError( @@ -431,7 +434,7 @@ private void initJobClientExpiredTime(JobGraph jobGraph) { .getScheduledExecutor() .scheduleWithFixedDelay( () -> - getMainThreadExecutor() + getMainThreadExecutor(jobID) .execute(this::checkJobClientAliveness), 0L, jobClientAlivenessCheckInterval, @@ -513,7 +516,9 @@ private void stopDispatcherServices() throws Exception { @Override public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) { final JobID jobID = jobGraph.getJobID(); - log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID); + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) { + log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID); + } return isInGloballyTerminalState(jobID) .thenComposeAsync( isTerminated -> { @@ -547,7 +552,7 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) return internalSubmitJob(jobGraph); } }, - getMainThreadExecutor()); + getMainThreadExecutor(jobID)); } @Override @@ -636,7 +641,7 @@ private CompletableFuture handleTermination( new JobSubmissionException( jobId, "Failed to submit job.", strippedThrowable)); }, - getMainThreadExecutor()); + getMainThreadExecutor(jobId)); } return CompletableFuture.completedFuture(Acknowledge.get()); } @@ -668,7 +673,7 @@ private JobManagerRunner createJobCleanupRunner(JobResult dirtyJobResult) throws dirtyJobResult, highAvailabilityServices.getCheckpointRecoveryFactory(), configuration, - ioExecutor); + getIoExecutor(dirtyJobResult.getJobId())); } private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType) @@ -698,7 +703,7 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy jobId, JobStatus.FAILED, throwable)); } }, - getMainThreadExecutor()) + getMainThreadExecutor(jobId)) .thenCompose(Function.identity()); final CompletableFuture jobTerminationFuture = @@ -1185,7 +1190,7 @@ public CompletableFuture 
updateJobResourceRequirements( e)); } }, - ioExecutor) + getIoExecutor(jobId)) .thenComposeAsync( ignored -> performOperationOnJobMasterGateway( @@ -1193,7 +1198,7 @@ public CompletableFuture updateJobResourceRequirements( jobMasterGateway -> jobMasterGateway.updateJobResourceRequirements( jobResourceRequirements)), - getMainThreadExecutor()) + getMainThreadExecutor(jobId)) .whenComplete( (ack, error) -> { if (error != null) { @@ -1254,7 +1259,7 @@ private void registerJobManagerRunnerTerminationFuture( jobManagerRunnerTerminationFutures.put(jobId, terminationFuture); } }, - getMainThreadExecutor()); + getMainThreadExecutor(jobId)); } private CompletableFuture removeJob(JobID jobId, CleanupJobState cleanupJobState) { @@ -1280,7 +1285,7 @@ private CompletableFuture removeJob(JobID jobId, CleanupJobState cleanupJo () -> runPostJobGloballyTerminated( jobId, cleanupJobState.getJobStatus()), - getMainThreadExecutor()); + getMainThreadExecutor(jobId)); } else { return localResourceCleaner.cleanupAsync(jobId); } @@ -1400,7 +1405,7 @@ private CompletableFuture registerGloballyTerminatedJobInJobRes } return CleanupJobState.globalCleanup(terminalJobStatus); }, - getMainThreadExecutor()); + getMainThreadExecutor(jobId)); } /** @@ -1474,7 +1479,8 @@ private CompletableFuture archiveExecutionGraphToHistoryServer( } return Acknowledge.get(); }, - getMainThreadExecutor()); + getMainThreadExecutor( + executionGraphInfo.getArchivedExecutionGraph().getJobID())); } private void jobMasterFailed(JobID jobId, Throwable cause) { @@ -1562,7 +1568,7 @@ private CompletableFuture waitForTerminatingJob( return FutureUtils.thenAcceptAsyncIfNotDone( jobManagerTerminationFuture, - getMainThreadExecutor(), + getMainThreadExecutor(jobId), FunctionUtils.uncheckedConsumer( (ignored) -> { jobManagerRunnerTerminationFutures.remove(jobId); @@ -1586,7 +1592,7 @@ private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) { } public CompletableFuture onRemovedJobGraph(JobID jobId) { - return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor()); + return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor(jobId)); } private void applyParallelismOverrides(JobGraph jobGraph) { @@ -1607,4 +1613,9 @@ private void applyParallelismOverrides(JobGraph jobGraph) { } } } + + private Executor getIoExecutor(JobID jobID) { + // todo: consider caching + return MdcUtils.scopeToJob(jobID, ioExecutor); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java index ecd67e1f3d426..bf7c69fb07709 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElection; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.Preconditions; import java.util.Collection; @@ -98,7 +99,8 @@ public JobManagerRunner createJobManagerRunner( final DefaultJobMasterServiceFactory jobMasterServiceFactory = new DefaultJobMasterServiceFactory( - jobManagerServices.getIoExecutor(), + MdcUtils.scopeToJob( + jobGraph.getJobID(), jobManagerServices.getIoExecutor()), 
rpcService, jobMasterConfiguration, jobGraph, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 168a7436b2862..1a853d851b6e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -85,6 +85,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.IterableUtils; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TernaryBoolean; @@ -484,9 +485,12 @@ public void failJobDueToTaskFailure( checkState(checkpointCoordinatorTimer == null); checkpointCoordinatorTimer = - Executors.newSingleThreadScheduledExecutor( - new DispatcherThreadFactory( - Thread.currentThread().getThreadGroup(), "Checkpoint Timer")); + MdcUtils.scopeToJob( + getJobID(), + Executors.newSingleThreadScheduledExecutor( + new DispatcherThreadFactory( + Thread.currentThread().getThreadGroup(), + "Checkpoint Timer"))); // create the coordinator that triggers and commits checkpoints and holds the state checkpointCoordinator = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 6edaa00ef07c2..3b288089a46bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -104,6 +104,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; @@ -250,7 +251,11 @@ public JobMaster( long initializationTimestamp) throws Exception { - super(rpcService, RpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId); + super( + rpcService, + RpcServiceUtils.createRandomName(JOB_MANAGER_NAME), + jobMasterId, + MdcUtils.asContextData(jobGraph.getJobID())); final ExecutionDeploymentReconciliationHandler executionStateReconciliationHandler = new ExecutionDeploymentReconciliationHandler() { @@ -291,6 +296,10 @@ public void onUnknownDeploymentsOf( } } }; + final String jobName = jobGraph.getName(); + final JobID jid = jobGraph.getJobID(); + + log.info("Initializing job '{}' ({}).", jobName, jid); this.executionDeploymentTracker = executionDeploymentTracker; this.executionDeploymentReconciler = @@ -302,8 +311,9 @@ public void onUnknownDeploymentsOf( this.rpcTimeout = jobMasterConfiguration.getRpcTimeout(); this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.blobWriter = jobManagerSharedServices.getBlobWriter(); - this.futureExecutor = jobManagerSharedServices.getFutureExecutor(); - this.ioExecutor = jobManagerSharedServices.getIoExecutor(); + this.futureExecutor = + MdcUtils.scopeToJob(jid, jobManagerSharedServices.getFutureExecutor()); + this.ioExecutor = MdcUtils.scopeToJob(jid, jobManagerSharedServices.getIoExecutor()); this.jobCompletionActions = checkNotNull(jobCompletionActions); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.userCodeLoader = 
checkNotNull(userCodeLoader); @@ -313,11 +323,6 @@ public void onUnknownDeploymentsOf( .getConfiguration() .get(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME); - final String jobName = jobGraph.getName(); - final JobID jid = jobGraph.getJobID(); - - log.info("Initializing job '{}' ({}).", jobName, jid); - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 45aa27eca692e..422bbecb30e5d 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -85,6 +85,8 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkExpectedException; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.MdcUtils.MdcCloseable; import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; @@ -435,7 +437,7 @@ public CompletableFuture registerJobMaster( new FlinkException(declineMessage)); } }, - getMainThreadExecutor()); + getMainThreadExecutor(jobId)); // handle exceptions which might have occurred in one of the futures inputs of combine return registrationResponseFuture.handleAsync( @@ -572,30 +574,34 @@ public void disconnectJobManager( public CompletableFuture declareRequiredResources( JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) { final JobID jobId = resourceRequirements.getJobId(); - final JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId); - - if (null != jobManagerRegistration) { - if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) { - return getReadyToServeFuture() - .thenApply( - acknowledge -> { - validateRunsInMainThread(); - slotManager.processResourceRequirements(resourceRequirements); - return null; - }); + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { + final JobManagerRegistration jobManagerRegistration = + jobManagerRegistrations.get(jobId); + + if (null != jobManagerRegistration) { + if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) { + return getReadyToServeFuture() + .thenApply( + acknowledge -> { + validateRunsInMainThread(); + slotManager.processResourceRequirements( + resourceRequirements); + return null; + }); + } else { + return FutureUtils.completedExceptionally( + new ResourceManagerException( + "The job leader's id " + + jobManagerRegistration.getJobMasterId() + + " does not match the received id " + + jobMasterId + + '.')); + } } else { return FutureUtils.completedExceptionally( new ResourceManagerException( - "The job leader's id " - + jobManagerRegistration.getJobMasterId() - + " does not match the received id " - + jobMasterId - + '.')); + "Could not find registered job manager for job " + jobId + '.')); } - } else { - return FutureUtils.completedExceptionally( - new ResourceManagerException( - "Could not find registered job manager for job " + jobId + '.')); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index e75714425a237..8be2e04d24a6b 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -142,6 +142,8 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkExpectedException; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.MdcUtils.MdcCloseable; import org.apache.flink.util.OptionalConsumer; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -640,8 +642,10 @@ public CompletableFuture requestThreadInfoSamples( public CompletableFuture submitTask( TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) { - try { - final JobID jobId = tdd.getJobId(); + final JobID jobId = tdd.getJobId(); + // todo: consider adding task info + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { + final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId(); final JobTable.Connection jobManagerConnection = @@ -817,7 +821,7 @@ public CompletableFuture submitTask( taskManagerConfiguration, taskMetricGroup, partitionStateChecker, - getRpcService().getScheduledExecutor(), + MdcUtils.scopeToJob(jobId, getRpcService().getScheduledExecutor()), channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId)); taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured); @@ -905,13 +909,17 @@ public CompletableFuture cancelTask( final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { - try { - task.cancelExecution(); - return CompletableFuture.completedFuture(Acknowledge.get()); - } catch (Throwable t) { - return FutureUtils.completedExceptionally( - new TaskException( - "Cannot cancel task for execution " + executionAttemptID + '.', t)); + try (MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) { + try { + task.cancelExecution(); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (Throwable t) { + return FutureUtils.completedExceptionally( + new TaskException( + "Cannot cancel task for execution " + executionAttemptID + '.', + t)); + } } } else { final String message = @@ -1039,18 +1047,19 @@ public CompletableFuture triggerCheckpoint( long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) { - log.debug( - "Trigger checkpoint {}@{} for {}.", - checkpointId, - checkpointTimestamp, - executionAttemptID); - final Task task = taskSlotTable.getTask(executionAttemptID); - if (task != null) { - task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions); + try (MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) { + log.debug( + "Trigger checkpoint {}@{} for {}.", + checkpointId, + checkpointTimestamp, + executionAttemptID); + task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions); - return CompletableFuture.completedFuture(Acknowledge.get()); + return CompletableFuture.completedFuture(Acknowledge.get()); + } } else { final String message = "TaskManager received a checkpoint request for unknown task " @@ -1070,20 +1079,21 @@ public CompletableFuture confirmCheckpoint( long completedCheckpointId, long completedCheckpointTimestamp, long lastSubsumedCheckpointId) { - log.debug( - "Confirm completed checkpoint {}@{} and last subsumed checkpoint {} for {}.", - completedCheckpointId, - completedCheckpointTimestamp, - 
lastSubsumedCheckpointId, - executionAttemptID); - final Task task = taskSlotTable.getTask(executionAttemptID); - if (task != null) { - task.notifyCheckpointComplete(completedCheckpointId); - - task.notifyCheckpointSubsumed(lastSubsumedCheckpointId); - return CompletableFuture.completedFuture(Acknowledge.get()); + try (MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) { + log.debug( + "Confirm completed checkpoint {}@{} and last subsumed checkpoint {} for {}.", + completedCheckpointId, + completedCheckpointTimestamp, + lastSubsumedCheckpointId, + executionAttemptID); + task.notifyCheckpointComplete(completedCheckpointId); + + task.notifyCheckpointSubsumed(lastSubsumedCheckpointId); + return CompletableFuture.completedFuture(Acknowledge.get()); + } } else { final String message = "TaskManager received a checkpoint confirmation for unknown task " @@ -1146,37 +1156,40 @@ public CompletableFuture requestSlot( // TODO: Filter invalid requests from the resource manager by using the // instance/registration Id - log.info( - "Receive slot request {} for job {} from resource manager with leader id {}.", - allocationId, - jobId, - resourceManagerId); + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { + log.info( + "Receive slot request {} for job {} from resource manager with leader id {}.", + allocationId, + jobId, + resourceManagerId); - if (!isConnectedToResourceManager(resourceManagerId)) { - final String message = - String.format( - "TaskManager is not connected to the resource manager %s.", - resourceManagerId); - log.debug(message); - return FutureUtils.completedExceptionally(new TaskManagerException(message)); - } + if (!isConnectedToResourceManager(resourceManagerId)) { + final String message = + String.format( + "TaskManager is not connected to the resource manager %s.", + resourceManagerId); + log.debug(message); + return FutureUtils.completedExceptionally(new TaskManagerException(message)); + } - tryPersistAllocationSnapshot( - new SlotAllocationSnapshot( - slotId, jobId, targetAddress, allocationId, resourceProfile)); + tryPersistAllocationSnapshot( + new SlotAllocationSnapshot( + slotId, jobId, targetAddress, allocationId, resourceProfile)); - try { - final boolean isConnected = - allocateSlotForJob(jobId, slotId, allocationId, resourceProfile, targetAddress); + try { + final boolean isConnected = + allocateSlotForJob( + jobId, slotId, allocationId, resourceProfile, targetAddress); - if (isConnected) { - offerSlotsToJobManager(jobId); - } + if (isConnected) { + offerSlotsToJobManager(jobId); + } - return CompletableFuture.completedFuture(Acknowledge.get()); - } catch (SlotAllocationException e) { - log.debug("Could not allocate slot for allocation id {}.", allocationId, e); - return FutureUtils.completedExceptionally(e); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (SlotAllocationException e) { + log.debug("Could not allocate slot for allocation id {}.", allocationId, e); + return FutureUtils.completedExceptionally(e); + } } } @@ -1266,15 +1279,17 @@ public CompletableFuture freeSlot( @Override public void freeInactiveSlots(JobID jobId, Time timeout) { - log.debug("Freeing inactive slots for job {}.", jobId); - - // need a copy to prevent ConcurrentModificationExceptions - final ImmutableList> inactiveSlots = - ImmutableList.copyOf(taskSlotTable.getAllocatedSlots(jobId)); - for (TaskSlot slot : inactiveSlots) { - freeSlotInternal( - slot.getAllocationId(), - new FlinkException("Slot was 
re-claimed by resource manager.")); + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { + log.debug("Freeing inactive slots for job {}.", jobId); + + // need a copy to prevent ConcurrentModificationExceptions + final ImmutableList> inactiveSlots = + ImmutableList.copyOf(taskSlotTable.getAllocatedSlots(jobId)); + for (TaskSlot slot : inactiveSlots) { + freeSlotInternal( + slot.getAllocationId(), + new FlinkException("Slot was re-claimed by resource manager.")); + } } } @@ -1338,16 +1353,22 @@ public CompletableFuture> requestMetricQueryService @Override public void disconnectJobManager(JobID jobId, Exception cause) { - jobTable.getConnection(jobId) - .ifPresent( - jobManagerConnection -> - disconnectAndTryReconnectToJobManager(jobManagerConnection, cause)); + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { + jobTable.getConnection(jobId) + .ifPresent( + jobManagerConnection -> + disconnectAndTryReconnectToJobManager( + jobManagerConnection, cause)); + } } private void disconnectAndTryReconnectToJobManager( JobTable.Connection jobManagerConnection, Exception cause) { - disconnectJobManagerConnection(jobManagerConnection, cause); - jobLeaderService.reconnect(jobManagerConnection.getJobId()); + try (MdcCloseable ignored = + MdcUtils.withContext(MdcUtils.asContextData(jobManagerConnection.getJobId()))) { + disconnectJobManagerConnection(jobManagerConnection, cause); + jobLeaderService.reconnect(jobManagerConnection.getJobId()); + } } @Override @@ -1658,7 +1679,7 @@ private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnec acceptedSlotsFuture.whenCompleteAsync( handleAcceptedSlotOffers( jobId, jobMasterGateway, jobMasterId, reservedSlots, slotOfferId), - getMainThreadExecutor()); + getMainThreadExecutor(jobId)); } else { log.debug("There are no unassigned slots for the job {}.", jobId); } @@ -2061,13 +2082,17 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) { // only respond to freeing slots when not shutting down to avoid freeing slot allocation // information if (isRunning()) { - log.debug( - "Free slot with allocation id {} because: {}", - allocationId, - cause.getMessage()); + final JobID jobId = taskSlotTable.getOwningJob(allocationId); + try (MdcCloseable ignored = + MdcUtils.withContext( + jobId == null + ? 
Collections.emptyMap() + : MdcUtils.asContextData(jobId))) { - try { - final JobID jobId = taskSlotTable.getOwningJob(allocationId); + log.debug( + "Free slot with allocation id {} because: {}", + allocationId, + cause.getMessage()); final int slotIndex = taskSlotTable.freeSlot(allocationId, cause); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 877279326e9b2..f23ec1f3a3b46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -87,6 +87,8 @@ import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.MdcUtils.MdcCloseable; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TaskManagerExceptionUtils; @@ -562,7 +564,7 @@ public void startTaskThread() { /** The core work method that bootstraps the task and executes its code. */ @Override public void run() { - try { + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { doRun(); } finally { terminationFuture.complete(executionState); @@ -1244,7 +1246,8 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl invokable, executingThread, taskNameWithSubtask, - taskCancellationInterval); + taskCancellationInterval, + jobId); Thread interruptingThread = new Thread( @@ -1266,7 +1269,8 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl taskInfo, executingThread, taskManagerActions, - taskCancellationTimeout); + taskCancellationTimeout, + jobId); Thread watchDogThread = new Thread( @@ -1661,7 +1665,7 @@ private class TaskCanceler implements Runnable { @Override public void run() { - try { + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { // the user-defined cancel method may throw errors. // we need do continue despite that try { @@ -1708,23 +1712,27 @@ private static final class TaskInterrupter implements Runnable { /** The interval in which we interrupt. 
*/ private final long interruptIntervalMillis; + private final JobID jobID; + TaskInterrupter( Logger log, TaskInvokable task, Thread executorThread, String taskName, - long interruptIntervalMillis) { + long interruptIntervalMillis, + JobID jobID) { this.log = log; this.task = task; this.executorThread = executorThread; this.taskName = taskName; this.interruptIntervalMillis = interruptIntervalMillis; + this.jobID = jobID; } @Override public void run() { - try { + try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) { // we initially wait for one interval // in most cases, the threads go away immediately (by the cancellation thread) // and we need not actually do anything @@ -1765,11 +1773,14 @@ private static class TaskCancelerWatchDog implements Runnable { private final TaskInfo taskInfo; + private final JobID jobID; + TaskCancelerWatchDog( TaskInfo taskInfo, Thread executorThread, TaskManagerActions taskManager, - long timeoutMillis) { + long timeoutMillis, + JobID jobID) { checkArgument(timeoutMillis > 0); @@ -1777,11 +1788,12 @@ private static class TaskCancelerWatchDog implements Runnable { this.executorThread = executorThread; this.taskManager = taskManager; this.timeoutMillis = timeoutMillis; + this.jobID = jobID; } @Override public void run() { - try { + try (MdcCloseable ign = MdcUtils.withContext(MdcUtils.asContextData(jobID))) { Deadline timeout = Deadline.fromNow(Duration.ofMillis(timeoutMillis)); while (executorThread.isAlive() && timeout.hasTimeLeft()) { try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java index ea31130b3fcac..8f9f073154c70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java @@ -87,7 +87,7 @@ private void testCloseAfterSubmit( Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl( - NO_OP, closingDeque, 5, registerLock, e -> {}); + NO_OP, closingDeque, 5, registerLock, e -> {}, new JobID()); closingDeque.setWorker(worker); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); @@ -109,7 +109,7 @@ private void testSubmitFailure( Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl executor = new ChannelStateWriteRequestExecutorImpl( - NO_OP, deque, 5, registerLock, e -> {}); + NO_OP, deque, 5, registerLock, e -> {}, new JobID()); synchronized (registerLock) { executor.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -134,7 +134,7 @@ void testCleanup() throws IOException { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl( - requestProcessor, deque, 5, registerLock, e -> {}); + requestProcessor, deque, 5, registerLock, e -> {}, new JobID()); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -153,7 +153,7 @@ void testIgnoresInterruptsWhileRunning() throws Exception { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl( - requestProcessor, deque, 5, registerLock, e -> {}); + requestProcessor, deque, 5, registerLock, e -> {}, new JobID()); 
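+        // (new trailing constructor argument: the JobID whose MDC context the
+        // executor's worker thread is expected to use for the records it logs)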
synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -180,7 +180,8 @@ void testCanBeClosed() throws Exception { new ChannelStateSerializerImpl()); Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = - new ChannelStateWriteRequestExecutorImpl(processor, 5, e -> {}, registerLock); + new ChannelStateWriteRequestExecutorImpl( + processor, 5, e -> {}, registerLock, new JobID()); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -273,7 +274,7 @@ public void dispatch(ChannelStateWriteRequest request) { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl( - throwingRequestProcessor, 5, e -> {}, registerLock); + throwingRequestProcessor, 5, e -> {}, registerLock, new JobID()); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, subtaskIndex0); worker.registerSubtask(JOB_VERTEX_ID, subtaskIndex1); @@ -319,7 +320,7 @@ public void dispatch(ChannelStateWriteRequest request) { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl( - throwingRequestProcessor, deque, 5, registerLock, e -> {}); + throwingRequestProcessor, deque, 5, registerLock, e -> {}, new JobID()); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -342,7 +343,8 @@ public void dispatch(ChannelStateWriteRequest request) { void testSubmitRequestOfUnregisteredSubtask() throws Exception { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = - new ChannelStateWriteRequestExecutorImpl(NO_OP, 5, e -> {}, registerLock); + new ChannelStateWriteRequestExecutorImpl( + NO_OP, 5, e -> {}, registerLock, new JobID()); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -366,7 +368,8 @@ void testSubmitRequestOfUnregisteredSubtask() throws Exception { void testSubmitPriorityUnreadyRequest() throws Exception { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = - new ChannelStateWriteRequestExecutorImpl(NO_OP, 5, e -> {}, registerLock); + new ChannelStateWriteRequestExecutorImpl( + NO_OP, 5, e -> {}, registerLock, new JobID()); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); } @@ -390,7 +393,7 @@ void testRegisterSubtaskAfterRegisterCompleted() throws Exception { Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = new ChannelStateWriteRequestExecutorImpl( - NO_OP, maxSubtasksPerChannelStateFile, e -> {}, registerLock); + NO_OP, maxSubtasksPerChannelStateFile, e -> {}, registerLock, new JobID()); synchronized (registerLock) { for (int i = 0; i < maxSubtasksPerChannelStateFile; i++) { assertThat(worker.isRegistering()).isTrue(); @@ -429,7 +432,8 @@ public void dispatch(ChannelStateWriteRequest request) { dispatcher, maxSubtasksPerChannelStateFile, workerFuture::complete, - registerLock); + registerLock, + new JobID()); worker.start(); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); @@ -467,7 +471,8 @@ void testReleaseSubtaskBeforeRegisterCompleted() throws Exception { new TestRequestDispatcher(), maxSubtasksPerChannelStateFile, workerFuture::complete, - registerLock); + registerLock, + new JobID()); worker.start(); synchronized (registerLock) { worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index d61ba418d0de3..afabbeff5920b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,8 @@
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -195,8 +197,9 @@ public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServ
     }
 
     @Override
-    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
-        return backingRpcService.startServer(rpcEndpoint);
+    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+            C rpcEndpoint, Map<String, String> loggingContext) {
+        return backingRpcService.startServer(rpcEndpoint, Collections.emptyMap());
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2391271156251..d16bc40a8bec0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,6 +110,7 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.clock.SystemClock;
@@ -416,8 +417,10 @@ protected StreamTask(
         resourceCloser.registerCloseable(mailboxProcessor);
 
         this.channelIOExecutor =
-                Executors.newSingleThreadExecutor(
-                        new ExecutorThreadFactory("channel-state-unspilling"));
+                MdcUtils.scopeToJob(
+                        environment.getJobID(),
+                        Executors.newSingleThreadExecutor(
+                                new ExecutorThreadFactory("channel-state-unspilling")));
 
         resourceCloser.registerCloseable(channelIOExecutor::shutdown);
 
         this.recordWriter = createRecordWriterDelegate(configuration, environment);
@@ -436,13 +439,16 @@ protected StreamTask(
         // for simultaneous N ongoing concurrent checkpoints and for example clean up of one
         // aborted one.
         this.asyncOperationsThreadPool =
-                new ThreadPoolExecutor(
-                        0,
-                        configuration.getMaxConcurrentCheckpoints() + 1,
-                        60L,
-                        TimeUnit.SECONDS,
-                        new LinkedBlockingQueue<>(),
-                        new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
+                MdcUtils.scopeToJob(
+                        getEnvironment().getJobID(),
+                        new ThreadPoolExecutor(
+                                0,
+                                configuration.getMaxConcurrentCheckpoints() + 1,
+                                60L,
+                                TimeUnit.SECONDS,
+                                new LinkedBlockingQueue<>(),
+                                new ExecutorThreadFactory(
+                                        "AsyncOperations", uncaughtExceptionHandler)));
 
         // Register all asynchronous checkpoint threads.
         resourceCloser.registerCloseable(this::shutdownAsyncThreads);
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
index 8ca1b5cc1da35..a1e7367713de1 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 /**
  * Utility for auditing logged messages. (JUnit5 extension)
@@ -47,17 +48,31 @@ public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCal
     private final String loggerName;
     private final org.slf4j.event.Level level;
 
-    private ConcurrentLinkedQueue<String> loggingEvents;
+    private ConcurrentLinkedQueue<LogEvent> loggingEvents;
 
     public LoggerAuditingExtension(Class<?> clazz, org.slf4j.event.Level level) {
-        this.loggerName = clazz.getCanonicalName();
+        this(clazz.getCanonicalName(), level);
+    }
+
+    public LoggerAuditingExtension(String loggerName, org.slf4j.event.Level level) {
+        this.loggerName = loggerName;
         this.level = level;
     }
 
     public List<String> getMessages() {
+        return loggingEvents.stream()
+                .map(e -> e.getMessage().getFormattedMessage())
+                .collect(Collectors.toList());
+    }
+
+    public List<LogEvent> getEvents() {
         return new ArrayList<>(loggingEvents);
     }
 
+    public String getLoggerName() {
+        return loggerName;
+    }
+
     @Override
     public void beforeEach(ExtensionContext context) throws Exception {
         loggingEvents = new ConcurrentLinkedQueue<>();
@@ -66,7 +81,7 @@ public void beforeEach(ExtensionContext context) throws Exception {
             new AbstractAppender("test-appender", null, null, false, Property.EMPTY_ARRAY) {
                 @Override
                 public void append(LogEvent event) {
-                    loggingEvents.add(event.getMessage().getFormattedMessage());
+                    loggingEvents.add(event.toImmutable());
                 }
             };
         testAppender.start();
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 6f10bdfd43dbe..8c46a28d0a7d4 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -44,7 +44,7 @@ under the License.
- + org.apache.flink flink-core
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index aceabcea7c4ea..2ef37eddd1396 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -56,6 +56,7 @@
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -382,8 +383,9 @@
     }
 
     @Override
-    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
-        return rpcService.startServer(rpcEndpoint);
+    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+            C rpcEndpoint, Map<String, String> loggingContext) {
+        return rpcService.startServer(rpcEndpoint, Collections.emptyMap());
     }
 
     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
new file mode 100644
index 0000000000000..3380698feb79c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.slf4j.event.Level.DEBUG;
+
+/**
+ * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases.
+ */
+public class JobIDLoggingITCase {
+    private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension checkpointCoordinatorLogging =
+            new LoggerAuditingExtension(CheckpointCoordinator.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension streamTaskLogging =
+            new LoggerAuditingExtension(StreamTask.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension taskExecutorLogging =
+            new LoggerAuditingExtension(TaskExecutor.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension taskLogging =
+            new LoggerAuditingExtension(Task.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension executionGraphLogging =
+            new LoggerAuditingExtension(ExecutionGraph.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension jobMasterLogging =
+            new LoggerAuditingExtension(JobMaster.class, DEBUG);
+
+    @RegisterExtension
+    public final LoggerAuditingExtension asyncCheckpointRunnableLogging =
+            // this class is private
+            new LoggerAuditingExtension(
+                    "org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable", DEBUG);
+
+    @RegisterExtension
+    public static MiniClusterExtension miniClusterResource =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    @Test
+    public void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        JobID jobID = runJob(clusterClient);
+        clusterClient.cancel(jobID).get();
+
+        // NOTE: most of the assertions are empirical, such as
+        // - which classes are important
+        // - how many messages to expect
+        // - which log patterns to ignore
+
+        assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging);
+        assertJobIDPresent(jobID, 6, streamTaskLogging);
+        assertJobIDPresent(
+                jobID,
+                9,
+                taskExecutorLogging,
+                "Un-registering task.*",
+                "Successful registration.*",
+                "Establish JobManager connection.*",
+                "Offer reserved slots.*",
+                ".*ResourceManager.*",
+                "Operator event.*");
+
+        assertJobIDPresent(jobID, 10, taskLogging);
+        assertJobIDPresent(jobID, 10, executionGraphLogging);
+        assertJobIDPresent(
+                jobID,
+                15,
+                jobMasterLogging,
+                "Registration at ResourceManager.*",
+                "Registration with ResourceManager.*",
+                "Resolved ResourceManager address.*");
+        assertJobIDPresent(jobID, 1, asyncCheckpointRunnableLogging);
+    }
+
+    private static void assertJobIDPresent(
+            JobID jobID,
+            int expectedLogMessages,
+            LoggerAuditingExtension ext,
+            String... ignPatterns) {
+        String loggerName = ext.getLoggerName();
+        checkState(
+                ext.getEvents().size() >= expectedLogMessages,
+                "Too few log events recorded for %s (%s) - this must be a bug in the test code",
+                loggerName,
+                ext.getEvents().size());
+
+        final List<LogEvent> eventsWithMissingJobId = new ArrayList<>();
+        final List<LogEvent> eventsWithWrongJobId = new ArrayList<>();
+        final List<LogEvent> ignoredEvents = new ArrayList<>();
+        final List<Pattern> ignorePatterns =
+                Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList());
+
+        for (LogEvent e : ext.getEvents()) {
+            if (e.getContextData().containsKey(MdcUtils.JOB_ID)) {
+                if (!Objects.equals(
+                        e.getContextData().getValue(MdcUtils.JOB_ID), jobID.toHexString())) {
+                    eventsWithWrongJobId.add(e);
+                }
+            } else if (matchesAny(ignorePatterns, e.getMessage().getFormattedMessage())) {
+                ignoredEvents.add(e);
+            } else {
+                eventsWithMissingJobId.add(e);
+            }
+        }
+        logger.debug(
+                "checked events for {}:\n {};\n ignored: {},\n wrong job id: {},\n missing job id: {}",
+                loggerName,
+                ext.getEvents(),
+                ignoredEvents,
+                eventsWithWrongJobId,
+                eventsWithMissingJobId);
+        assertThat(eventsWithWrongJobId).as("events with a wrong Job ID").isEmpty();
+        assertTrue(
+                eventsWithMissingJobId.isEmpty(),
+                "too many events without Job ID recorded for "
+                        + loggerName
+                        + ": "
+                        + eventsWithMissingJobId);
+    }
+
+    private static boolean matchesAny(List<Pattern> patterns, String message) {
+        return patterns.stream().anyMatch(p -> p.matcher(message).matches());
+    }
+
+    private static JobID runJob(ClusterClient<?> clusterClient) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new DiscardingSink<>());
+        JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+        while (deadline.hasTimeLeft()
+                && clusterClient.listJobs().get().stream()
+                        .noneMatch(
+                                m ->
+                                        m.getJobId().equals(jobId)
+                                                && m.getJobState().equals(JobStatus.RUNNING))) {
+            Thread.sleep(10);
+        }
+        // wait for all tasks ready and then checkpoint
+        while (true) {
+            try {
+                clusterClient.triggerCheckpoint(jobId, CheckpointType.DEFAULT).get();
+                return jobId;
+            } catch (ExecutionException e) {
+                if (ExceptionUtils.findThrowable(e, CheckpointException.class).isPresent()
+                        && !deadline.isOverdue()) {
+                    Thread.sleep(10);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+}
diff --git a/flink-tests/src/test/resources/log4j2-test.properties b/flink-tests/src/test/resources/log4j2-test.properties
index c5d9b0f65be1f..843e105b0ea7d 100644
--- a/flink-tests/src/test/resources/log4j2-test.properties
+++ b/flink-tests/src/test/resources/log4j2-test.properties
@@ -28,7 +28,7 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n
 logger.migration.name = org.apache.flink.test.migration
 logger.migration.level = INFO
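For reference, a minimal sketch of how the MdcUtils helpers used throughout this patch fit together (the class, logger, and job ID value below are illustrative only; withContext, asContextData, and scopeToJob are the utilities this patch introduces in org.apache.flink.util.MdcUtils):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.util.MdcUtils;
    import org.apache.flink.util.MdcUtils.MdcCloseable;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MdcUsageSketch {
        private static final Logger LOG = LoggerFactory.getLogger(MdcUsageSketch.class);

        public static void main(String[] args) {
            JobID jobId = new JobID();

            // scope a single block of code to the job: the flink-job-id MDC entry
            // is set here and restored when the MdcCloseable is closed
            try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
                LOG.info("carries flink-job-id={} in its MDC", jobId.toHexString());
            }

            // scope a whole executor to the job: everything submitted to it runs
            // with the job's MDC context (as done for the channel-state and
            // AsyncOperations thread pools in StreamTask above)
            ExecutorService scoped =
                    MdcUtils.scopeToJob(jobId, Executors.newSingleThreadExecutor());
            scoped.execute(() -> LOG.info("carries the job ID in its MDC, too"));
            scoped.shutdown();
        }
    }

With the test layout pattern above, [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n, such a record would render roughly as:

    [b2c3d4e5f60718293a4b5c6d7e8f9011] MdcUsageSketch [pool-1-thread-1] INFO  carries the job ID in its MDC, too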