forked from apache/rocketmq
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
for apache#3666 add unit test more and docs
- Loading branch information
1 parent
9e7af1c
commit 3e07fd2
Showing
2 changed files
with
87 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
# Light message queue (LMQ) | ||
|
||
|
||
## 一、broker启动配置 | ||
|
||
|
||
broker.conf文件需要增加以下的配置项,开启LMQ开关,这样就可以识别LMQ相关属性的消息,进行原子分发消息到LMQ队列 | ||
```properties | ||
enableLmq = true | ||
enableMultiDispatch = true | ||
``` | ||
## 二、发送消息 | ||
发送消息的时候通过设置 INNER_MULTI_DISPATCH 属性,LMQ queue使用逗号分割,queue前缀必须是 %LMQ%,这样broker就可以识别LMQ queue. | ||
```java | ||
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); | ||
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); | ||
producer.start(); | ||
|
||
|
||
/* | ||
* Create a message instance, specifying topic, tag and message body. | ||
*/ | ||
Message msg = new Message("TopicTest" /* Topic */, | ||
"TagA" /* Tag */, | ||
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ | ||
); | ||
/* | ||
* INNER_MULTI_DISPATCH property and PREFIX must start as "%LMQ%", | ||
* If it is multiple LMQ, need to use “,” split | ||
*/ | ||
message.putUserProperty("INNER_MULTI_DISPATCH", "%LMQ%123,%LMQ%456"); | ||
/* | ||
* Call send message to deliver message to one of brokers. | ||
*/ | ||
SendResult sendResult = producer.send(msg); | ||
``` | ||
## 三、拉取消息 | ||
LMQ queue在每个broker上只有一个queue,也即queueId为0, 指明轻量级的MessageQueue,就可以拉取消息进行消费。 | ||
```java | ||
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(); | ||
defaultMQPullConsumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); | ||
defaultMQPullConsumer.setVipChannelEnabled(false); | ||
defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST"); | ||
defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST"); | ||
defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList("TopicTest"))); | ||
defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(2000); | ||
defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(3000); | ||
defaultMQPullConsumer.start(); | ||
|
||
String brokerName = "set broker Name"; | ||
MessageQueue mq = new MessageQueue("%LMQ%123", brokerName, 0); | ||
|
||
Long offset = defaultMQPullConsumer.maxOffset(mq); | ||
|
||
defaultMQPullConsumer.pullBlockIfNotFound( | ||
mq, "*", offset, 32, | ||
new PullCallback() { | ||
@Override | ||
public void onSuccess(PullResult pullResult) { | ||
List<MessageExt> list = pullResult.getMsgFoundList(); | ||
if (list == null || list.isEmpty()) { | ||
return; | ||
} | ||
for (MessageExt messageExt : list) { | ||
|
||
} | ||
} | ||
@Override | ||
public void onException(Throwable e) { | ||
|
||
} | ||
}); | ||
``` | ||
| ||
|