Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9060][Manager] Fix manager return wrong sink configuration to sort standalone #9067

Merged
merged 19 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
select
inlong_group_id,
inlong_stream_id,
sink_id,
field_name
from stream_sink_field
where is_deleted = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public class SortFieldInfo {

private String inlongGroupId;
private String inlongStreamId;
private Integer sinkId;
private String fieldName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
Expand All @@ -34,6 +37,7 @@

import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,14 +80,18 @@ public class SortClusterServiceImpl implements SortClusterService {

private static final String KEY_GROUP_ID = "inlongGroupId";
private static final String KEY_STREAM_ID = "inlongStreamId";
private Map<String, List<String>> fieldMap;
private static final String FILED_OFFSET = "fieldOffset";
// key: sink id, value: fileNames
private Map<Integer, List<String>> fieldMap;

// key : sort cluster name, value : md5
private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
// key : sort cluster name, value : cluster config
private Map<String, SortClusterConfig> sortClusterConfigMap = new ConcurrentHashMap<>();
// key : sort cluster name, value : error log
private Map<String, String> sortClusterErrorLogMap = new ConcurrentHashMap<>();
// key: group id ,value: {key: stream id, value: stream info}
private Map<String, Map<String, SortSourceStreamInfo>> allStreams;

private long reloadInterval;

Expand Down Expand Up @@ -169,7 +177,7 @@ private void reloadAllClusterConfig() {
List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
fieldMap = new HashMap<>();
fieldInfos.forEach(info -> {
List<String> fields = fieldMap.computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>());
List<String> fields = fieldMap.computeIfAbsent(info.getSinkId(), k -> new ArrayList<>());
fields.add(info.getFieldName());
});

Expand All @@ -183,6 +191,12 @@ private void reloadAllClusterConfig() {
&& StringUtils.isNotBlank(dto.getSinkType()))
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));

// reload all streams
allStreams = sortConfigLoader.loadAllStreams()
.stream()
.collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));

