Skip to content

Commit

Permalink
Leniently accept tasks after context close in lifecycle stop phase
Browse files Browse the repository at this point in the history
Schedulers remain strict, just plain executors are lenient on shutdown now.
An early shutdown for executors can be enforced via setStrictEarlyShutdown.

Closes gh-32226
  • Loading branch information
jhoeller committed Feb 11, 2024
1 parent 4a3ef3e commit a2000db
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ public void setRejectedExecutionHandler(@Nullable RejectedExecutionHandler rejec
* the executor's destruction step, with individual awaiting according to the
* {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.
* <p>This flag will only have effect when the executor is running in a Spring
* application context and able to receive the {@link ContextClosedEvent}.
* application context and able to receive the {@link ContextClosedEvent}. Also,
* note that {@link ThreadPoolTaskExecutor} effectively accepts tasks after context
* close by default, in combination with a coordinated lifecycle stop, unless
* {@link ThreadPoolTaskExecutor#setStrictEarlyShutdown "strictEarlyShutdown"}
* has been specified.
* @since 6.1
* @see org.springframework.context.ConfigurableApplicationContext#close()
* @see DisposableBean#destroy()
Expand Down Expand Up @@ -294,6 +298,9 @@ public void destroy() {
* scheduling of periodic tasks, letting existing tasks complete still.
* This step is non-blocking and can be applied as an early shutdown signal
* before following up with a full {@link #shutdown()} call later on.
* <p>Automatically called for early shutdown signals on
* {@link #onApplicationEvent(ContextClosedEvent) context close}.
* Can be manually called as well, in particular outside a container.
* @since 6.1
* @see #shutdown()
* @see java.util.concurrent.ExecutorService#shutdown()
Expand Down Expand Up @@ -463,11 +470,29 @@ public void onApplicationEvent(ContextClosedEvent event) {
this.lateShutdown = true;
}
else {
// Early shutdown signal: accept no further tasks, let existing tasks complete
// before hitting the actual destruction step in the shutdown() method above.
initiateShutdown();
if (this.lifecycleDelegate != null) {
this.lifecycleDelegate.markShutdown();
}
initiateEarlyShutdown();
}
}
}

/**
* Early shutdown signal: do not trigger further tasks, let existing tasks complete
* before hitting the actual destruction step in the {@link #shutdown()} method.
* This goes along with a {@link #stop(Runnable) coordinated lifecycle stop phase}.
* <p>Called from {@link #onApplicationEvent(ContextClosedEvent)} if no
* indications for a late shutdown have been determined, that is, if the
* {@link #setAcceptTasksAfterContextClose "acceptTasksAfterContextClose} and
* {@link #setWaitForTasksToCompleteOnShutdown "waitForTasksToCompleteOnShutdown"}
* flags have not been set.
* <p>The default implementation calls {@link #initiateShutdown()}.
* @since 6.1.4
* @see #initiateShutdown()
*/
protected void initiateEarlyShutdown() {
initiateShutdown();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,8 @@ final class ExecutorLifecycleDelegate implements SmartLifecycle {

private volatile boolean paused;

private volatile boolean shutdown;

private int executingTaskCount = 0;

@Nullable
Expand Down Expand Up @@ -100,10 +102,14 @@ public boolean isRunning() {
return (!this.paused && !this.executor.isTerminated());
}

void markShutdown() {
this.shutdown = true;
}

void beforeExecute(Thread thread) {
this.pauseLock.lock();
try {
while (this.paused && !this.executor.isShutdown()) {
while (this.paused && !this.shutdown && !this.executor.isShutdown()) {
this.unpaused.await();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,11 +71,13 @@ public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport

private int keepAliveSeconds = 60;

private int queueCapacity = Integer.MAX_VALUE;

private boolean allowCoreThreadTimeOut = false;

private boolean prestartAllCoreThreads = false;

private int queueCapacity = Integer.MAX_VALUE;
private boolean strictEarlyShutdown = false;

private boolean exposeUnconfigurableExecutor = false;

Expand Down Expand Up @@ -107,6 +109,18 @@ public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}

/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
* Default is {@code Integer.MAX_VALUE}.
* <p>Any positive value will lead to a LinkedBlockingQueue instance;
* any other value will lead to a SynchronousQueue instance.
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}

/**
* Specify whether to allow core threads to time out. This enables dynamic
* growing and shrinking even in combination with a non-zero queue (since
Expand All @@ -129,15 +143,15 @@ public void setPrestartAllCoreThreads(boolean prestartAllCoreThreads) {
}

/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
* Default is {@code Integer.MAX_VALUE}.
* <p>Any positive value will lead to a LinkedBlockingQueue instance;
* any other value will lead to a SynchronousQueue instance.
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
* Specify whether to initiate an early shutdown signal on context close,
* disposing all idle threads and rejecting further task submissions.
* <p>Default is "false".
* See {@link ThreadPoolTaskExecutor#setStrictEarlyShutdown} for details.
* @since 6.1.4
* @see #initiateShutdown()
*/
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
public void setStrictEarlyShutdown(boolean defaultEarlyShutdown) {
this.strictEarlyShutdown = defaultEarlyShutdown;
}

/**
Expand Down Expand Up @@ -222,6 +236,13 @@ protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
}
}

@Override
protected void initiateEarlyShutdown() {
if (this.strictEarlyShutdown) {
super.initiateEarlyShutdown();
}
}


@Override
@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,6 +98,8 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport

private boolean prestartAllCoreThreads = false;

private boolean strictEarlyShutdown = false;

@Nullable
private TaskDecorator taskDecorator;

Expand Down Expand Up @@ -212,14 +214,38 @@ public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {

/**
* Specify whether to start all core threads, causing them to idly wait for work.
* <p>Default is "false".
* <p>Default is "false", starting threads and adding them to the pool on demand.
* @since 5.3.14
* @see java.util.concurrent.ThreadPoolExecutor#prestartAllCoreThreads
*/
public void setPrestartAllCoreThreads(boolean prestartAllCoreThreads) {
this.prestartAllCoreThreads = prestartAllCoreThreads;
}

/**
* Specify whether to initiate an early shutdown signal on context close,
* disposing all idle threads and rejecting further task submissions.
* <p>By default, existing tasks will be allowed to complete within the
* coordinated lifecycle stop phase in any case. This setting just controls
* whether an explicit {@link ThreadPoolExecutor#shutdown()} call will be
* triggered on context close, rejecting task submissions after that point.
* <p>As of 6.1.4, the default is "false", leniently allowing for late tasks
* to arrive after context close, still participating in the lifecycle stop
* phase. Note that this differs from {@link #setAcceptTasksAfterContextClose}
* which completely bypasses the coordinated lifecycle stop phase, with no
* explicit waiting for the completion of existing tasks at all.
* <p>Switch this to "true" for a strict early shutdown signal analogous to
* the 6.1-established default behavior of {@link ThreadPoolTaskScheduler}.
* Note that the related flags {@link #setAcceptTasksAfterContextClose} and
* {@link #setWaitForTasksToCompleteOnShutdown} will override this setting,
* leading to a late shutdown without a coordinated lifecycle stop phase.
* @since 6.1.4
* @see #initiateShutdown()
*/
public void setStrictEarlyShutdown(boolean defaultEarlyShutdown) {
this.strictEarlyShutdown = defaultEarlyShutdown;
}

/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
Expand Down Expand Up @@ -292,7 +318,7 @@ protected void afterExecute(Runnable task, Throwable ex) {
/**
* Create the BlockingQueue to use for the ThreadPoolExecutor.
* <p>A LinkedBlockingQueue instance will be created for a positive
* capacity value; a SynchronousQueue else.
* capacity value; a SynchronousQueue otherwise.
* @param queueCapacity the specified queue capacity
* @return the BlockingQueue instance
* @see java.util.concurrent.LinkedBlockingQueue
Expand Down Expand Up @@ -424,4 +450,11 @@ protected void cancelRemainingTask(Runnable task) {
}
}

@Override
protected void initiateEarlyShutdown() {
if (this.strictEarlyShutdown) {
super.initiateEarlyShutdown();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -165,13 +165,16 @@ public void setTaskDecorator(TaskDecorator taskDecorator) {
}

/**
* Specify a timeout for task termination when closing this executor.
* The default is 0, not waiting for task termination at all.
* Specify a timeout (in milliseconds) for task termination when closing
* this executor. The default is 0, not waiting for task termination at all.
* <p>Note that a concrete >0 timeout specified here will lead to the
* wrapping of every submitted task into a task-tracking runnable which
* involves considerable overhead in case of a high number of tasks.
* However, for a modest level of submissions with longer-running
* tasks, this is feasible in order to arrive at a graceful shutdown.
* <p>Note that {@code SimpleAsyncTaskExecutor} does not participate in
* a coordinated lifecycle stop but rather just awaits task termination
* on {@link #close()}.
* @param timeout the timeout in milliseconds
* @since 6.1
* @see #close()
Expand All @@ -183,18 +186,6 @@ public void setTaskTerminationTimeout(long timeout) {
this.activeThreads = (timeout > 0 ? Collections.newSetFromMap(new ConcurrentHashMap<>()) : null);
}

/**
* Return whether this executor is still active, i.e. not closed yet,
* and therefore accepts further task submissions. Otherwise, it is
* either in the task termination phase or entirely shut down already.
* @since 6.1
* @see #setTaskTerminationTimeout
* @see #close()
*/
public boolean isActive() {
return this.active;
}

/**
* Set the maximum number of parallel task executions allowed.
* The default of -1 indicates no concurrency limit at all.
Expand Down Expand Up @@ -224,6 +215,18 @@ public final boolean isThrottleActive() {
return this.concurrencyThrottle.isThrottleActive();
}

/**
* Return whether this executor is still active, i.e. not closed yet,
* and therefore accepts further task submissions. Otherwise, it is
* either in the task termination phase or entirely shut down already.
* @since 6.1
* @see #setTaskTerminationTimeout
* @see #close()
*/
public boolean isActive() {
return this.active;
}


/**
* Executes the given task, within a concurrency throttle
Expand Down

0 comments on commit a2000db

Please sign in to comment.