Skip to content

Commit

Permalink
create topic command (apache#531)
Browse files Browse the repository at this point in the history
* create topic command

* address review comments

* fix build and add createtopic method to standalone module

* fix build issue

* fix a few remaining code check issues

* fix test

close apache#346
  • Loading branch information
yzhao244 authored and xwm1992 committed Dec 27, 2021
1 parent ca36bb0 commit f6f93cb
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.rocketmq.admin.command;

import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

import java.io.File;
import java.util.UUID;

public abstract class Command {
protected DefaultMQAdminExt adminExt;

protected String nameServerAddr;
protected String clusterName;

public void init() {
ConfigurationWrapper configurationWrapper =
new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME
+ File.separator
+ EventMeshConstants.EVENTMESH_CONF_FILE, false);
final ClientConfiguration clientConfiguration =
new ClientConfiguration(configurationWrapper);
clientConfiguration.init();

nameServerAddr = clientConfiguration.namesrvAddr;
clusterName = clientConfiguration.clusterName;
String accessKey = clientConfiguration.accessKey;
String secretKey = clientConfiguration.secretKey;

RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
adminExt = new DefaultMQAdminExt(rpcHook);
String groupId = UUID.randomUUID().toString();
adminExt.setAdminExtGroup("admin_ext_group-" + groupId);
adminExt.setNamesrvAddr(nameServerAddr);
}

public abstract void execute() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.rocketmq.admin.command;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CreateTopicCommand extends Command {
public Logger logger = LoggerFactory.getLogger(this.getClass());

private int numOfQueue = 4;
private int queuePermission = 6;
private String topicName = "";

@Override
public void execute() throws Exception {
if (StringUtils.isBlank(topicName)) {
logger.error("Topic name can not be blank.");
throw new Exception("Topic name can not be blank.");
}
try {
init();
adminExt.start();
Set<String> brokersAddr = CommandUtil.fetchMasterAddrByClusterName(
adminExt, clusterName);
for (String masterAddr : brokersAddr) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName(topicName);
topicConfig.setReadQueueNums(numOfQueue);
topicConfig.setWriteQueueNums(numOfQueue);
topicConfig.setPerm(queuePermission);
adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig);
logger.info("Topic {} is created for RocketMQ broker {}", topicName, masterAddr);
}
} finally {
adminExt.shutdown();
}
}

public int getNumOfQueue() {
return numOfQueue;
}

public int getQueuePermission() {
return queuePermission;
}

public String getTopicName() {
return topicName;
}

public void setTopicName(String topicName) {
this.topicName = topicName;
}

public void setNumOfQueue(int numOfQueue) {
this.numOfQueue = numOfQueue;
}

public void setQueuePermission(int permission) {
this.queuePermission = permission;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class ClientConfiguration {
public Integer pollNameServerInterval = 10 * 1000;
public Integer heartbeatBrokerInterval = 30 * 1000;
public Integer rebalanceInterval = 20 * 1000;
public String clusterName = "";
public String accessKey = "";
public String secretKey = "";

protected ConfigurationWrapper configurationWrapper;

Expand Down Expand Up @@ -101,13 +104,17 @@ public void init() {
pullBatchSize = Integer.valueOf(clientPullBatchSizeStr);
}

String clientPollNamesrvIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL);
String clientPollNamesrvIntervalStr =
configurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL);
if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL));
pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr);
}

String clientHeartbeatBrokerIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL);
String clientHeartbeatBrokerIntervalStr =
configurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL);
if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL));
heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr);
Expand All @@ -118,6 +125,21 @@ public void init() {
Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL));
rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr);
}

String cluster = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER);
if (StringUtils.isNotBlank(cluster)) {
clusterName = cluster;
}

String ak = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY);
if (StringUtils.isNotBlank(ak)) {
accessKey = ak;
}

String sk = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY);
if (StringUtils.isNotBlank(sk)) {
secretKey = sk;
}
}

static class ConfKeys {
Expand Down Expand Up @@ -147,6 +169,14 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval";

public static String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster";

public static String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY =
"eventMesh.server.rocketmq.accessKey";

public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY =
"eventMesh.server.rocketmq.secretKey";

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
#
#######################rocketmq-client##################
eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876
eventMesh.server.rocketmq.cluster=DefaultCluster
eventMesh.server.rocketmq.accessKey=********
eventMesh.server.rocketmq.secretKey=********
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.producer;


import static org.assertj.core.api.Assertions.assertThat;

import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.openmessaging.api.exception.OMSRuntimeException;

public class DefaultProducerImplTest {

@Before
public void before() {}


@After
public void after() {
//TBD:Remove topic
}

@Test
public void testCreate_EmptyTopic() {
MeshMQProducer meshPub = new RocketMQProducerImpl();
try {
meshPub.createTopic(" ");
} catch (OMSRuntimeException e) {
assertThat(e.getMessage()).isEqualToIgnoringWhitespace("RocketMQ can not create topic");
}
}

@Test
public void testCreate_NullTopic() {
MeshMQProducer meshPub = new RocketMQProducerImpl();
try {
meshPub.createTopic(null);
} catch (OMSRuntimeException e) {
String errorMessage = e.getMessage();
assertThat(errorMessage).isEqualTo("RocketMQ can not create topic null");
}
}
}

0 comments on commit f6f93cb

Please sign in to comment.