Skip to content

Commit

Permalink
[ISSUE #19]Message track query enhancement (#21)
Browse files Browse the repository at this point in the history
* [ISSUE #19]Message track query enhancement

* traceTopic can be null and system Topic is used by default

* add unit test

* select messageTrack topic in messageTrace page

Co-authored-by: zhangjidi <zhangjidi@cmss.chinamobile.com>
  • Loading branch information
zhangjidi2016 and zhangjidi authored Sep 23, 2021
1 parent dc67c66 commit 58336d9
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -48,8 +47,6 @@ public class RMQConfigure {

private boolean enableDashBoardCollect;

private String msgTrackTopicName;

private boolean loginRequired = false;

private String accessKey;
Expand Down Expand Up @@ -123,17 +120,6 @@ public void setEnableDashBoardCollect(String enableDashBoardCollect) {
this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect);
}

public String getMsgTrackTopicNameOrDefault() {
if (StringUtils.isEmpty(msgTrackTopicName)) {
return TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
return msgTrackTopicName;
}

public void setMsgTrackTopicName(String msgTrackTopicName) {
this.msgTrackTopicName = msgTrackTopicName;
}

public boolean isLoginRequired() {
return loginRequired;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public Object viewTraceMessages(@RequestParam String msgId) {

@RequestMapping(value = "/viewMessageTraceGraph.query", method = RequestMethod.GET)
@ResponseBody
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId) {
return messageTraceService.queryMessageTraceGraph(msgId);
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId,
@RequestParam(required = false) String traceTopic) {
return messageTraceService.queryMessageTraceGraph(msgId, traceTopic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,9 @@ public class TopicController {

@RequestMapping(value = "/list.query", method = RequestMethod.GET)
@ResponseBody
public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) {
boolean flag = false;
if ("true".equals(skipSysProcess)) {
flag = true;
}
return topicService.fetchAllTopicList(flag);
public Object list(@RequestParam(value = "skipSysProcess", required = false) boolean skipSysProcess,
@RequestParam(value = "skipRetryAndDlq", required = false) boolean skipRetryAndDlq) {
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
}

@RequestMapping(value = "/stats.query", method = RequestMethod.GET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ public interface MessageTraceService {

List<MessageTraceView> queryMessageTraceByTopicAndKey(final String topic, final String key);

MessageTraceGraph queryMessageTraceGraph(final String key);
MessageTraceGraph queryMessageTraceGraph(final String key, final String traceTopic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.List;

public interface TopicService {
TopicList fetchAllTopicList(boolean skipSysProcess);
TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq);

TopicStatsTable stats(String topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.MessageTraceView;
import org.apache.rocketmq.dashboard.model.trace.ProducerNode;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class MessageTraceServiceImpl implements MessageTraceService {

@Override
public List<MessageTraceView> queryMessageTraceKey(String key) {
String queryTopic = configure.getMsgTrackTopicNameOrDefault();
String queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
logger.info("query data topic name is:{}", queryTopic);
return queryMessageTraceByTopicAndKey(queryTopic, key);
}
Expand All @@ -86,8 +87,11 @@ public List<MessageTraceView> queryMessageTraceByTopicAndKey(String topic, Strin
}

@Override
public MessageTraceGraph queryMessageTraceGraph(String key) {
List<MessageTraceView> messageTraceViews = queryMessageTraceKey(key);
public MessageTraceGraph queryMessageTraceGraph(String key, String topic) {
if (StringUtils.isEmpty(topic)) {
topic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
List<MessageTraceView> messageTraceViews = queryMessageTraceByTopicAndKey(topic, key);
return buildMessageTraceGraph(messageTraceViews);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
Expand All @@ -36,6 +37,7 @@
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
Expand All @@ -61,23 +63,23 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
private RMQConfigure configure;

@Override
public TopicList fetchAllTopicList(boolean skipSysProcess) {
public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq) {
try {
TopicList allTopics = mqAdminExt.fetchAllTopicList();
if (skipSysProcess) {
return allTopics;
}

TopicList sysTopics = getSystemTopicList();
Set<String> topics = new HashSet<>();

for (String topic : allTopics.getTopicList()) {
if (sysTopics.getTopicList().contains(topic)) {
topics.add(String.format("%s%s", "%SYS%", topic));
} else {
topics.add(topic);
}
}
Set<String> topics =
allTopics.getTopicList().stream().map(topic -> {
if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
topic = String.format("%s%s", "%SYS%", topic);
}
return topic;
}).filter(topic -> {
if (skipRetryAndDlq) {
return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
}
return true;
}).collect(Collectors.toSet());
allTopics.getTopicList().clear();
allTopics.getTopicList().addAll(topics);
return allTopics;
Expand Down Expand Up @@ -209,7 +211,7 @@ public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rp
}

public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, configure.getMsgTrackTopicNameOrDefault());
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/static/src/i18n/en.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,8 @@ var en = {
"NO_DATA":"Don't have ",
"SYSTEM":"SYSTEM",
"WELCOME":"Hi, welcome using RocketMQ Dashboard",
"ENABLE_MESSAGE_TRACE":"Enable Message Trace"
"ENABLE_MESSAGE_TRACE":"Enable Message Trace",
"MESSAGE_TRACE_DETAIL":"Message Trace Detail",
"TRACE_TOPIC":"TraceTopic",
"SELECT_TRACE_TOPIC":"selectTraceTopic"
}
5 changes: 4 additions & 1 deletion src/main/resources/static/src/i18n/zh.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,8 @@ var zh = {
"NO_DATA":"不存在 ",
"SYSTEM":"系统",
"WELCOME":"您好,欢迎使用RocketMQ仪表盘",
"ENABLE_MESSAGE_TRACE":"开启消息轨迹"
"ENABLE_MESSAGE_TRACE":"开启消息轨迹",
"MESSAGE_TRACE_DETAIL":"消息轨迹详情",
"TRACE_TOPIC":"消息轨迹主题",
"SELECT_TRACE_TOPIC":"选择消息轨迹主题"
}
2 changes: 1 addition & 1 deletion src/main/resources/static/src/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http', 'Notifica
method: "GET",
url: "topic/list.query",
params: {
skipSysProcess: 'true'
skipSysProcess: true
}
}).success(function (resp) {
if (resp.status == 0) {
Expand Down
19 changes: 16 additions & 3 deletions src/main/resources/static/src/messageTrace.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ const TIME_FORMAT_PATTERN = "YYYY-MM-DD HH:mm:ss.SSS";
const DEFAULT_DISPLAY_DURATION = 10 * 1000
// transactionTraceNode do not have costTime, assume it cost 50ms
const TRANSACTION_CHECK_COST_TIME = 50;
const RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
const DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog', '$http', 'Notification', function ($scope, $routeParams, ngDialog, $http, Notification) {
$scope.allTopicList = [];
$scope.selectedTopic = [];
$scope.allTraceTopicList = [];
$scope.selectedTraceTopic = [];
$scope.key = "";
$scope.messageId = $routeParams.messageId;
$scope.queryMessageByTopicAndKeyResult = [];
Expand All @@ -39,16 +43,25 @@ module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog
method: "GET",
url: "topic/list.query",
params: {
skipSysProcess: "true"
skipSysProcess: true
}
}).success(function (resp) {
if (resp.status == 0) {
$scope.allTopicList = resp.data.topicList.sort();
console.log($scope.allTopicList);
console.log($scope.allTopicList)
for (const topic of $scope.allTopicList) {
if (topic.startsWith(RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(DLQ_GROUP_TOPIC_PREFIX)) {
continue;
}
$scope.allTraceTopicList.push(topic);
}
console.log($scope.allTraceTopicList)
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});

$scope.timepickerBegin = moment().subtract(1, 'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerEnd = moment().add(1, 'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerOptions = {format: 'YYYY-MM-DD HH:mm', showClear: true};
Expand Down Expand Up @@ -99,7 +112,7 @@ module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog
url: "messageTrace/viewMessageTraceGraph.query",
params: {
msgId: messageId,
topic: topic
traceTopic: topic
}
}).success(function (resp) {
if (resp.status == 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/static/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module.controller('producerController', ['$scope', '$http','Notification',functi
method: "GET",
url: "topic/list.query",
params:{
skipSysProcess:"true"
skipSysProcess: true
}
}).success(function (resp) {
if(resp.status ==0){
Expand Down
34 changes: 25 additions & 9 deletions src/main/resources/static/view/pages/messageTrace.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@
~ limitations under the License.
-->
<div class="container-fluid" id="deployHistoryList">
<div class="modal-header">
<div class="row">
<label style="color: #000000">{{ 'TRACE_TOPIC' | translate }}:</label>
<div style="display: inline-block; min-width: 300px">
<select name="mySelect" chosen
ng-model="selectedTraceTopic"
ng-options="item for item in allTraceTopicList"
required>
<option value=""></option>
</select>
</div>
<div style="display: inline-block; color: #BDBDBD">(if no select, it will use RMQ_SYS_TRACE_TOPIC)</div>
</div>
</div>
<div class="modal-body">
<div ng-cloak="" class="tabsdemoDynamicHeight">
<md-content>
Expand Down Expand Up @@ -64,7 +78,7 @@ <h5 class="md-display-5">Only Return 64 Messages</h5>
</td>
<td class="text-center">
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="queryMessageTraceByMessageId(item.msgId,item.topic)">Message Trace Detail
ng-click="queryMessageTraceByMessageId(item.msgId, selectedTraceTopic)">{{ 'MESSAGE_TRACE_DETAIL' | translate }}
</button>
</td>
</tr>
Expand Down Expand Up @@ -114,7 +128,7 @@ <h5 class="md-display-5">topic can't be empty if you producer client version>=v3
</td>
<td class="text-center">
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="queryMessageTraceByMessageId(item.msgId,item.topic)">Message Trace Detail
ng-click="queryMessageTraceByMessageId(item.msgId, selectedTraceTopic)">{{ 'MESSAGE_TRACE_DETAIL' | translate }}
</button>
</td>
</tr>
Expand Down Expand Up @@ -274,14 +288,14 @@ <h3 data-toggle="collapse" data-target="#consumeMessageTrace">
<tr ng-repeat="consumeNode in subscriptionNode.consumeNodeList">
<td class="text-center">
{{consumeNode.beginTimestamp < 0 ? 'N/A' :
(consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
(consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
</td>
<td class="text-center">
{{consumeNode.endTimestamp < 0 ? 'N/A' :
(consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
(consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
</td>
<td class="text-center">{{consumeNode.costTime < 0 ? 'N/A' :
((consumeNode.costTime === 0 ? '<1' : consumeNode.costTime) + 'ms')}}
((consumeNode.costTime === 0 ? '<1' : consumeNode.costTime) + 'ms')}}
</td>
<td class="text-center">{{consumeNode.status}}</td>
<td class="text-center">
Expand All @@ -302,10 +316,12 @@ <h3 data-toggle="collapse" data-target="#consumeMessageTrace">
</div>
</div>
</md-content>
<div class="ngdialog-buttons">
<button type="button" class="ngdialog-button ngdialog-button-secondary"
ng-click="closeThisDialog('Cancel')">{{ 'CLOSE' | translate }}
</button>
<div class="modal-footer">
<div class="ngdialog-buttons">
<button type="button" class="ngdialog-button ngdialog-button-secondary"
ng-click="closeThisDialog('Cancel')">{{ 'CLOSE' | translate }}
</button>
</div>
</div>
</script>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.rocketmq.dashboard.config;

import java.io.File;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.boot.web.server.ErrorPage;
Expand All @@ -38,7 +37,6 @@ public void testSet() {
rmqConfigure.setIsVIPChannel("true");
rmqConfigure.setUseTLS(true);
rmqConfigure.setLoginRequired(true);
rmqConfigure.setMsgTrackTopicName(null);
rmqConfigure.setNamesrvAddr("127.0.0.1:9876");
rmqConfigure.setTimeoutMillis(3000L);
}
Expand All @@ -55,7 +53,6 @@ public void testGet() {
Assert.assertEquals(rmqConfigure.getIsVIPChannel(), "true");
Assert.assertTrue(rmqConfigure.isEnableDashBoardCollect());
Assert.assertTrue(rmqConfigure.isLoginRequired());
Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC);
Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876");
Assert.assertEquals(rmqConfigure.getTimeoutMillis().longValue(), 3000L);
ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl;
import org.apache.rocketmq.dashboard.service.impl.MessageTraceServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
Expand Down Expand Up @@ -57,7 +56,6 @@ public class MessageTraceControllerTest extends BaseControllerTest {
@Before
public void init() throws MQClientException, InterruptedException {
super.mockRmqConfigure();
when(configure.getMsgTrackTopicNameOrDefault()).thenReturn(TopicValidator.RMQ_SYS_TRACE_TOPIC);
List<MessageExt> messageList = new ArrayList<>(2);
MessageExt messageExt = MockObjectUtil.createMessageExt();
messageExt.setBody(MockObjectUtil.createTraceData().getBytes());
Expand Down
Loading

0 comments on commit 58336d9

Please sign in to comment.