From 42528520c6a36f9f3ebcc8bc65f7e450df8647cb Mon Sep 17 00:00:00 2001 From: yuri Date: Tue, 30 Nov 2021 02:40:45 -0500 Subject: [PATCH] create topic command (#531) * create topic command * address review comments * fix build and add createtopic method to standalone module * fix build issue * fix a few remaining code check issues * fix test close #346 --- .../api/producer/MeshMQProducer.java | 3 + .../rocketmq/admin/command/Command.java | 60 +++++++++++++ .../admin/command/CreateTopicCommand.java | 84 +++++++++++++++++++ .../rocketmq/config/ClientConfiguration.java | 34 +++++++- .../producer/RocketMQProducerImpl.java | 49 +++++++---- .../main/resources/rocketmq-client.properties | 3 + .../producer/DefaultProducerImplTest.java | 63 ++++++++++++++ .../StandaloneMeshMQProducerAdaptor.java | 7 ++ 8 files changed, 287 insertions(+), 16 deletions(-) create mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java index d87be7d842..b1a1de0385 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java @@ -22,6 +22,7 @@ import io.openmessaging.api.Message; import io.openmessaging.api.Producer; import io.openmessaging.api.SendCallback; +import io.openmessaging.api.exception.OMSRuntimeException; import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.spi.EventMeshExtensionType; @@ -41,5 +42,7 @@ public interface MeshMQProducer extends Producer { void checkTopicExist(String topic) throws Exception; void setExtFields(); + + void createTopic(String topicName) throws OMSRuntimeException; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java new file mode 100644 index 0000000000..82ca6a514e --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.admin.command; + +import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; +import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; +import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +import java.io.File; +import java.util.UUID; + +public abstract class Command { + protected DefaultMQAdminExt adminExt; + + protected String nameServerAddr; + protected String clusterName; + + public void init() { + ConfigurationWrapper configurationWrapper = + new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME + + File.separator + + EventMeshConstants.EVENTMESH_CONF_FILE, false); + final ClientConfiguration clientConfiguration = + new ClientConfiguration(configurationWrapper); + clientConfiguration.init(); + + nameServerAddr = clientConfiguration.namesrvAddr; + clusterName = clientConfiguration.clusterName; + String accessKey = clientConfiguration.accessKey; + String secretKey = clientConfiguration.secretKey; + + RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + adminExt = new DefaultMQAdminExt(rpcHook); + String groupId = UUID.randomUUID().toString(); + adminExt.setAdminExtGroup("admin_ext_group-" + groupId); + adminExt.setNamesrvAddr(nameServerAddr); + } + + public abstract void execute() throws Exception; +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java new file mode 100644 index 0000000000..53f1822560 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.admin.command; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.tools.command.CommandUtil; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CreateTopicCommand extends Command { + public Logger logger = LoggerFactory.getLogger(this.getClass()); + + private int numOfQueue = 4; + private int queuePermission = 6; + private String topicName = ""; + + @Override + public void execute() throws Exception { + if (StringUtils.isBlank(topicName)) { + logger.error("Topic name can not be blank."); + throw new Exception("Topic name can not be blank."); + } + try { + init(); + adminExt.start(); + Set brokersAddr = CommandUtil.fetchMasterAddrByClusterName( + adminExt, clusterName); + for (String masterAddr : brokersAddr) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setReadQueueNums(numOfQueue); + topicConfig.setWriteQueueNums(numOfQueue); + topicConfig.setPerm(queuePermission); + adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig); + logger.info("Topic {} is created for RocketMQ broker {}", topicName, masterAddr); + } + } finally { + adminExt.shutdown(); + } + } + + public int getNumOfQueue() { + return numOfQueue; + } + + public int getQueuePermission() { + return queuePermission; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public void setNumOfQueue(int numOfQueue) { + this.numOfQueue = numOfQueue; + } + + public void setQueuePermission(int permission) { + this.queuePermission = permission; + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java index f1b9335bbe..feadca6319 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java @@ -36,6 +36,9 @@ public class ClientConfiguration { public Integer pollNameServerInterval = 10 * 1000; public Integer heartbeatBrokerInterval = 30 * 1000; public Integer rebalanceInterval = 20 * 1000; + public String clusterName = ""; + public String accessKey = ""; + public String secretKey = ""; protected ConfigurationWrapper configurationWrapper; @@ -101,13 +104,17 @@ public void init() { pullBatchSize = Integer.valueOf(clientPullBatchSizeStr); } - String clientPollNamesrvIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); + String clientPollNamesrvIntervalStr = + configurationWrapper.getProp( + ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL)); pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr); } - String clientHeartbeatBrokerIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); + String clientHeartbeatBrokerIntervalStr = + configurationWrapper.getProp( + ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL)); heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr); @@ -118,6 +125,21 @@ public void init() { Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL)); rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr); } + + String cluster = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER); + if (StringUtils.isNotBlank(cluster)) { + clusterName = cluster; + } + + String ak = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY); + if (StringUtils.isNotBlank(ak)) { + accessKey = ak; + } + + String sk = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY); + if (StringUtils.isNotBlank(sk)) { + secretKey = sk; + } } static class ConfKeys { @@ -147,6 +169,14 @@ static class ConfKeys { public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval"; + + public static String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster"; + + public static String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY = + "eventMesh.server.rocketmq.accessKey"; + + public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY = + "eventMesh.server.rocketmq.secretKey"; } } \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java index 75e2360ba1..d35d109b91 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java @@ -17,32 +17,38 @@ package org.apache.eventmesh.connector.rocketmq.producer; -import java.io.File; -import java.util.Properties; -import java.util.concurrent.ExecutorService; - -import io.openmessaging.api.Message; -import io.openmessaging.api.MessageBuilder; -import io.openmessaging.api.MessagingAccessPoint; -import io.openmessaging.api.OMS; -import io.openmessaging.api.OMSBuiltinKeys; -import io.openmessaging.api.SendCallback; -import io.openmessaging.api.SendResult; - import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.producer.MeshMQProducer; import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl; +import org.apache.eventmesh.connector.rocketmq.admin.command.CreateTopicCommand; import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; + + import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.File; +import java.util.Properties; +import java.util.concurrent.ExecutorService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.openmessaging.api.Message; +import io.openmessaging.api.MessageBuilder; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.OMSBuiltinKeys; +import io.openmessaging.api.SendCallback; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.exception.OMSRuntimeException; + + public class RocketMQProducerImpl implements MeshMQProducer { public Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -55,7 +61,8 @@ public synchronized void init(Properties keyValue) { new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE, false); - final ClientConfiguration clientConfiguration = new ClientConfiguration(configurationWrapper); + final ClientConfiguration clientConfiguration = + new ClientConfiguration(configurationWrapper); clientConfiguration.init(); String producerGroup = keyValue.getProperty("producerGroup"); @@ -112,7 +119,9 @@ public boolean reply(final Message message, final SendCallback sendCallback) thr @Override public void checkTopicExist(String topic) throws Exception { - this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); + this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory() + .getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, + EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } @Override @@ -149,4 +158,16 @@ public void updateCredential(Properties credentialProperties) { public MessageBuilder messageBuilder() { return null; } + + @Override + public void createTopic(String topicName) throws OMSRuntimeException { + CreateTopicCommand createTopicCommand = new CreateTopicCommand(); + createTopicCommand.setTopicName(topicName); + try { + createTopicCommand.execute(); + } catch (Exception e) { + throw new OMSRuntimeException(-1, + String.format("RocketMQ can not create topic %s", topicName), e); + } + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties index 1261f30e2c..8e0822ad36 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties @@ -16,3 +16,6 @@ # #######################rocketmq-client################## eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876 +eventMesh.server.rocketmq.cluster=DefaultCluster +eventMesh.server.rocketmq.accessKey=******** +eventMesh.server.rocketmq.secretKey=******** diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java new file mode 100644 index 0000000000..16ff257b79 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.producer; + + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.eventmesh.api.producer.MeshMQProducer; +import org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.openmessaging.api.exception.OMSRuntimeException; + +public class DefaultProducerImplTest { + + @Before + public void before() {} + + + @After + public void after() { + //TBD:Remove topic + } + + @Test + public void testCreate_EmptyTopic() { + MeshMQProducer meshPub = new RocketMQProducerImpl(); + try { + meshPub.createTopic(" "); + } catch (OMSRuntimeException e) { + assertThat(e.getMessage()).isEqualToIgnoringWhitespace("RocketMQ can not create topic"); + } + } + + @Test + public void testCreate_NullTopic() { + MeshMQProducer meshPub = new RocketMQProducerImpl(); + try { + meshPub.createTopic(null); + } catch (OMSRuntimeException e) { + String errorMessage = e.getMessage(); + assertThat(errorMessage).isEqualTo("RocketMQ can not create topic null"); + } + } +} \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneMeshMQProducerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneMeshMQProducerAdaptor.java index fb632c7c5c..43c9423e16 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneMeshMQProducerAdaptor.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneMeshMQProducerAdaptor.java @@ -21,6 +21,8 @@ import io.openmessaging.api.MessageBuilder; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; +import io.openmessaging.api.exception.OMSRuntimeException; + import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.producer.MeshMQProducer; import org.apache.eventmesh.connector.standalone.MessagingAccessPointImpl; @@ -122,4 +124,9 @@ public void shutdown() { public MessageBuilder messageBuilder() { return null; } + + @Override + public void createTopic(String topicName) throws OMSRuntimeException { + // TODO Auto-generated method stub + } }