Skip to content

Commit

Permalink
Pass Allocator as RedisStateMachine constructor argument #1053
Browse files Browse the repository at this point in the history
Use provided allocator instead of using a static reference to PooledByteBufAllocator.DEFAULT.
  • Loading branch information
mp911de committed Nov 8, 2019
1 parent caa2748 commit 9c583b9
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
28 changes: 16 additions & 12 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();
private final long commandHandlerId = COMMAND_HANDLER_COUNTER.incrementAndGet();

private final RedisStateMachine rsm = new RedisStateMachine();
private final boolean traceEnabled = logger.isTraceEnabled();
private final boolean debugEnabled = logger.isDebugEnabled();
private final boolean latencyMetricsEnabled;
Expand All @@ -85,7 +84,8 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
private final boolean boundedQueues;
private final BackpressureSource backpressureSource = new BackpressureSource();

Channel channel;
private RedisStateMachine rsm;
private Channel channel;
private ByteBuf buffer;
private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED;
private String logPrefix;
Expand Down Expand Up @@ -165,6 +165,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
setState(LifecycleState.REGISTERED);

buffer = ctx.alloc().directBuffer(8192 * 8);
rsm = new RedisStateMachine(ctx.alloc());
ctx.fireChannelRegistered();
}

Expand All @@ -186,11 +187,12 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

channel = null;
buffer.release();
rsm.close();
rsm = null;

reset();

setState(LifecycleState.CLOSED);
rsm.close();

ctx.fireChannelUnregistered();
}
Expand Down Expand Up @@ -232,7 +234,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
try {
command.completeExceptionally(cause);
} catch (Exception ex) {
logger.warn("{} Unexpected exception during command completion exceptionally: {}", logPrefix, ex.toString(), ex);
logger.warn("{} Unexpected exception during command completion exceptionally: {}", logPrefix, ex.toString(),
ex);
}
}

Expand Down Expand Up @@ -328,8 +331,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
onProtectedMode(command.getOutput().getError());
}

rsm.reset();

if (debugEnabled) {
logger.debug("{} channelInactive() done", logPrefix());
}
Expand Down Expand Up @@ -429,8 +430,8 @@ private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?,

if (isWriteable(command) && !deduplicated.add(command)) {
deduplicated.remove(command);
command.completeExceptionally(new RedisException(
"Attempting to write duplicate command that is already enqueued: " + command));
command.completeExceptionally(
new RedisException("Attempting to write duplicate command that is already enqueued: " + command));
}
}

Expand Down Expand Up @@ -779,13 +780,14 @@ protected void afterDecode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> comm

private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {

if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null && remote() != null) {
if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null
&& remote() != null) {

long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
long completionLatency = nanoTime() - withLatency.getSent();

clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType,
firstResponseLatency, completionLatency);
clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType, firstResponseLatency,
completionLatency);
}
}

Expand Down Expand Up @@ -813,7 +815,9 @@ private void reset() {

private void resetInternals() {

rsm.reset();
if (rsm != null) {
rsm.reset();
}

if (buffer.refCnt() > 0) {
buffer.clear();
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/lettuce/core/protocol/RedisStateMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.lettuce.core.RedisException;
import io.lettuce.core.output.CommandOutput;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ByteProcessor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -55,15 +55,16 @@ enum Type {
private final State[] stack = new State[32];
private final boolean debugEnabled = logger.isDebugEnabled();
private final LongProcessor longProcessor = new LongProcessor();
private final ByteBuf responseElementBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
private final ByteBuf responseElementBuffer;
private final AtomicBoolean closed = new AtomicBoolean();

private int stackElements;

/**
* Initialize a new instance.
*/
public RedisStateMachine() {
public RedisStateMachine(ByteBufAllocator alloc) {
this.responseElementBuffer = alloc.directBuffer(1024);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

import io.lettuce.core.RedisException;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

/**
Expand Down Expand Up @@ -69,7 +67,12 @@ static void afterClass() {
@BeforeEach
final void createStateMachine() {
output = new StatusOutput<>(codec);
rsm = new RedisStateMachine();
rsm = new RedisStateMachine(ByteBufAllocator.DEFAULT);
}

@AfterEach
void tearDown() {
rsm.close();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.output.ArrayOutput;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;

/**
Expand Down Expand Up @@ -59,7 +60,7 @@ public void set(long integer) {

private ByteBuf masterBuffer;

private final RedisStateMachine stateMachine = new RedisStateMachine();
private final RedisStateMachine stateMachine = new RedisStateMachine(ByteBufAllocator.DEFAULT);
private final byte[] payload = ("*3\r\n" + //
"$4\r\n" + //
"LLEN\r\n" + //
Expand Down

0 comments on commit 9c583b9

Please sign in to comment.