// get all stream sinks
Map<String, List<StreamSinkEntity>> task2AllStreams = sinkEntities.stream()
.filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName()))
Expand Down Expand Up @@ -265,8 +279,10 @@ private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> streams,
.map(streamSink -> {
try {
StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
List<String> fields = fieldMap.get(streamSink.getInlongGroupId());
return operator.parse2IdParams(streamSink, fields, dataNodeInfo);
List<String> fields = fieldMap.get(streamSink.getId());
Map<String, String> params = operator.parse2IdParams(streamSink, fields, dataNodeInfo);
setFiledOffset(streamSink, params);
return params;
} catch (Exception e) {
LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}",
streamSink.getInlongGroupId(), streamSink.getInlongStreamId(),
Expand All @@ -278,6 +294,17 @@ private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> streams,
.collect(Collectors.toList());
}

private void setFiledOffset(StreamSinkEntity streamSink, Map<String, String> params) {

SortSourceStreamInfo sortSourceStreamInfo = allStreams.get(streamSink.getInlongGroupId())
.get(streamSink.getInlongStreamId());
InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(
sortSourceStreamInfo.getExtParams(), InlongStreamExtParam.class);
if (ObjectUtils.anyNotNull(inlongStreamExtParam) && !inlongStreamExtParam.getUseExtendedFields()) {
params.put(FILED_OFFSET, String.valueOf(0));
}
}

private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType());
return operator.parse2SinkParams(nodeInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
Expand All @@ -55,26 +56,28 @@
@Service
public class ClsOperator {

@Value("${cls.manager.endpoint}")
castorqin marked this conversation as resolved.
Show resolved Hide resolved
private String endpoint;
private static final Logger LOG = LoggerFactory.getLogger(ClsOperator.class);
private static final String TOPIC_NAME = "topicName";
private static final String LOG_SET_ID = "logsetId";
private static final long PRECISE_SEARCH = 1L;

public String createTopicReturnTopicId(String topicName, String logSetId, String tag, String secretId,
String secretKey, String endPoint, String region)
public String createTopicReturnTopicId(String topicName, String logSetId, String tag, Integer storageDuration,
String secretId, String secretKey, String region)
throws TencentCloudSDKException {
ClsClient client = getClsClient(secretId, secretKey, endPoint, region);
CreateTopicRequest req = getCreateTopicRequest(tag, logSetId, topicName);
ClsClient client = getClsClient(secretId, secretKey, region);
CreateTopicRequest req = getCreateTopicRequest(tag, logSetId, topicName, storageDuration);
CreateTopicResponse resp = client.CreateTopic(req);
LOG.info("create cls topic success for topicName = {}, topicId = {}, requestId = {}", topicName,
resp.getTopicId(), resp.getRequestId());
updateTopicTag(resp.getTopicId(), tag, secretId, secretKey, endPoint, region);
updateTopicTag(resp.getTopicId(), tag, secretId, secretKey, region);
return resp.getTopicId();
}

public void updateTopicTag(String topicId, String tag, String secretId,
String secretKey, String endPoint, String region) throws TencentCloudSDKException {
ClsClient client = getClsClient(secretId, secretKey, endPoint, region);
public void updateTopicTag(String topicId, String tag, String secretId, String secretKey, String region)
throws TencentCloudSDKException {
ClsClient client = getClsClient(secretId, secretKey, region);
ModifyTopicRequest modifyTopicRequest = new ModifyTopicRequest();
modifyTopicRequest.setTags(convertTags(tag.split(InlongConstants.CENTER_LINE)));
modifyTopicRequest.setTopicId(topicId);
Expand All @@ -85,22 +88,22 @@ public void updateTopicTag(String topicId, String tag, String secretId,
/**
* Create topic index by tokenizer
*/
public void createTopicIndex(String tokenizer, String topicId, String secretId, String secretKey, String endPoint,
String region) throws BusinessException {
public void createTopicIndex(String tokenizer, String topicId, String secretId, String secretKey, String region)
throws BusinessException {

LOG.debug("create topic index start for topicId = {}, tokenizer = {}", topicId, tokenizer);
if (StringUtils.isBlank(tokenizer)) {
LOG.warn("tokenizer is blank for topic = {}", topicId);
return;
}
FullTextInfo topicIndexFullText = getTopicIndexFullText(secretId, secretKey, endPoint, region, topicId);
FullTextInfo topicIndexFullText = getTopicIndexFullText(secretId, secretKey, region, topicId);
if (ObjectUtils.anyNotNull(topicIndexFullText)) {
// if topic index exist, update
LOG.debug("cls topic is exist and update for topicId = {},tokenizer = {}", topicId, tokenizer);
updateTopicIndex(tokenizer, topicId, secretId, secretKey, endPoint, region);
updateTopicIndex(tokenizer, topicId, secretId, secretKey, region);
return;
}
ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region);
ClsClient clsClient = getClsClient(secretId, secretKey, region);
CreateIndexRequest req = getCreateIndexRequest(tokenizer, topicId);
try {
CreateIndexResponse createIndexResponse = clsClient.CreateIndex(req);
Expand All @@ -117,9 +120,9 @@ public void createTopicIndex(String tokenizer, String topicId, String secretId,
/**
* Describe cls topicId by topic name
*/
public String describeTopicIDByTopicName(String topicName, String logSetId, String tag, String secretId,
String secretKey, String endPoint, String region) {
ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region);
public String describeTopicIDByTopicName(String topicName, String logSetId, String secretId, String secretKey,
String region) {
ClsClient clsClient = getClsClient(secretId, secretKey, region);
Filter[] filters = getDescribeFilters(topicName, logSetId);
DescribeTopicsRequest req = new DescribeTopicsRequest();
req.setFilters(filters);
Expand Down Expand Up @@ -154,10 +157,9 @@ public Filter[] getDescribeFilters(String topicName, String logSetId) {
/**
* Get cls topic index full text
*/
public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, String endPoint, String region,
String topicId) {
public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, String region, String topicId) {

ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region);
ClsClient clsClient = getClsClient(secretId, secretKey, region);
DescribeIndexRequest req = new DescribeIndexRequest();
req.setTopicId(topicId);
try {
Expand All @@ -170,9 +172,8 @@ public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, Str
}
}

public void updateTopicIndex(String tokenizer, String topicId,
String secretId, String secretKey, String endPoint, String region) {
ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region);
public void updateTopicIndex(String tokenizer, String topicId, String secretId, String secretKey, String region) {
ClsClient clsClient = getClsClient(secretId, secretKey, region);
RuleInfo ruleInfo = new RuleInfo();
FullTextInfo fullTextInfo = new FullTextInfo();
fullTextInfo.setTokenizer(tokenizer);
Expand All @@ -192,11 +193,11 @@ public void updateTopicIndex(String tokenizer, String topicId,
}
}

public ClsClient getClsClient(String secretId, String secretKey, String endPoint, String region) {
public ClsClient getClsClient(String secretId, String secretKey, String region) {
Credential cred = new Credential(secretId,
secretKey);
HttpProfile httpProfile = new HttpProfile();
httpProfile.setEndpoint(endPoint);
httpProfile.setEndpoint(endpoint);
ClientProfile clientProfile = new ClientProfile();

clientProfile.setHttpProfile(httpProfile);
Expand All @@ -215,11 +216,13 @@ public CreateIndexRequest getCreateIndexRequest(String tokenizer, String topicId
return req;
}

public CreateTopicRequest getCreateTopicRequest(String tags, String logSetId, String topicName) {
public CreateTopicRequest getCreateTopicRequest(String tags, String logSetId, String topicName,
Integer storageDuration) {
CreateTopicRequest req = new CreateTopicRequest();
req.setTags(convertTags(tags.split(InlongConstants.CENTER_LINE)));
req.setLogsetId(logSetId);
req.setTopicName(topicName);
req.setPeriod(storageDuration == null ? null : Long.valueOf(storageDuration));
return req;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void createClsResource(SinkInfo sinkInfo) {
// create topic index by tokenizer
clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(), clsSinkDTO.getTopicId(),
clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(), clsDataNode.getEndpoint(), clsDataNode.getRegion());
clsDataNode.getManageSecretKey(), clsDataNode.getRegion());
// update set topic id into sink info
updateSinkInfo(sinkInfo, clsSinkDTO);
String info = "success to create cls resource";
Expand All @@ -104,14 +104,13 @@ private void createClsResource(SinkInfo sinkInfo) {
private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO)
throws TencentCloudSDKException {
String topicId = clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(),
clsSinkDTO.getTag(),
clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getEndpoint(),
clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(),
clsDataNode.getRegion());
if (StringUtils.isBlank(topicId)) {
// if topic don't exist, create topic in cls
topicId = clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(),
clsSinkDTO.getTag(), clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(),
clsDataNode.getEndpoint(),
clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(), clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
clsDataNode.getRegion());
}
return topicId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.inlong.manager.common.consts.InlongConstants.PULSAR_TOPIC_FORMAT;

/**
* Pulsar sink operator
*/
Expand Down Expand Up @@ -123,7 +125,9 @@ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<Stri
}

private String getFullTopicName(PulsarSinkDTO pulsarSinkDTO) {
return pulsarSinkDTO.getPulsarTenant() + "/" + pulsarSinkDTO.getNamespace() + "/" + pulsarSinkDTO.getTopic();
return String.format(PULSAR_TOPIC_FORMAT, pulsarSinkDTO.getPulsarTenant(), pulsarSinkDTO.getNamespace(),
pulsarSinkDTO.getTopic());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,6 @@ group.deleted.batchSize=100
group.deleted.enabled=false

metrics.audit.proxy.hosts=127.0.0.1:10081

# tencent cloud log service endpoint, The Operator cls resource by it
cls.manager.endpoint=xxx