Skip to content

Commit

Permalink
Fix SimpleBatcher apparent deadlock #2196 (#3148)
Browse files Browse the repository at this point in the history
* Fix SimpleBatcher Concurrency Issue (#2196)

### Problem:
- **Flush Operation Conflict:**
  - When one thread (T1) performs a flush, it may read and dispatch batched commands before resetting the `flushing` flag.
  - If another thread (T2) adds a command and forced flush at this moment. Command might be added to the queue but does not trigger a flush.
  - As a result, the command remains in the queue until the next flush request, causing a delay in dispatching.

- **Flag Reset Between Iterations:**
  - During a default flush operation, if multiple batches are processed, the `flushing` flag is reset between iterations.
  - This allows another thread to take over, potentially causing the initial thread to return `BatchTasks.EMPTY` instead of properly processed commands.

1. T1 -> batch(command, CommandBatching.flush()
2. T1 -> flushing.compareAndSet(false, true) == true
3. T1 -> flush()->doFlush()
4. T2 -> batch(command, CommandBatching.flush()
5. T2 -> flushing.compareAndSet(false, true) == false  #already flushing will skip doFlush  and command remain not dispatched
6. T1 -> batch() completes
7. T2 -> batch() completes

### Fix: If force flush is requested while flushing, perform additional flush iteration after ongoing completes

* format

* Update src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java

Co-authored-by: Tihomir Krasimirov Mateev <tihomir.mateev@redis.com>

* Update src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java

Co-authored-by: ggivo <ivo.gaydajiev@gmail.com>

---------

Co-authored-by: Tihomir Krasimirov Mateev <tihomir.mateev@redis.com>
  • Loading branch information
ggivo and tishun authored Feb 26, 2025
1 parent 87761b0 commit a127d5a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 20 deletions.
65 changes: 45 additions & 20 deletions src/main/java/io/lettuce/core/dynamic/SimpleBatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
*
* @author Mark Paluch
* @author Lucio Paiva
* @author Ivo Gaydajiev
*/
class SimpleBatcher implements Batcher {

Expand All @@ -51,6 +52,11 @@ class SimpleBatcher implements Batcher {

private final AtomicBoolean flushing = new AtomicBoolean();

// forceFlushRequested indicates that a flush was requested while there is already a flush in progress
// This flag is used to ensure we will flush again after the current flush is done
// to ensure that any commands added while dispatching the current flush are also dispatched
private final AtomicBoolean forceFlushRequested = new AtomicBoolean();

public SimpleBatcher(StatefulConnection<Object, Object> connection, int batchSize) {

LettuceAssert.isTrue(batchSize == -1 || batchSize > 1, "Batch size must be greater zero or -1");
Expand Down Expand Up @@ -95,37 +101,56 @@ protected BatchTasks flush(boolean forcedFlush) {

List<RedisCommand<?, ?, ?>> commands = newDrainTarget();

while (flushing.compareAndSet(false, true)) {
while (true) {
if (flushing.compareAndSet(false, true)) {
try {

try {
int consume = -1;

int consume = -1;
if (!forcedFlush) {
long queuedItems = queue.size();
if (queuedItems >= batchSize) {
consume = batchSize;
defaultFlush = true;
}
}

if (!forcedFlush) {
long queuedItems = queue.size();
if (queuedItems >= batchSize) {
consume = batchSize;
defaultFlush = true;
List<? extends RedisCommand<?, ?, ?>> batch = doFlush(forcedFlush, defaultFlush, consume);
if (batch != null) {
commands.addAll(batch);
}
}

List<? extends RedisCommand<?, ?, ?>> batch = doFlush(forcedFlush, defaultFlush, consume);
if (batch != null) {
commands.addAll(batch);
}
if (defaultFlush && !queue.isEmpty() && queue.size() > batchSize) {
continue;
}

if (forceFlushRequested.compareAndSet(true, false)) {
continue;
}

if (defaultFlush && !queue.isEmpty() && queue.size() > batchSize) {
continue;
return new BatchTasks(commands);

} finally {
flushing.set(false);
}

return new BatchTasks(commands);
} else {
// Another thread is already flushing
if (forcedFlush) {
forceFlushRequested.set(true);
}

} finally {
flushing.set(false);
if (commands.isEmpty()) {
return BatchTasks.EMPTY;
} else {
// Scenario: A default flush is started in Thread T1.
// If multiple default batches need processing, T1 will release `flushing` and try to reacquire it.
// However, in the brief moment when T1 releases `flushing`, another thread (T2) might acquire it.
// This lead to a state where T2 has taken over processing from T1 and T1 should return commands processed
return new BatchTasks(commands);
}
}
}

return BatchTasks.EMPTY;
}

private List<? extends RedisCommand<?, ?, ?>> doFlush(boolean forcedFlush, boolean defaultFlush, int consume) {
Expand Down
40 changes: 40 additions & 0 deletions src/test/java/io/lettuce/core/dynamic/SimpleBatcherUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand All @@ -21,6 +22,7 @@

/**
* @author Mark Paluch
* @author Ivo Gaydajiev
*/
@Tag(UNIT_TEST)
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -127,6 +129,44 @@ void shouldBatchWithBatchControlFlush() {
verify(connection).dispatch(Arrays.asList(c1, c2));
}

@Test
void shouldDispatchCommandsQueuedDuringOngoingFlush() throws InterruptedException {
RedisCommand<Object, Object, Object> c1 = createCommand();
RedisCommand<Object, Object, Object> c2 = createCommand();

CountDownLatch batchFlushLatch1 = new CountDownLatch(1);
CountDownLatch batchFlushLatch2 = new CountDownLatch(1);

when(connection.dispatch((RedisCommand<Object, Object, Object>) any())).thenAnswer(invocation -> {
batchFlushLatch1.countDown();
batchFlushLatch2.await();

return null;
});

SimpleBatcher batcher = new SimpleBatcher(connection, 4);

Thread batchThread1 = new Thread(() -> {
batcher.batch(c1, CommandBatching.flush());
});
batchThread1.start();

Thread batchThread2 = new Thread(() -> {
try {
batchFlushLatch1.await();
} catch (InterruptedException ignored) {
}
batcher.batch(c2, CommandBatching.flush());
batchFlushLatch2.countDown();
});
batchThread2.start();

batchThread1.join();
batchThread2.join();
verify(connection, times(1)).dispatch(c1);
verify(connection, times(1)).dispatch(c2);
}

private static RedisCommand<Object, Object, Object> createCommand() {
return new AsyncCommand<>(new Command<>(CommandType.COMMAND, null, null));
}
Expand Down

0 comments on commit a127d5a

Please sign in to comment.