From ba4d8da93825deb8351892dab1bad9ef6a3a080a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B9=BF=E9=B8=A3?= Date: Mon, 27 Feb 2023 20:17:23 +0800 Subject: [PATCH 1/3] Revert "[ISSUE #3271]Fix handle http message throw IllegalReferenceCountException" This reverts commit 3a362753c43e5fcc36e32e1973e7375a98eb6f2d. --- .../apache/eventmesh/runtime/boot/AbstractHTTPServer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 078eaf9cb9..8a1179ed1f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -95,6 +95,7 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; import io.opentelemetry.api.trace.Span; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; @@ -443,7 +444,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) } } catch (Exception ex) { - log.error("execute AbstractHTTPServer.HTTPHandler.channelRead0 error", ex); + log.error("AbrstractHTTPServer.HTTPHandler.channelRead error", ex); + } finally { + ReferenceCountUtil.release(httpRequest); } } From 52fa1fcb17bf18ed8d8af1c4c9e5b7116f0fb988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B9=BF=E9=B8=A3?= Date: Mon, 27 Feb 2023 20:22:18 +0800 Subject: [PATCH 2/3] Fix HTTP server HttpRequest release bug --- .../runtime/boot/AbstractHTTPServer.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 8a1179ed1f..aadfb90f6b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -66,10 +66,10 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -332,21 +332,23 @@ private Map parseHttpRequestBody(final HttpRequest httpRequest) } @Sharable - private class HTTPHandler extends SimpleChannelInboundHandler { + private class HTTPHandler extends ChannelInboundHandlerAdapter { /** * Is called for each message of type {@link HttpRequest}. * - * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler} belongs to - * @param httpRequest the message to handle + * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelInboundHandlerAdapter} belongs to + * @param msg the message to handle * @throws Exception is thrown if an error occurred */ @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception { - if (httpRequest == null) { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof HttpRequest)) { return; } + HttpRequest httpRequest = (HttpRequest) msg; + if (Objects.nonNull(handlerService) && handlerService.isProcessorWrapper(httpRequest)) { handlerService.handler(ctx, httpRequest, asyncContextCompleteHandler); return; @@ -444,7 +446,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) } } catch (Exception ex) { - log.error("AbrstractHTTPServer.HTTPHandler.channelRead error", ex); + log.error("AbstractHTTPServer.HTTPHandler.channelRead error", ex); } finally { ReferenceCountUtil.release(httpRequest); } From da8e1644f448ea51083373b6d8b91505d7d1d3a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B9=BF=E9=B8=A3?= Date: Mon, 27 Feb 2023 20:35:22 +0800 Subject: [PATCH 3/3] bug fix --- .../client/http/producer/EventMeshMessageProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java index fcfcc9659d..5ba3f0feaa 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/EventMeshMessageProducer.java @@ -98,7 +98,7 @@ public EventMeshMessage transformMessage(final EventMeshRetObj retObj) { final SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.parseObject(retObj.getRetMsg(), SendMessageResponseBody.ReplyMessage.class); return EventMeshMessage.builder() - .content(Objects.requireNonNUll(replyMessage, "ReplyMessage must not be null").body) + .content(Objects.requireNonNull(replyMessage, "ReplyMessage must not be null").body) .prop(replyMessage.properties) .topic(replyMessage.topic).build(); }