From 3e07fd280fa83ba00b9e5b9dd0ce2c7448336cee Mon Sep 17 00:00:00 2001 From: tianliuliu <643422162@qq.com> Date: Thu, 6 Jan 2022 20:00:39 +0800 Subject: [PATCH] for #3666 add unit test more and docs --- .../apache/rocketmq/common/MixAllTest.java | 12 +++ docs/cn/Example_LMQ.md | 75 +++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 docs/cn/Example_LMQ.md diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java index 8d86544be69..4f2a341553e 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java @@ -95,4 +95,16 @@ public void testGetLocalhostByNetworkInterface() throws Exception { assertThat(MixAll.LOCALHOST).isNotNull(); assertThat(MixAll.getLocalhostByNetworkInterface()).isNotNull(); } + + @Test + public void testIsLmq() { + String testLmq = null; + assertThat(MixAll.isLmq(testLmq)).isFalse(); + testLmq = "lmq"; + assertThat(MixAll.isLmq(testLmq)).isFalse(); + testLmq = "%LMQ%queue123"; + assertThat(MixAll.isLmq(testLmq)).isTrue(); + testLmq = "%LMQ%GID_TEST"; + assertThat(MixAll.isLmq(testLmq)).isTrue(); + } } diff --git a/docs/cn/Example_LMQ.md b/docs/cn/Example_LMQ.md new file mode 100644 index 00000000000..e31232581a7 --- /dev/null +++ b/docs/cn/Example_LMQ.md @@ -0,0 +1,75 @@ +# Light message queue (LMQ) + + +## 一、broker启动配置 + + +broker.conf文件需要增加以下的配置项,开启LMQ开关,这样就可以识别LMQ相关属性的消息,进行原子分发消息到LMQ队列 +```properties +enableLmq = true +enableMultiDispatch = true +``` +## 二、发送消息 +发送消息的时候通过设置 INNER_MULTI_DISPATCH 属性,LMQ queue使用逗号分割,queue前缀必须是 %LMQ%,这样broker就可以识别LMQ queue. +```java +DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); +producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); +producer.start(); + + +/* +* Create a message instance, specifying topic, tag and message body. +*/ +Message msg = new Message("TopicTest" /* Topic */, + "TagA" /* Tag */, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ + ); +/* +* INNER_MULTI_DISPATCH property and PREFIX must start as "%LMQ%", +* If it is multiple LMQ, need to use “,” split +*/ +message.putUserProperty("INNER_MULTI_DISPATCH", "%LMQ%123,%LMQ%456"); +/* +* Call send message to deliver message to one of brokers. +*/ +SendResult sendResult = producer.send(msg); +``` +## 三、拉取消息 +LMQ queue在每个broker上只有一个queue,也即queueId为0, 指明轻量级的MessageQueue,就可以拉取消息进行消费。 +```java +DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(); +defaultMQPullConsumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); +defaultMQPullConsumer.setVipChannelEnabled(false); +defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST"); +defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST"); +defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList("TopicTest"))); +defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(2000); +defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(3000); +defaultMQPullConsumer.start(); + +String brokerName = "set broker Name"; +MessageQueue mq = new MessageQueue("%LMQ%123", brokerName, 0); + +Long offset = defaultMQPullConsumer.maxOffset(mq); + +defaultMQPullConsumer.pullBlockIfNotFound( + mq, "*", offset, 32, + new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + List list = pullResult.getMsgFoundList(); + if (list == null || list.isEmpty()) { + return; + } + for (MessageExt messageExt : list) { + + } + } + @Override + public void onException(Throwable e) { + + } +}); +``` +​ +