diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java new file mode 100644 index 0000000000..82ca6a514e --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.admin.command; + +import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; +import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; +import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +import java.io.File; +import java.util.UUID; + +public abstract class Command { + protected DefaultMQAdminExt adminExt; + + protected String nameServerAddr; + protected String clusterName; + + public void init() { + ConfigurationWrapper configurationWrapper = + new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME + + File.separator + + EventMeshConstants.EVENTMESH_CONF_FILE, false); + final ClientConfiguration clientConfiguration = + new ClientConfiguration(configurationWrapper); + clientConfiguration.init(); + + nameServerAddr = clientConfiguration.namesrvAddr; + clusterName = clientConfiguration.clusterName; + String accessKey = clientConfiguration.accessKey; + String secretKey = clientConfiguration.secretKey; + + RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + adminExt = new DefaultMQAdminExt(rpcHook); + String groupId = UUID.randomUUID().toString(); + adminExt.setAdminExtGroup("admin_ext_group-" + groupId); + adminExt.setNamesrvAddr(nameServerAddr); + } + + public abstract void execute() throws Exception; +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java new file mode 100644 index 0000000000..53f1822560 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.admin.command; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.tools.command.CommandUtil; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CreateTopicCommand extends Command { + public Logger logger = LoggerFactory.getLogger(this.getClass()); + + private int numOfQueue = 4; + private int queuePermission = 6; + private String topicName = ""; + + @Override + public void execute() throws Exception { + if (StringUtils.isBlank(topicName)) { + logger.error("Topic name can not be blank."); + throw new Exception("Topic name can not be blank."); + } + try { + init(); + adminExt.start(); + Set brokersAddr = CommandUtil.fetchMasterAddrByClusterName( + adminExt, clusterName); + for (String masterAddr : brokersAddr) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setReadQueueNums(numOfQueue); + topicConfig.setWriteQueueNums(numOfQueue); + topicConfig.setPerm(queuePermission); + adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig); + logger.info("Topic {} is created for RocketMQ broker {}", topicName, masterAddr); + } + } finally { + adminExt.shutdown(); + } + } + + public int getNumOfQueue() { + return numOfQueue; + } + + public int getQueuePermission() { + return queuePermission; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public void setNumOfQueue(int numOfQueue) { + this.numOfQueue = numOfQueue; + } + + public void setQueuePermission(int permission) { + this.queuePermission = permission; + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java index f1b9335bbe..feadca6319 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java @@ -36,6 +36,9 @@ public class ClientConfiguration { public Integer pollNameServerInterval = 10 * 1000; public Integer heartbeatBrokerInterval = 30 * 1000; public Integer rebalanceInterval = 20 * 1000; + public String clusterName = ""; + public String accessKey = ""; + public String secretKey = ""; protected ConfigurationWrapper configurationWrapper; @@ -101,13 +104,17 @@ public void init() { pullBatchSize = Integer.valueOf(clientPullBatchSizeStr); } - String clientPollNamesrvIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); + String clientPollNamesrvIntervalStr = + configurationWrapper.getProp( + ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL)); pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr); } - String clientHeartbeatBrokerIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); + String clientHeartbeatBrokerIntervalStr = + configurationWrapper.getProp( + ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL)); heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr); @@ -118,6 +125,21 @@ public void init() { Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL)); rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr); } + + String cluster = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER); + if (StringUtils.isNotBlank(cluster)) { + clusterName = cluster; + } + + String ak = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY); + if (StringUtils.isNotBlank(ak)) { + accessKey = ak; + } + + String sk = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY); + if (StringUtils.isNotBlank(sk)) { + secretKey = sk; + } } static class ConfKeys { @@ -147,6 +169,14 @@ static class ConfKeys { public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval"; + + public static String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster"; + + public static String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY = + "eventMesh.server.rocketmq.accessKey"; + + public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY = + "eventMesh.server.rocketmq.secretKey"; } } \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties index 1261f30e2c..8e0822ad36 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties @@ -16,3 +16,6 @@ # #######################rocketmq-client################## eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876 +eventMesh.server.rocketmq.cluster=DefaultCluster +eventMesh.server.rocketmq.accessKey=******** +eventMesh.server.rocketmq.secretKey=******** diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java new file mode 100644 index 0000000000..16ff257b79 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.producer; + + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.eventmesh.api.producer.MeshMQProducer; +import org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.openmessaging.api.exception.OMSRuntimeException; + +public class DefaultProducerImplTest { + + @Before + public void before() {} + + + @After + public void after() { + //TBD:Remove topic + } + + @Test + public void testCreate_EmptyTopic() { + MeshMQProducer meshPub = new RocketMQProducerImpl(); + try { + meshPub.createTopic(" "); + } catch (OMSRuntimeException e) { + assertThat(e.getMessage()).isEqualToIgnoringWhitespace("RocketMQ can not create topic"); + } + } + + @Test + public void testCreate_NullTopic() { + MeshMQProducer meshPub = new RocketMQProducerImpl(); + try { + meshPub.createTopic(null); + } catch (OMSRuntimeException e) { + String errorMessage = e.getMessage(); + assertThat(errorMessage).isEqualTo("RocketMQ can not create topic null"); + } + } +} \ No newline at end of file