Skip to content

Commit

Permalink
[INLONG-4883][TubeMQ] No error report for incorrect topic subscription (
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang authored and bruceneenhl committed Aug 12, 2022
1 parent 44a7cfe commit 55a6c67
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TErrCodeConstants {
public static final int CLIENT_INCONSISTENT_SELECTBIG = 428;
public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
public static final int CLIENT_DUPLICATE_INDEXID = 430;
public static final int TOPIC_NOT_DEPLOYED = 431;

public static final int CONSUME_GROUP_FORBIDDEN = 450;
public static final int SERVER_CONSUME_SPEED_LIMIT = 452;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,21 @@ public static ParamCheckResult checkProducerTopicList(final List<String> reqTopi
/**
* Check request topic list of consumer
*
* @param depTopicSet the deployed topic set
* @param reqTopicLst the topic list to be checked.
* @param strBuffer a string buffer used to construct the result
* @param strBuff a string buffer used to construct the result
* @return the check result
*/
public static ParamCheckResult checkConsumerTopicList(final List<String> reqTopicLst,
final StringBuilder strBuffer) {
ParamCheckResult retResult = new ParamCheckResult();
if ((reqTopicLst == null)
|| (reqTopicLst.isEmpty())) {
retResult.setCheckResult(false,
TErrCodeConstants.BAD_REQUEST,
public static boolean checkConsumerTopicList(Set<String> depTopicSet,
List<String> reqTopicLst,
ProcessResult result,
StringBuilder strBuff) {
if ((reqTopicLst == null) || (reqTopicLst.isEmpty())) {
result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request miss necessary subscribed topicList data!");
return retResult;
return result.isSuccess();
}
// remove spaces
Set<String> transTopicSet = new HashSet<>();
for (String topicItem : reqTopicLst) {
if (TStringUtils.isBlank(topicItem)) {
Expand All @@ -117,21 +118,34 @@ public static ParamCheckResult checkConsumerTopicList(final List<String> reqTopi
transTopicSet.add(topicItem.trim());
}
if (transTopicSet.isEmpty()) {
retResult.setCheckResult(false,
TErrCodeConstants.BAD_REQUEST,
result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request subscribed topicList data must not Blank!");
return retResult;
return result.isSuccess();
}
// check if exceed max topic count booked
if (transTopicSet.size() > TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT) {
retResult.setCheckResult(false,
TErrCodeConstants.BAD_REQUEST,
strBuffer.append("Subscribed topicList size over max value, required max count is ")
result.setFailResult(TErrCodeConstants.BAD_REQUEST,
strBuff.append("Subscribed topicList size over max value, required max count is ")
.append(TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT).toString());
strBuffer.delete(0, strBuffer.length());
return retResult;
strBuff.delete(0, strBuff.length());
return result.isSuccess();
}
retResult.setCheckData(transTopicSet);
return retResult;
// Check if the topics all in deployment
Set<String> invalidTopicSet = new HashSet<>();
for (String reqTopic : transTopicSet) {
if (!depTopicSet.contains(reqTopic)) {
invalidTopicSet.add(reqTopic);
}
}
if (!invalidTopicSet.isEmpty()) {
result.setFailResult(TErrCodeConstants.TOPIC_NOT_DEPLOYED,
strBuff.append("Requested topic [").append(invalidTopicSet)
.append("] not deployed!").toString());
strBuff.delete(0, strBuff.length());
return result.isSuccess();
}
result.setSuccResult(transTopicSet);
return result.isSuccess();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ public static boolean getAndValidTopicNameInfo(HttpServletRequest req,
}
Set<String> topicNameSet = (Set<String>) result.getRetData();
Set<String> existedTopicSet =
defMetaDataService.getTotalConfiguredTopicNames();
defMetaDataService.getDeployedTopicSet();
for (String topic : topicNameSet) {
if (!existedTopicSet.contains(topic)) {
result.setFailResult(sBuffer.append(WebFieldDef.COMPSTOPICNAME.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,13 +552,13 @@ public RegisterResponseM2C consumerRegisterC2M(RegisterRequestC2M request,
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
paramCheckResult = PBParameterUtils.checkConsumerTopicList(request.getTopicListList(), strBuffer);
if (!paramCheckResult.result) {
builder.setErrCode(paramCheckResult.errCode);
builder.setErrMsg(paramCheckResult.errMsg);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
request.getTopicListList(), result, strBuffer)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
final Set<String> reqTopicSet = (Set<String>) result.getRetData();
String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
Expand Down Expand Up @@ -1234,15 +1234,13 @@ public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
paramCheckResult =
PBParameterUtils.checkConsumerTopicList(
request.getTopicListList(), sBuffer);
if (!paramCheckResult.result) {
builder.setErrCode(paramCheckResult.errCode);
builder.setErrMsg(paramCheckResult.errMsg);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
request.getTopicListList(), result, sBuffer)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
final Set<String> reqTopicSet = (Set<String>) result.getRetData();
final Map<String, TreeSet<String>> reqTopicConditions =
DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
int sourceCount = request.getSourceCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ public Map<String, Map<Integer, String>> getTopicBrokerConfigInfo(Set<String> to
}

@Override
public Set<String> getTotalConfiguredTopicNames() {
return metaConfigMapper.getConfiguredTopicSet();
public Set<String> getDeployedTopicSet() {
return metaConfigMapper.getDeployedTopicSet();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ boolean delCleanedTopicDeployInfo(int brokerId, List<String> removedTopics,
*
* @return the deployed topic set
*/
Set<String> getTotalConfiguredTopicNames();
Set<String> getDeployedTopicSet();

/**
* Get broker configure entity by broker id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ Map<String, List<TopicDeployEntity>> getTopicConfInfoByTopicAndBrokerIds(
*
* @return the deployed topic set
*/
Set<String> getConfiguredTopicSet();
Set<String> getDeployedTopicSet();

// ////////////////////////////////////////////////////////////

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNam

Map<String/* topicName */, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);

Set<String> getConfiguredTopicSet();
Set<String> getDeployedTopicSet();

}
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ public Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNam
}

@Override
public Set<String> getConfiguredTopicSet() {
return topicDeployMapper.getConfiguredTopicSet();
public Set<String> getDeployedTopicSet() {
return topicDeployMapper.getDeployedTopicSet();
}

// //////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNam
}

@Override
public Set<String> getConfiguredTopicSet() {
public Set<String> getDeployedTopicSet() {
return new HashSet<>(topicName2RecordCache.keySet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ private boolean getFilterJsonSetInfo(HttpServletRequest req, boolean isAddOp,
GroupConsumeCtrlEntity itemEntity;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
defMetaDataService.getTotalConfiguredTopicNames();
defMetaDataService.getDeployedTopicSet();
for (Map<String, String> itemValueMap : groupJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
Expand Down Expand Up @@ -1349,7 +1349,7 @@ private boolean getGroupCtrlJsonSetInfo(HttpServletRequest req, BaseEntity defOp
Map<String, String> itemValueMap;
Map<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
defMetaDataService.getTotalConfiguredTopicNames();
defMetaDataService.getDeployedTopicSet();
for (int j = 0; j < groupJsonArray.size(); j++) {
itemValueMap = groupJsonArray.get(j);
// check and get operation info
Expand Down Expand Up @@ -1409,7 +1409,7 @@ private boolean getGroupCsmJsonSetInfo(HttpServletRequest req, BaseEntity defOpE
GroupConsumeCtrlEntity itemEntity;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
defMetaDataService.getTotalConfiguredTopicNames();
defMetaDataService.getDeployedTopicSet();
for (Map<String, String> itemValueMap : groupJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private boolean getGroupConsumeJsonSetInfo(HttpServletRequest req, boolean isAdd
GroupConsumeCtrlEntity itemConf;
Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
defMetaDataService.getTotalConfiguredTopicNames();
defMetaDataService.getDeployedTopicSet();
for (Map<String, String> itemsMap : filterJsonArray) {
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(itemsMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.inlong.tubemq.server.common;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.junit.Assert;
Expand All @@ -43,17 +46,26 @@ public void checkProducerTopicTest() {

@Test
public void checkConsumerTopicTest() {
ParamCheckResult result = PBParameterUtils.checkConsumerTopicList(null, null);
Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
final List<String> topicList = new ArrayList<>();
topicList.add("test1");
result = PBParameterUtils.checkConsumerTopicList(topicList, new StringBuilder(128));
Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS);
ProcessResult result = new ProcessResult();
PBParameterUtils.checkConsumerTopicList(null, null, result, null);
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
final Set<String> depTopicList = new HashSet<>();
final List<String> reqTopicList = new ArrayList<>();
depTopicList.add("test1");
reqTopicList.add("test1");
PBParameterUtils.checkConsumerTopicList(depTopicList,
reqTopicList, result, new StringBuilder(128));
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
reqTopicList.add("test2");
PBParameterUtils.checkConsumerTopicList(depTopicList,
reqTopicList, result, new StringBuilder(128));
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.TOPIC_NOT_DEPLOYED);
for (int i = 0; i < 1025; i++) {
topicList.add("test" + i);
reqTopicList.add("test" + i);
}
result = PBParameterUtils.checkConsumerTopicList(topicList, new StringBuilder(128));
Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
PBParameterUtils.checkConsumerTopicList(depTopicList,
reqTopicList, result, new StringBuilder(128));
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
}

@Test
Expand Down

0 comments on commit 55a6c67

Please sign in to comment.