From 55a6c67dc1ef86d52dd949404be8061879ba40b0 Mon Sep 17 00:00:00 2001 From: gosonzhang <4675739@qq.com> Date: Fri, 8 Jul 2022 09:32:21 +0800 Subject: [PATCH] [INLONG-4883][TubeMQ] No error report for incorrect topic subscription (#4913) --- .../tubemq/corebase/TErrCodeConstants.java | 1 + .../common/paramcheck/PBParameterUtils.java | 52 ++++++++++++------- .../common/utils/WebParameterUtils.java | 2 +- .../inlong/tubemq/server/master/TMaster.java | 22 ++++---- .../metamanage/DefaultMetaDataService.java | 4 +- .../master/metamanage/MetaDataService.java | 2 +- .../dao/mapper/MetaConfigMapper.java | 2 +- .../dao/mapper/TopicDeployMapper.java | 2 +- .../impl/AbsMetaConfigMapperImpl.java | 4 +- .../impl/AbsTopicDeployMapperImpl.java | 2 +- .../web/handler/WebAdminGroupCtrlHandler.java | 6 +-- .../handler/WebGroupConsumeCtrlHandler.java | 2 +- .../tubemq/server/common/PBParameterTest.java | 30 +++++++---- 13 files changed, 78 insertions(+), 53 deletions(-) diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java index 30ff82715fe..b3487c20c11 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java @@ -50,6 +50,7 @@ public class TErrCodeConstants { public static final int CLIENT_INCONSISTENT_SELECTBIG = 428; public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429; public static final int CLIENT_DUPLICATE_INDEXID = 430; + public static final int TOPIC_NOT_DEPLOYED = 431; public static final int CONSUME_GROUP_FORBIDDEN = 450; public static final int SERVER_CONSUME_SPEED_LIMIT = 452; diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java index 3cbadaa1b68..9f3a9849e95 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java @@ -95,20 +95,21 @@ public static ParamCheckResult checkProducerTopicList(final List reqTopi /** * Check request topic list of consumer * + * @param depTopicSet the deployed topic set * @param reqTopicLst the topic list to be checked. - * @param strBuffer a string buffer used to construct the result + * @param strBuff a string buffer used to construct the result * @return the check result */ - public static ParamCheckResult checkConsumerTopicList(final List reqTopicLst, - final StringBuilder strBuffer) { - ParamCheckResult retResult = new ParamCheckResult(); - if ((reqTopicLst == null) - || (reqTopicLst.isEmpty())) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, + public static boolean checkConsumerTopicList(Set depTopicSet, + List reqTopicLst, + ProcessResult result, + StringBuilder strBuff) { + if ((reqTopicLst == null) || (reqTopicLst.isEmpty())) { + result.setFailResult(TErrCodeConstants.BAD_REQUEST, "Request miss necessary subscribed topicList data!"); - return retResult; + return result.isSuccess(); } + // remove spaces Set transTopicSet = new HashSet<>(); for (String topicItem : reqTopicLst) { if (TStringUtils.isBlank(topicItem)) { @@ -117,21 +118,34 @@ public static ParamCheckResult checkConsumerTopicList(final List reqTopi transTopicSet.add(topicItem.trim()); } if (transTopicSet.isEmpty()) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, + result.setFailResult(TErrCodeConstants.BAD_REQUEST, "Request subscribed topicList data must not Blank!"); - return retResult; + return result.isSuccess(); } + // check if exceed max topic count booked if (transTopicSet.size() > TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("Subscribed topicList size over max value, required max count is ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("Subscribed topicList size over max value, required max count is ") .append(TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT).toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; + strBuff.delete(0, strBuff.length()); + return result.isSuccess(); } - retResult.setCheckData(transTopicSet); - return retResult; + // Check if the topics all in deployment + Set invalidTopicSet = new HashSet<>(); + for (String reqTopic : transTopicSet) { + if (!depTopicSet.contains(reqTopic)) { + invalidTopicSet.add(reqTopic); + } + } + if (!invalidTopicSet.isEmpty()) { + result.setFailResult(TErrCodeConstants.TOPIC_NOT_DEPLOYED, + strBuff.append("Requested topic [").append(invalidTopicSet) + .append("] not deployed!").toString()); + strBuff.delete(0, strBuff.length()); + return result.isSuccess(); + } + result.setSuccResult(transTopicSet); + return result.isSuccess(); } /** diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java index 65c9413b1f2..e382b0f92eb 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java @@ -829,7 +829,7 @@ public static boolean getAndValidTopicNameInfo(HttpServletRequest req, } Set topicNameSet = (Set) result.getRetData(); Set existedTopicSet = - defMetaDataService.getTotalConfiguredTopicNames(); + defMetaDataService.getDeployedTopicSet(); for (String topic : topicNameSet) { if (!existedTopicSet.contains(topic)) { result.setFailResult(sBuffer.append(WebFieldDef.COMPSTOPICNAME.name) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index 5a611e80425..edef4c26979 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -552,13 +552,13 @@ public RegisterResponseM2C consumerRegisterC2M(RegisterRequestC2M request, return builder.build(); } final String groupName = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkConsumerTopicList(request.getTopicListList(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(), + request.getTopicListList(), result, strBuffer)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - Set reqTopicSet = (Set) paramCheckResult.checkData; + final Set reqTopicSet = (Set) result.getRetData(); String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : ""; ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound()) ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL; @@ -1234,15 +1234,13 @@ public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request, return builder.build(); } final String groupName = (String) paramCheckResult.checkData; - paramCheckResult = - PBParameterUtils.checkConsumerTopicList( - request.getTopicListList(), sBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(), + request.getTopicListList(), result, sBuffer)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final Set reqTopicSet = (Set) paramCheckResult.checkData; + final Set reqTopicSet = (Set) result.getRetData(); final Map> reqTopicConditions = DataConverterUtil.convertTopicConditions(request.getTopicConditionList()); int sourceCount = request.getSourceCount(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java index f2c707280d4..273604b5c22 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java @@ -556,8 +556,8 @@ public Map> getTopicBrokerConfigInfo(Set to } @Override - public Set getTotalConfiguredTopicNames() { - return metaConfigMapper.getConfiguredTopicSet(); + public Set getDeployedTopicSet() { + return metaConfigMapper.getDeployedTopicSet(); } @Override diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java index e6e382a5bc0..95f94270b7d 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java @@ -269,7 +269,7 @@ boolean delCleanedTopicDeployInfo(int brokerId, List removedTopics, * * @return the deployed topic set */ - Set getTotalConfiguredTopicNames(); + Set getDeployedTopicSet(); /** * Get broker configure entity by broker id diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java index 0ce5456b786..8acfb75d929 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java @@ -361,7 +361,7 @@ Map> getTopicConfInfoByTopicAndBrokerIds( * * @return the deployed topic set */ - Set getConfiguredTopicSet(); + Set getDeployedTopicSet(); // //////////////////////////////////////////////////////////// diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java index 05cb02bdf61..26d782b9f24 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java @@ -118,6 +118,6 @@ Map> getTopicDeployInfoMap(Set topicNam Map> getTopicBrokerInfo(Set topicNameSet); - Set getConfiguredTopicSet(); + Set getDeployedTopicSet(); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java index d2d39a033a2..9e14215085a 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java @@ -869,8 +869,8 @@ public Map> getTopicBrokerInfo(Set topicNam } @Override - public Set getConfiguredTopicSet() { - return topicDeployMapper.getConfiguredTopicSet(); + public Set getDeployedTopicSet() { + return topicDeployMapper.getDeployedTopicSet(); } // ////////////////////////////////////////////////////////////////////////////// diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java index b0b40c7e6d3..5ed2e5102b1 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java @@ -447,7 +447,7 @@ public Map> getTopicBrokerInfo(Set topicNam } @Override - public Set getConfiguredTopicSet() { + public Set getDeployedTopicSet() { return new HashSet<>(topicName2RecordCache.keySet()); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java index dc9f688dcc5..902a8b3717e 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java @@ -1279,7 +1279,7 @@ private boolean getFilterJsonSetInfo(HttpServletRequest req, boolean isAddOp, GroupConsumeCtrlEntity itemEntity; Map addRecordMap = new HashMap<>(); Set configuredTopicSet = - defMetaDataService.getTotalConfiguredTopicNames(); + defMetaDataService.getDeployedTopicSet(); for (Map itemValueMap : groupJsonArray) { // check and get operation info if (!WebParameterUtils.getAUDBaseInfo(itemValueMap, @@ -1349,7 +1349,7 @@ private boolean getGroupCtrlJsonSetInfo(HttpServletRequest req, BaseEntity defOp Map itemValueMap; Map addRecordMap = new HashMap<>(); Set configuredTopicSet = - defMetaDataService.getTotalConfiguredTopicNames(); + defMetaDataService.getDeployedTopicSet(); for (int j = 0; j < groupJsonArray.size(); j++) { itemValueMap = groupJsonArray.get(j); // check and get operation info @@ -1409,7 +1409,7 @@ private boolean getGroupCsmJsonSetInfo(HttpServletRequest req, BaseEntity defOpE GroupConsumeCtrlEntity itemEntity; Map addRecordMap = new HashMap<>(); Set configuredTopicSet = - defMetaDataService.getTotalConfiguredTopicNames(); + defMetaDataService.getDeployedTopicSet(); for (Map itemValueMap : groupJsonArray) { // check and get operation info if (!WebParameterUtils.getAUDBaseInfo(itemValueMap, diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java index 18b52767775..347a46183db 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java @@ -421,7 +421,7 @@ private boolean getGroupConsumeJsonSetInfo(HttpServletRequest req, boolean isAdd GroupConsumeCtrlEntity itemConf; Map addRecordMap = new HashMap<>(); Set configuredTopicSet = - defMetaDataService.getTotalConfiguredTopicNames(); + defMetaDataService.getDeployedTopicSet(); for (Map itemsMap : filterJsonArray) { // check and get operation info if (!WebParameterUtils.getAUDBaseInfo(itemsMap, diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java index da69dd8d07a..5ddced8c7c5 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java @@ -18,8 +18,11 @@ package org.apache.inlong.tubemq.server.common; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils; import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult; import org.junit.Assert; @@ -43,17 +46,26 @@ public void checkProducerTopicTest() { @Test public void checkConsumerTopicTest() { - ParamCheckResult result = PBParameterUtils.checkConsumerTopicList(null, null); - Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST); - final List topicList = new ArrayList<>(); - topicList.add("test1"); - result = PBParameterUtils.checkConsumerTopicList(topicList, new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS); + ProcessResult result = new ProcessResult(); + PBParameterUtils.checkConsumerTopicList(null, null, result, null); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); + final Set depTopicList = new HashSet<>(); + final List reqTopicList = new ArrayList<>(); + depTopicList.add("test1"); + reqTopicList.add("test1"); + PBParameterUtils.checkConsumerTopicList(depTopicList, + reqTopicList, result, new StringBuilder(128)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS); + reqTopicList.add("test2"); + PBParameterUtils.checkConsumerTopicList(depTopicList, + reqTopicList, result, new StringBuilder(128)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.TOPIC_NOT_DEPLOYED); for (int i = 0; i < 1025; i++) { - topicList.add("test" + i); + reqTopicList.add("test" + i); } - result = PBParameterUtils.checkConsumerTopicList(topicList, new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST); + PBParameterUtils.checkConsumerTopicList(depTopicList, + reqTopicList, result, new StringBuilder(128)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); } @Test