From 15a0dbff744cd00ea250e251e0530fd5c5c3eabf Mon Sep 17 00:00:00 2001 From: Kirill Saied Date: Sat, 13 May 2023 20:07:10 +0300 Subject: [PATCH 1/3] [ISSUE #3909] Replaced anonymous new ChannelFutureListener() with lambda --- .../protocol/tcp/client/task/MessageTransferTask.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 815aae914e..31cf8f8951 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -54,7 +54,6 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.trace.Span; @@ -134,13 +133,7 @@ public void run() { msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), "Tps overload, global flow control", pkg.getHeader().getSeq())); ctx.writeAndFlush(msg).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, - taskExecuteTime); - } - } + (ChannelFutureListener) future -> Utils.logSucceedMessageFlow(msg, session.getClient(), startTime, taskExecuteTime) ); TraceUtils.finishSpanWithException(ctx, event, "Tps overload, global flow control", null); From 22736d9e02892fcabea431245513938c02d92434 Mon Sep 17 00:00:00 2001 From: Kirill Saied Date: Mon, 15 May 2023 12:17:39 +0300 Subject: [PATCH 2/3] Fixed checkstyle for source-connector-rocketmq --- .../rocketmq/EventMeshTestUtils.java | 20 ++++++++++--------- .../rocketmq/RocketMQSourceWorker.java | 15 +++++++------- .../connector/RocketMQSourceConnector.java | 12 ++++++----- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/EventMeshTestUtils.java b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/EventMeshTestUtils.java index 8e96ef01ce..211e30bb64 100644 --- a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/EventMeshTestUtils.java +++ b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/EventMeshTestUtils.java @@ -19,15 +19,6 @@ import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; -import io.cloudevents.CloudEvent; -import io.cloudevents.core.builder.CloudEventBuilder; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.client.tcp.common.MessageUtils; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -36,6 +27,17 @@ import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.utils.JsonUtils; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + public class EventMeshTestUtils { private static final int SEQ_LENGTH = 10; diff --git a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java index 41a758ecde..80c3d1eda8 100644 --- a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java +++ b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java @@ -17,8 +17,6 @@ package org.apache.eventmesh.source.connector.rocketmq; -import io.cloudevents.CloudEvent; -import java.util.List; import org.apache.eventmesh.client.tcp.EventMeshTCPClient; import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; @@ -27,6 +25,10 @@ import org.apache.eventmesh.source.connector.rocketmq.config.RocketMQSourceConfig; import org.apache.eventmesh.source.connector.rocketmq.connector.RocketMQSourceConnector; +import java.util.List; + +import io.cloudevents.CloudEvent; + public class RocketMQSourceWorker { public static final String SOURCE_CONSUMER_GROUP = "DEFAULT-CONSUMER-GROUP"; @@ -50,21 +52,18 @@ public static void main(String[] args) throws Exception { client.init(); - RocketMQSourceConnector rocketMQSourceConnector = new RocketMQSourceConnector(); - RocketMQSourceConfig rocketMQSourceConfig = new RocketMQSourceConfig(); - rocketMQSourceConfig.setSourceNameserver(SOURCE_CONNECT_NAMESRVADDR); rocketMQSourceConfig.setSourceTopic(SOURCE_TOPIC); rocketMQSourceConfig.setSourceGroup(SOURCE_CONSUMER_GROUP); + RocketMQSourceConnector rocketMQSourceConnector = new RocketMQSourceConnector(); rocketMQSourceConnector.init(rocketMQSourceConfig); - rocketMQSourceConnector.start(); - while(true) { + while (true) { List connectorRecordList = rocketMQSourceConnector.poll(); - for(ConnectRecord connectRecord : connectorRecordList) { + for (ConnectRecord connectRecord : connectorRecordList) { // todo:connectorRecord convert cloudEvents CloudEvent event = EventMeshTestUtils.generateCloudEventV1(connectRecord.getExtension("topic"), connectRecord.getData().toString()); client.publish(event, 3000); diff --git a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/connector/RocketMQSourceConnector.java b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/connector/RocketMQSourceConnector.java index 751e6ef754..50c1c9f20b 100644 --- a/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/connector/RocketMQSourceConnector.java +++ b/eventmesh-connectors/source-connector-rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/connector/RocketMQSourceConnector.java @@ -17,20 +17,22 @@ package org.apache.eventmesh.source.connector.rocketmq.connector; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.eventmesh.connector.api.config.Config; import org.apache.eventmesh.connector.api.data.ConnectRecord; import org.apache.eventmesh.connector.api.data.RecordOffset; import org.apache.eventmesh.connector.api.data.RecordPartition; import org.apache.eventmesh.connector.api.source.Source; import org.apache.eventmesh.source.connector.rocketmq.config.RocketMQSourceConfig; + import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class RocketMQSourceConnector implements Source { private RocketMQSourceConfig sourceConfig; From ac214cfe7b8299204902d49dc178da18b6ddce2f Mon Sep 17 00:00:00 2001 From: Kirill Saied Date: Mon, 15 May 2023 12:26:26 +0300 Subject: [PATCH 3/3] Fixed checkstyle for eventmesh-storage-rocketmq --- .../eventmesh/storage/rocketmq/admin/AbstractRmqAdmin.java | 1 + .../eventmesh/storage/rocketmq/admin/RocketMQAdmin.java | 7 ------- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/AbstractRmqAdmin.java b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/AbstractRmqAdmin.java index b8cfd00769..2e3c6e8770 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/AbstractRmqAdmin.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/AbstractRmqAdmin.java @@ -2,6 +2,7 @@ import org.apache.eventmesh.common.config.ConfigService; import org.apache.eventmesh.storage.rocketmq.config.ClientConfiguration; + import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; diff --git a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/RocketMQAdmin.java b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/RocketMQAdmin.java index 0aaae3e046..cbdfad6f50 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/RocketMQAdmin.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/admin/RocketMQAdmin.java @@ -21,18 +21,12 @@ import org.apache.eventmesh.api.admin.Admin; import org.apache.eventmesh.api.admin.TopicProperties; -import org.apache.eventmesh.common.config.ConfigService; -import org.apache.eventmesh.storage.rocketmq.config.ClientConfiguration; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.acl.common.AclClientRPCHook; -import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import java.util.ArrayList; @@ -41,7 +35,6 @@ import java.util.List; import java.util.Properties; import java.util.Set; -import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import io.cloudevents.CloudEvent;