Skip to content

Commit

Permalink
Consistent TaskDecorator and ErrorHandler support in schedulers
Browse files Browse the repository at this point in the history
Closes gh-23755
Closes gh-32460
  • Loading branch information
jhoeller committed Mar 15, 2024
1 parent cf31d08 commit 66235fa
Show file tree
Hide file tree
Showing 8 changed files with 731 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -199,6 +199,10 @@ private TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) {
return adapter;
}

Runnable decorateTaskIfNecessary(Runnable task) {
return (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
}


/**
* TaskExecutorAdapter subclass that wraps all provided Runnables and Callables
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,10 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -39,6 +41,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.concurrent.ListenableFuture;

/**
* Adapter that takes a {@code java.util.concurrent.ScheduledExecutorService} and
Expand Down Expand Up @@ -191,6 +194,7 @@ public void setErrorHandler(ErrorHandler errorHandler) {
* @see Clock#systemDefaultZone()
*/
public void setClock(Clock clock) {
Assert.notNull(clock, "Clock must not be null");
this.clock = clock;
}

Expand All @@ -200,6 +204,33 @@ public Clock getClock() {
}


@Override
public void execute(Runnable task) {
super.execute(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
}

@SuppressWarnings("deprecation")
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return super.submitListenable(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
}

@SuppressWarnings("deprecation")
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return super.submitListenable(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
}

@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
Expand All @@ -211,7 +242,9 @@ public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
else {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.clock, scheduleExecutorToUse, errorHandler).schedule();
return new ReschedulingRunnable(
decorateTaskIfNecessary(task), trigger, this.clock, scheduleExecutorToUse, errorHandler)
.schedule();
}
}
catch (RejectedExecutionException ex) {
Expand Down Expand Up @@ -283,6 +316,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay)

private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {
Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
result = decorateTaskIfNecessary(result);
if (this.enterpriseConcurrentScheduler) {
result = ManagedTaskBuilder.buildManagedTask(result, task.toString());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.scheduling.concurrent;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.Callable;

import org.springframework.lang.Nullable;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ReflectionUtils;

/**
* {@link Callable} adapter for an {@link ErrorHandler}.
*
* @author Juergen Hoeller
* @since 6.2
* @param <V> the value type
*/
class DelegatingErrorHandlingCallable<V> implements Callable<V> {

private final Callable<V> delegate;

private final ErrorHandler errorHandler;


public DelegatingErrorHandlingCallable(Callable<V> delegate, @Nullable ErrorHandler errorHandler) {
this.delegate = delegate;
this.errorHandler = (errorHandler != null ? errorHandler :
TaskUtils.getDefaultErrorHandler(false));
}


@Override
@Nullable
public V call() throws Exception {
try {
return this.delegate.call();
}
catch (Throwable ex) {
try {
this.errorHandler.handleError(ex);
}
catch (UndeclaredThrowableException exToPropagate) {
ReflectionUtils.rethrowException(exToPropagate.getUndeclaredThrowable());
}
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -41,7 +42,9 @@
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.DelegatingErrorHandlingRunnable;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.concurrent.ListenableFuture;

/**
* A simple implementation of Spring's {@link TaskScheduler} interface, using
Expand Down Expand Up @@ -108,6 +111,9 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements

private final ExecutorLifecycleDelegate lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor);

@Nullable
private ErrorHandler errorHandler;

private Clock clock = Clock.systemDefaultZone();

private int phase = DEFAULT_PHASE;
Expand All @@ -119,13 +125,22 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
private ApplicationContext applicationContext;


/**
* Provide an {@link ErrorHandler} strategy.
* @since 6.2
*/
public void setErrorHandler(ErrorHandler errorHandler) {
Assert.notNull(errorHandler, "ErrorHandler must not be null");
this.errorHandler = errorHandler;
}

/**
* Set the clock to use for scheduling purposes.
* <p>The default clock is the system clock for the default time zone.
* @since 5.3
* @see Clock#systemDefaultZone()
*/
public void setClock(Clock clock) {
Assert.notNull(clock, "Clock must not be null");
this.clock = clock;
}

Expand Down Expand Up @@ -194,15 +209,19 @@ protected void doExecute(Runnable task) {
}

private Runnable taskOnSchedulerThread(Runnable task) {
return new DelegatingErrorHandlingRunnable(task, TaskUtils.getDefaultErrorHandler(true));
return new DelegatingErrorHandlingRunnable(task,
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)));
}

private Runnable scheduledTask(Runnable task) {
return () -> execute(new DelegatingErrorHandlingRunnable(task, this::shutdownAwareErrorHandler));
}

private void shutdownAwareErrorHandler(Throwable ex) {
if (this.scheduledExecutor.isTerminated()) {
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
}
else if (this.scheduledExecutor.isTerminated()) {
LogFactory.getLog(getClass()).debug("Ignoring scheduled task exception after shutdown", ex);
}
else {
Expand All @@ -211,12 +230,40 @@ private void shutdownAwareErrorHandler(Throwable ex) {
}


@Override
public void execute(Runnable task) {
super.execute(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
}

@SuppressWarnings("deprecation")
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return super.submitListenable(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
}

@SuppressWarnings("deprecation")
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return super.submitListenable(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
}

@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
Runnable delegate = scheduledTask(task);
ErrorHandler errorHandler = TaskUtils.getDefaultErrorHandler(true);
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(
delegate, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
}
Expand Down
Loading

0 comments on commit 66235fa

Please sign in to comment.