forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4171b98
commit d6a4eb9
Showing
31 changed files
with
1,148 additions
and
258 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
39 changes: 39 additions & 0 deletions
39
flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.util; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.concurrent.Executor; | ||
|
||
import static org.apache.flink.util.Preconditions.checkNotNull; | ||
|
||
class MdcAwareExecutor<T extends Executor> implements Executor { | ||
protected final Map<String, String> contextData; | ||
protected final T delegate; | ||
|
||
protected MdcAwareExecutor(T delegate, Map<String, String> contextData) { | ||
this.delegate = checkNotNull(delegate); | ||
this.contextData = Collections.unmodifiableMap(checkNotNull(contextData)); | ||
} | ||
|
||
public void execute(Runnable command) { | ||
delegate.execute(MdcUtils.wrapRunnable(contextData, command)); | ||
} | ||
} |
114 changes: 114 additions & 0 deletions
114
flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.util; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
import static org.apache.flink.util.MdcUtils.wrapCallable; | ||
import static org.apache.flink.util.MdcUtils.wrapRunnable; | ||
|
||
class MdcAwareExecutorService<S extends ExecutorService> extends MdcAwareExecutor<S> | ||
implements ExecutorService { | ||
|
||
public MdcAwareExecutorService(S delegate, Map<String, String> contextData) { | ||
super(delegate, contextData); | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
delegate.shutdown(); | ||
} | ||
|
||
@Override | ||
public List<Runnable> shutdownNow() { | ||
return delegate.shutdownNow(); | ||
} | ||
|
||
@Override | ||
public boolean isShutdown() { | ||
return delegate.isShutdown(); | ||
} | ||
|
||
@Override | ||
public boolean isTerminated() { | ||
return delegate.isTerminated(); | ||
} | ||
|
||
@Override | ||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { | ||
return delegate.awaitTermination(timeout, unit); | ||
} | ||
|
||
@Override | ||
public <T> Future<T> submit(Callable<T> task) { | ||
return delegate.submit(wrapCallable(contextData, task)); | ||
} | ||
|
||
@Override | ||
public <T> Future<T> submit(Runnable task, T result) { | ||
return delegate.submit(wrapRunnable(contextData, task), result); | ||
} | ||
|
||
@Override | ||
public Future<?> submit(Runnable task) { | ||
return delegate.submit(wrapRunnable(contextData, task)); | ||
} | ||
|
||
@Override | ||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) | ||
throws InterruptedException { | ||
return delegate.invokeAll(wrapCallables(tasks)); | ||
} | ||
|
||
@Override | ||
public <T> List<Future<T>> invokeAll( | ||
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | ||
throws InterruptedException { | ||
return delegate.invokeAll(wrapCallables(tasks), timeout, unit); | ||
} | ||
|
||
@Override | ||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) | ||
throws InterruptedException, ExecutionException { | ||
return delegate.invokeAny(wrapCallables(tasks)); | ||
} | ||
|
||
@Override | ||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | ||
throws InterruptedException, ExecutionException, TimeoutException { | ||
return delegate.invokeAny(wrapCallables(tasks), timeout, unit); | ||
} | ||
|
||
private <T> List<Callable<T>> wrapCallables(Collection<? extends Callable<T>> tasks) { | ||
List<Callable<T>> list = new ArrayList<>(tasks.size()); | ||
for (Callable<T> task : tasks) { | ||
list.add(wrapCallable(contextData, task)); | ||
} | ||
return list; | ||
} | ||
} |
61 changes: 61 additions & 0 deletions
61
flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.util; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.apache.flink.util.MdcUtils.wrapCallable; | ||
import static org.apache.flink.util.MdcUtils.wrapRunnable; | ||
|
||
class MdcAwareScheduledExecutorService extends MdcAwareExecutorService<ScheduledExecutorService> | ||
implements ScheduledExecutorService { | ||
|
||
public MdcAwareScheduledExecutorService( | ||
ScheduledExecutorService delegate, Map<String, String> contextData) { | ||
super(delegate, contextData); | ||
} | ||
|
||
@Override | ||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { | ||
return delegate.schedule(wrapRunnable(contextData, command), delay, unit); | ||
} | ||
|
||
@Override | ||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { | ||
return delegate.schedule(wrapCallable(contextData, callable), delay, unit); | ||
} | ||
|
||
@Override | ||
public ScheduledFuture<?> scheduleAtFixedRate( | ||
Runnable command, long initialDelay, long period, TimeUnit unit) { | ||
return delegate.scheduleAtFixedRate( | ||
wrapRunnable(contextData, command), initialDelay, period, unit); | ||
} | ||
|
||
@Override | ||
public ScheduledFuture<?> scheduleWithFixedDelay( | ||
Runnable command, long initialDelay, long delay, TimeUnit unit) { | ||
return delegate.scheduleWithFixedDelay( | ||
wrapRunnable(contextData, command), initialDelay, delay, unit); | ||
} | ||
} |
112 changes: 112 additions & 0 deletions
112
flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.util; | ||
|
||
import org.apache.flink.api.common.JobID; | ||
|
||
import org.slf4j.MDC; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
|
||
import static org.apache.flink.util.Preconditions.checkArgument; | ||
|
||
/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */ | ||
public class MdcUtils { | ||
|
||
public static final String JOB_ID = "flink-job-id"; | ||
|
||
/** | ||
* Replace MDC contents with the provided one and return a closeable object that can be used to | ||
* restore the original MDC. | ||
* | ||
* @param context to put into MDC | ||
*/ | ||
public static MdcCloseable withContext(Map<String, String> context) { | ||
final Map<String, String> orig = MDC.getCopyOfContextMap(); | ||
MDC.setContextMap(context); | ||
return () -> MDC.setContextMap(orig); | ||
} | ||
|
||
/** {@link AutoCloseable } that restores the {@link MDC} contents on close. */ | ||
public interface MdcCloseable extends AutoCloseable { | ||
@Override | ||
void close(); | ||
} | ||
|
||
/** | ||
* Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its | ||
* execution and removed afterward. | ||
*/ | ||
public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) { | ||
return () -> { | ||
try (MdcCloseable ctx = withContext(contextData)) { | ||
command.run(); | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its | ||
* execution and removed afterward. | ||
*/ | ||
public static <T> Callable<T> wrapCallable( | ||
Map<String, String> contextData, Callable<T> command) { | ||
return () -> { | ||
try (MdcCloseable ctx = withContext(contextData)) { | ||
return command.call(); | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes | ||
* any submitted commands and removed afterward. | ||
*/ | ||
public static Executor scopeToJob(JobID jobID, Executor executor) { | ||
checkArgument(!(executor instanceof MdcAwareExecutor)); | ||
return new MdcAwareExecutor<>(executor, asContextData(jobID)); | ||
} | ||
|
||
/** | ||
* Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it | ||
* executes any submitted commands and removed afterward. | ||
*/ | ||
public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) { | ||
checkArgument(!(delegate instanceof MdcAwareExecutorService)); | ||
return new MdcAwareExecutorService<>(delegate, asContextData(jobID)); | ||
} | ||
|
||
/** | ||
* Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added | ||
* before it executes any submitted commands and removed afterward. | ||
*/ | ||
public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) { | ||
checkArgument(!(ses instanceof MdcAwareScheduledExecutorService)); | ||
return new MdcAwareScheduledExecutorService(ses, asContextData(jobID)); | ||
} | ||
|
||
public static Map<String, String> asContextData(JobID jobID) { | ||
return Collections.singletonMap(JOB_ID, jobID.toHexString()); | ||
} | ||
} |
Oops, something went wrong.