diff --git a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java index c98157d15b..be5cdf1883 100644 --- a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java +++ b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java @@ -23,9 +23,9 @@ public class RocketMQSourceWorker { public static void main(String[] args) throws Exception { - + Application.run(RocketMQSourceConnector.class); - + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 815aae914e..31cf8f8951 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -54,7 +54,6 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.trace.Span; @@ -134,13 +133,7 @@ public void run() { msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq())); ctx.writeAndFlush(msg).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, - taskExecuteTime); - } - } + (ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime) ); TraceUtils.finishSpanWithException(ctx, event, "Tps overload, global flow control", null);