Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #19]Message track query enhancement #21

Merged
merged 4 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -48,8 +47,6 @@ public class RMQConfigure {

private boolean enableDashBoardCollect;

private String msgTrackTopicName;

private boolean loginRequired = false;

private String accessKey;
Expand Down Expand Up @@ -123,17 +120,6 @@ public void setEnableDashBoardCollect(String enableDashBoardCollect) {
this.enableDashBoardCollect = Boolean.valueOf(enableDashBoardCollect);
}

public String getMsgTrackTopicNameOrDefault() {
if (StringUtils.isEmpty(msgTrackTopicName)) {
return TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
return msgTrackTopicName;
}

public void setMsgTrackTopicName(String msgTrackTopicName) {
this.msgTrackTopicName = msgTrackTopicName;
}

public boolean isLoginRequired() {
return loginRequired;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public Object viewTraceMessages(@RequestParam String msgId) {

@RequestMapping(value = "/viewMessageTraceGraph.query", method = RequestMethod.GET)
@ResponseBody
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId) {
return messageTraceService.queryMessageTraceGraph(msgId);
public MessageTraceGraph viewMessageTraceGraph(@RequestParam String msgId,
@RequestParam(required = false) String traceTopic) {
return messageTraceService.queryMessageTraceGraph(msgId, traceTopic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,9 @@ public class TopicController {

@RequestMapping(value = "/list.query", method = RequestMethod.GET)
@ResponseBody
public Object list(@RequestParam(value = "skipSysProcess", required = false) String skipSysProcess) {
boolean flag = false;
if ("true".equals(skipSysProcess)) {
flag = true;
}
return topicService.fetchAllTopicList(flag);
public Object list(@RequestParam(value = "skipSysProcess", required = false) boolean skipSysProcess,
@RequestParam(value = "skipRetryAndDlq", required = false) boolean skipRetryAndDlq) {
return topicService.fetchAllTopicList(skipSysProcess, skipRetryAndDlq);
}

@RequestMapping(value = "/stats.query", method = RequestMethod.GET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ public interface MessageTraceService {

List<MessageTraceView> queryMessageTraceByTopicAndKey(final String topic, final String key);

MessageTraceGraph queryMessageTraceGraph(final String key);
MessageTraceGraph queryMessageTraceGraph(final String key, final String traceTopic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.List;

public interface TopicService {
TopicList fetchAllTopicList(boolean skipSysProcess);
TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq);

TopicStatsTable stats(String topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.MessageTraceView;
import org.apache.rocketmq.dashboard.model.trace.ProducerNode;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class MessageTraceServiceImpl implements MessageTraceService {

@Override
public List<MessageTraceView> queryMessageTraceKey(String key) {
String queryTopic = configure.getMsgTrackTopicNameOrDefault();
String queryTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
logger.info("query data topic name is:{}", queryTopic);
return queryMessageTraceByTopicAndKey(queryTopic, key);
}
Expand All @@ -86,8 +87,11 @@ public List<MessageTraceView> queryMessageTraceByTopicAndKey(String topic, Strin
}

@Override
public MessageTraceGraph queryMessageTraceGraph(String key) {
List<MessageTraceView> messageTraceViews = queryMessageTraceKey(key);
public MessageTraceGraph queryMessageTraceGraph(String key, String topic) {
if (StringUtils.isEmpty(topic)) {
topic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
List<MessageTraceView> messageTraceViews = queryMessageTraceByTopicAndKey(topic, key);
return buildMessageTraceGraph(messageTraceViews);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
Expand All @@ -36,6 +37,7 @@
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
Expand All @@ -61,23 +63,23 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
private RMQConfigure configure;

@Override
public TopicList fetchAllTopicList(boolean skipSysProcess) {
public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq) {
try {
TopicList allTopics = mqAdminExt.fetchAllTopicList();
if (skipSysProcess) {
return allTopics;
}

TopicList sysTopics = getSystemTopicList();
Set<String> topics = new HashSet<>();

for (String topic : allTopics.getTopicList()) {
if (sysTopics.getTopicList().contains(topic)) {
topics.add(String.format("%s%s", "%SYS%", topic));
} else {
topics.add(topic);
}
}
Set<String> topics =
allTopics.getTopicList().stream().map(topic -> {
if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
topic = String.format("%s%s", "%SYS%", topic);
}
return topic;
}).filter(topic -> {
if (skipRetryAndDlq) {
return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
}
return true;
}).collect(Collectors.toSet());
allTopics.getTopicList().clear();
allTopics.getTopicList().addAll(topics);
Comment on lines 83 to 84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trivial. (and it is not introduced by this PR, I'm OK to merge it)
can just use

allTopics.setTopicList(topics);

return allTopics;
Expand Down Expand Up @@ -209,7 +211,7 @@ public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rp
}

public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, configure.getMsgTrackTopicNameOrDefault());
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
defaultMQProducer.setUseTLS(configure.isUseTLS());
return defaultMQProducer;
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/static/src/i18n/en.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,8 @@ var en = {
"NO_DATA":"Don't have ",
"SYSTEM":"SYSTEM",
"WELCOME":"Hi, welcome using RocketMQ Dashboard",
"ENABLE_MESSAGE_TRACE":"Enable Message Trace"
"ENABLE_MESSAGE_TRACE":"Enable Message Trace",
"MESSAGE_TRACE_DETAIL":"Message Trace Detail",
"TRACE_TOPIC":"TraceTopic",
"SELECT_TRACE_TOPIC":"selectTraceTopic"
}
5 changes: 4 additions & 1 deletion src/main/resources/static/src/i18n/zh.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,8 @@ var zh = {
"NO_DATA":"不存在 ",
"SYSTEM":"系统",
"WELCOME":"您好,欢迎使用RocketMQ仪表盘",
"ENABLE_MESSAGE_TRACE":"开启消息轨迹"
"ENABLE_MESSAGE_TRACE":"开启消息轨迹",
"MESSAGE_TRACE_DETAIL":"消息轨迹详情",
"TRACE_TOPIC":"消息轨迹主题",
"SELECT_TRACE_TOPIC":"选择消息轨迹主题"
}
2 changes: 1 addition & 1 deletion src/main/resources/static/src/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module.controller('messageController', ['$scope', 'ngDialog', '$http', 'Notifica
method: "GET",
url: "topic/list.query",
params: {
skipSysProcess: 'true'
skipSysProcess: true
}
}).success(function (resp) {
if (resp.status == 0) {
Expand Down
19 changes: 16 additions & 3 deletions src/main/resources/static/src/messageTrace.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ const TIME_FORMAT_PATTERN = "YYYY-MM-DD HH:mm:ss.SSS";
const DEFAULT_DISPLAY_DURATION = 10 * 1000
// transactionTraceNode do not have costTime, assume it cost 50ms
const TRANSACTION_CHECK_COST_TIME = 50;
const RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
const DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog', '$http', 'Notification', function ($scope, $routeParams, ngDialog, $http, Notification) {
$scope.allTopicList = [];
$scope.selectedTopic = [];
$scope.allTraceTopicList = [];
$scope.selectedTraceTopic = [];
$scope.key = "";
$scope.messageId = $routeParams.messageId;
$scope.queryMessageByTopicAndKeyResult = [];
Expand All @@ -39,16 +43,25 @@ module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog
method: "GET",
url: "topic/list.query",
params: {
skipSysProcess: "true"
skipSysProcess: true
}
}).success(function (resp) {
if (resp.status == 0) {
$scope.allTopicList = resp.data.topicList.sort();
console.log($scope.allTopicList);
console.log($scope.allTopicList)
for (const topic of $scope.allTopicList) {
if (topic.startsWith(RETRY_GROUP_TOPIC_PREFIX)
|| topic.startsWith(DLQ_GROUP_TOPIC_PREFIX)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
It’s great to delete useless system topics

continue;
}
$scope.allTraceTopicList.push(topic);
}
console.log($scope.allTraceTopicList)
} else {
Notification.error({message: resp.errMsg, delay: 2000});
}
});

$scope.timepickerBegin = moment().subtract(1, 'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerEnd = moment().add(1, 'hour').format('YYYY-MM-DD HH:mm');
$scope.timepickerOptions = {format: 'YYYY-MM-DD HH:mm', showClear: true};
Expand Down Expand Up @@ -99,7 +112,7 @@ module.controller('messageTraceController', ['$scope', '$routeParams', 'ngDialog
url: "messageTrace/viewMessageTraceGraph.query",
params: {
msgId: messageId,
topic: topic
traceTopic: topic
}
}).success(function (resp) {
if (resp.status == 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/static/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module.controller('producerController', ['$scope', '$http','Notification',functi
method: "GET",
url: "topic/list.query",
params:{
skipSysProcess:"true"
skipSysProcess: true
}
}).success(function (resp) {
if(resp.status ==0){
Expand Down
34 changes: 25 additions & 9 deletions src/main/resources/static/view/pages/messageTrace.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@
~ limitations under the License.
-->
<div class="container-fluid" id="deployHistoryList">
<div class="modal-header">
<div class="row">
<label style="color: #000000">{{ 'TRACE_TOPIC' | translate }}:</label>
<div style="display: inline-block; min-width: 300px">
<select name="mySelect" chosen
ng-model="selectedTraceTopic"
ng-options="item for item in allTraceTopicList"
required>
<option value=""></option>
</select>
</div>
<div style="display: inline-block; color: #BDBDBD">(if no select, it will use RMQ_SYS_TRACE_TOPIC)</div>
</div>
</div>
<div class="modal-body">
<div ng-cloak="" class="tabsdemoDynamicHeight">
<md-content>
Expand Down Expand Up @@ -64,7 +78,7 @@ <h5 class="md-display-5">Only Return 64 Messages</h5>
</td>
<td class="text-center">
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="queryMessageTraceByMessageId(item.msgId,item.topic)">Message Trace Detail
ng-click="queryMessageTraceByMessageId(item.msgId, selectedTraceTopic)">{{ 'MESSAGE_TRACE_DETAIL' | translate }}
</button>
</td>
</tr>
Expand Down Expand Up @@ -114,7 +128,7 @@ <h5 class="md-display-5">topic can't be empty if you producer client version>=v3
</td>
<td class="text-center">
<button class="btn btn-raised btn-sm btn-primary" type="button"
ng-click="queryMessageTraceByMessageId(item.msgId,item.topic)">Message Trace Detail
ng-click="queryMessageTraceByMessageId(item.msgId, selectedTraceTopic)">{{ 'MESSAGE_TRACE_DETAIL' | translate }}
</button>
</td>
</tr>
Expand Down Expand Up @@ -274,14 +288,14 @@ <h3 data-toggle="collapse" data-target="#consumeMessageTrace">
<tr ng-repeat="consumeNode in subscriptionNode.consumeNodeList">
<td class="text-center">
{{consumeNode.beginTimestamp < 0 ? 'N/A' :
(consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
(consumeNode.beginTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
</td>
<td class="text-center">
{{consumeNode.endTimestamp < 0 ? 'N/A' :
(consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
(consumeNode.endTimestamp | date:'yyyy-MM-dd HH:mm:ss.sss')}}
</td>
<td class="text-center">{{consumeNode.costTime < 0 ? 'N/A' :
((consumeNode.costTime === 0 ? '<1' : consumeNode.costTime) + 'ms')}}
((consumeNode.costTime === 0 ? '<1' : consumeNode.costTime) + 'ms')}}
</td>
<td class="text-center">{{consumeNode.status}}</td>
<td class="text-center">
Expand All @@ -302,10 +316,12 @@ <h3 data-toggle="collapse" data-target="#consumeMessageTrace">
</div>
</div>
</md-content>
<div class="ngdialog-buttons">
<button type="button" class="ngdialog-button ngdialog-button-secondary"
ng-click="closeThisDialog('Cancel')">{{ 'CLOSE' | translate }}
</button>
<div class="modal-footer">
<div class="ngdialog-buttons">
<button type="button" class="ngdialog-button ngdialog-button-secondary"
ng-click="closeThisDialog('Cancel')">{{ 'CLOSE' | translate }}
</button>
</div>
</div>
</script>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.rocketmq.dashboard.config;

import java.io.File;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.boot.web.server.ErrorPage;
Expand All @@ -38,7 +37,6 @@ public void testSet() {
rmqConfigure.setIsVIPChannel("true");
rmqConfigure.setUseTLS(true);
rmqConfigure.setLoginRequired(true);
rmqConfigure.setMsgTrackTopicName(null);
rmqConfigure.setNamesrvAddr("127.0.0.1:9876");
rmqConfigure.setTimeoutMillis(3000L);
}
Expand All @@ -55,7 +53,6 @@ public void testGet() {
Assert.assertEquals(rmqConfigure.getIsVIPChannel(), "true");
Assert.assertTrue(rmqConfigure.isEnableDashBoardCollect());
Assert.assertTrue(rmqConfigure.isLoginRequired());
Assert.assertEquals(rmqConfigure.getMsgTrackTopicNameOrDefault(), TopicValidator.RMQ_SYS_TRACE_TOPIC);
Assert.assertEquals(rmqConfigure.getNamesrvAddr(), "127.0.0.1:9876");
Assert.assertEquals(rmqConfigure.getTimeoutMillis().longValue(), 3000L);
ErrorPageRegistrar registrar = rmqConfigure.errorPageRegistrar();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl;
import org.apache.rocketmq.dashboard.service.impl.MessageTraceServiceImpl;
import org.apache.rocketmq.dashboard.util.MockObjectUtil;
Expand Down Expand Up @@ -57,7 +56,6 @@ public class MessageTraceControllerTest extends BaseControllerTest {
@Before
public void init() throws MQClientException, InterruptedException {
super.mockRmqConfigure();
when(configure.getMsgTrackTopicNameOrDefault()).thenReturn(TopicValidator.RMQ_SYS_TRACE_TOPIC);
List<MessageExt> messageList = new ArrayList<>(2);
MessageExt messageExt = MockObjectUtil.createMessageExt();
messageExt.setBody(MockObjectUtil.createTraceData().getBytes());
Expand Down
Loading