Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown ClusterTopologyRefreshTask properly #2985

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import io.lettuce.core.ClientOptions;
Expand Down Expand Up @@ -64,6 +66,10 @@ class ClusterTopologyRefreshScheduler implements Runnable, ClusterEventListener

private final EventExecutorGroup genericWorkerPool;

private static final ReentrantLock refreshLock = new ReentrantLock();

private static final Condition refreshComplete = refreshLock.newCondition();

ClusterTopologyRefreshScheduler(Supplier<ClusterClientOptions> clientOptions, Supplier<Partitions> partitions,
Supplier<CompletionStage<?>> refreshTopology, ClientResources clientResources) {

Expand Down Expand Up @@ -112,6 +118,14 @@ public boolean isTopologyRefreshInProgress() {
return clusterTopologyRefreshTask.get();
}

public ReentrantLock getRefreshLock() {
return refreshLock;
}

public Condition getRefreshComplete() {
return refreshComplete;
}

@Override
public void run() {

Expand Down Expand Up @@ -323,13 +337,18 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements

public void run() {

if (compareAndSet(false, true)) {
doRun();
return;
}

if (logger.isDebugEnabled()) {
logger.debug("ClusterTopologyRefreshTask already in progress");
refreshLock.lock();
try {
if (compareAndSet(false, true)) {
doRun();
return;
}

if (logger.isDebugEnabled()) {
logger.debug("ClusterTopologyRefreshTask already in progress");
}
} finally {
refreshLock.unlock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not need to make any changes here:

  • the comapreAndSet makes sure there is only one thread that starts a refresh
  • the suspendTopologyRefresh should stop any new topology refresh attempts

}
}

Expand All @@ -345,7 +364,13 @@ void doRun() {
logger.warn("Cannot refresh Redis Cluster topology", throwable);
}

set(false);
refreshLock.lock();
try {
set(false);
refreshComplete.signalAll();
} finally {
refreshLock.unlock();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to lock before initiating the topology refresh with reloadTopologyAsync.get() and then unlock after we call set(false)

});
} catch (Exception e) {
logger.warn("Cannot refresh Redis Cluster topology", e);
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -1152,6 +1154,21 @@ public void setPartitions(Partitions partitions) {
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {

suspendTopologyRefresh();
ReentrantLock refreshLock = topologyRefreshScheduler.getRefreshLock();
Condition refreshComplete = topologyRefreshScheduler.getRefreshComplete();

refreshLock.lock();
try {
while (topologyRefreshScheduler.isTopologyRefreshInProgress()) {
try {
refreshComplete.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} finally {
refreshLock.unlock();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we know there is only one thread that started a refresh we do not need to wait for more than one lock to be released. We need one lock to acquire (I'd not use a spinlock here, because if a refresh is in progress it might take some time) and then we call the super method, e.g.

refreshLock.lock();
try {
   return super.shutdownAsync(quietPeriod, timeout, timeUnit);
} finally {
   refreshLock.unlock();
}


return super.shutdownAsync(quietPeriod, timeout, timeUnit);
}
Expand Down
Loading