Skip to content

Commit

Permalink
[apache#1355] fix(client): Netty client will leak when decoding respo…
Browse files Browse the repository at this point in the history
…nses
  • Loading branch information
rickyma committed Jan 15, 2024
1 parent 3180501 commit a3fbd4c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ public void checkProcessedBlockIds() {

@Override
public void close() {
if (sdr != null) {
sdr.release();
}
if (readBuffer != null) {
RssUtils.releaseByteBuffer(readBuffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,21 @@ public void channelRead(ChannelHandlerContext ctx, Object data) {
if (frame == null) {
break;
}
Message msg = Message.decode(curType, frame);
if (msg.body() == null) {
frame.release();
Message msg = null;
try {
msg = Message.decode(curType, frame);
} finally {
boolean shouldRelease =
msg != null
&& (msg.body() == null
|| (msg.body().byteBuf() != null && msg.body().byteBuf().readableBytes() == 0));
if (shouldRelease) {
frame.release();
}
}
if (msg != null) {
ctx.fireChannelRead(msg);
}
ctx.fireChannelRead(msg);
clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@ public ShuffleDataResult readShuffleData() {
return null;
}

shuffleDataSegments =
SegmentSplitterFactory.getInstance()
.get(distributionType, expectTaskIds, readBufferSize)
.split(shuffleIndexResult);
shuffleIndexResult.release();
try {
shuffleDataSegments =
SegmentSplitterFactory.getInstance()
.get(distributionType, expectTaskIds, readBufferSize)
.split(shuffleIndexResult);
} finally {
shuffleIndexResult.release();
}
}

// We should skip unexpected and processed segments when handler is read
Expand Down

0 comments on commit a3fbd4c

Please sign in to comment.