diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml index 8273c333437..2fae94247fe 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml @@ -123,6 +123,7 @@ select inlong_group_id, inlong_stream_id, + sink_id, field_name from stream_sink_field where is_deleted = 0 diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java index b4fcbd9fd94..fc8ad991569 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java @@ -24,5 +24,6 @@ public class SortFieldInfo { private String inlongGroupId; private String inlongStreamId; + private Integer sinkId; private String fieldName; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java index 6af22b75b6f..170b2952493 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java @@ -20,11 +20,14 @@ import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig; import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse; import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo; +import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.core.SortClusterService; import org.apache.inlong.manager.service.core.SortConfigLoader; import org.apache.inlong.manager.service.node.DataNodeOperator; @@ -34,6 +37,7 @@ import com.google.gson.Gson; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +80,9 @@ public class SortClusterServiceImpl implements SortClusterService { private static final String KEY_GROUP_ID = "inlongGroupId"; private static final String KEY_STREAM_ID = "inlongStreamId"; - private Map> fieldMap; + private static final String FILED_OFFSET = "fieldOffset"; + // key: sink id, value: fileNames + private Map> fieldMap; // key : sort cluster name, value : md5 private Map sortClusterMd5Map = new ConcurrentHashMap<>(); @@ -84,6 +90,8 @@ public class SortClusterServiceImpl implements SortClusterService { private Map sortClusterConfigMap = new ConcurrentHashMap<>(); // key : sort cluster name, value : error log private Map sortClusterErrorLogMap = new ConcurrentHashMap<>(); + // key: group id ,value: {key: stream id, value: stream info} + private Map> allStreams; private long reloadInterval; @@ -169,7 +177,7 @@ private void reloadAllClusterConfig() { List fieldInfos = sortConfigLoader.loadAllFields(); fieldMap = new HashMap<>(); fieldInfos.forEach(info -> { - List fields = fieldMap.computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>()); + List fields = fieldMap.computeIfAbsent(info.getSinkId(), k -> new ArrayList<>()); fields.add(info.getFieldName()); }); @@ -183,6 +191,12 @@ private void reloadAllClusterConfig() { && StringUtils.isNotBlank(dto.getSinkType())) .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName)); + // reload all streams + allStreams = sortConfigLoader.loadAllStreams() + .stream() + .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId, + Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info))); + // get all stream sinks Map> task2AllStreams = sinkEntities.stream() .filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName())) @@ -265,8 +279,10 @@ private List> parseIdParams(List streams, .map(streamSink -> { try { StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType()); - List fields = fieldMap.get(streamSink.getInlongGroupId()); - return operator.parse2IdParams(streamSink, fields, dataNodeInfo); + List fields = fieldMap.get(streamSink.getId()); + Map params = operator.parse2IdParams(streamSink, fields, dataNodeInfo); + setFiledOffset(streamSink, params); + return params; } catch (Exception e) { LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}", streamSink.getInlongGroupId(), streamSink.getInlongStreamId(), @@ -278,6 +294,17 @@ private List> parseIdParams(List streams, .collect(Collectors.toList()); } + private void setFiledOffset(StreamSinkEntity streamSink, Map params) { + + SortSourceStreamInfo sortSourceStreamInfo = allStreams.get(streamSink.getInlongGroupId()) + .get(streamSink.getInlongStreamId()); + InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject( + sortSourceStreamInfo.getExtParams(), InlongStreamExtParam.class); + if (ObjectUtils.anyNotNull(inlongStreamExtParam) && !inlongStreamExtParam.getUseExtendedFields()) { + params.put(FILED_OFFSET, String.valueOf(0)); + } + } + private Map parseSinkParams(DataNodeInfo nodeInfo) { DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType()); return operator.parse2SinkParams(nodeInfo); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java index 9aa29d99936..d15223b8613 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java @@ -47,6 +47,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -55,26 +56,28 @@ @Service public class ClsOperator { + @Value("${cls.manager.endpoint}") + private String endpoint; private static final Logger LOG = LoggerFactory.getLogger(ClsOperator.class); private static final String TOPIC_NAME = "topicName"; private static final String LOG_SET_ID = "logsetId"; private static final long PRECISE_SEARCH = 1L; - public String createTopicReturnTopicId(String topicName, String logSetId, String tag, String secretId, - String secretKey, String endPoint, String region) + public String createTopicReturnTopicId(String topicName, String logSetId, String tag, Integer storageDuration, + String secretId, String secretKey, String region) throws TencentCloudSDKException { - ClsClient client = getClsClient(secretId, secretKey, endPoint, region); - CreateTopicRequest req = getCreateTopicRequest(tag, logSetId, topicName); + ClsClient client = getClsClient(secretId, secretKey, region); + CreateTopicRequest req = getCreateTopicRequest(tag, logSetId, topicName, storageDuration); CreateTopicResponse resp = client.CreateTopic(req); LOG.info("create cls topic success for topicName = {}, topicId = {}, requestId = {}", topicName, resp.getTopicId(), resp.getRequestId()); - updateTopicTag(resp.getTopicId(), tag, secretId, secretKey, endPoint, region); + updateTopicTag(resp.getTopicId(), tag, secretId, secretKey, region); return resp.getTopicId(); } - public void updateTopicTag(String topicId, String tag, String secretId, - String secretKey, String endPoint, String region) throws TencentCloudSDKException { - ClsClient client = getClsClient(secretId, secretKey, endPoint, region); + public void updateTopicTag(String topicId, String tag, String secretId, String secretKey, String region) + throws TencentCloudSDKException { + ClsClient client = getClsClient(secretId, secretKey, region); ModifyTopicRequest modifyTopicRequest = new ModifyTopicRequest(); modifyTopicRequest.setTags(convertTags(tag.split(InlongConstants.CENTER_LINE))); modifyTopicRequest.setTopicId(topicId); @@ -85,22 +88,22 @@ public void updateTopicTag(String topicId, String tag, String secretId, /** * Create topic index by tokenizer */ - public void createTopicIndex(String tokenizer, String topicId, String secretId, String secretKey, String endPoint, - String region) throws BusinessException { + public void createTopicIndex(String tokenizer, String topicId, String secretId, String secretKey, String region) + throws BusinessException { LOG.debug("create topic index start for topicId = {}, tokenizer = {}", topicId, tokenizer); if (StringUtils.isBlank(tokenizer)) { LOG.warn("tokenizer is blank for topic = {}", topicId); return; } - FullTextInfo topicIndexFullText = getTopicIndexFullText(secretId, secretKey, endPoint, region, topicId); + FullTextInfo topicIndexFullText = getTopicIndexFullText(secretId, secretKey, region, topicId); if (ObjectUtils.anyNotNull(topicIndexFullText)) { // if topic index exist, update LOG.debug("cls topic is exist and update for topicId = {},tokenizer = {}", topicId, tokenizer); - updateTopicIndex(tokenizer, topicId, secretId, secretKey, endPoint, region); + updateTopicIndex(tokenizer, topicId, secretId, secretKey, region); return; } - ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region); + ClsClient clsClient = getClsClient(secretId, secretKey, region); CreateIndexRequest req = getCreateIndexRequest(tokenizer, topicId); try { CreateIndexResponse createIndexResponse = clsClient.CreateIndex(req); @@ -117,9 +120,9 @@ public void createTopicIndex(String tokenizer, String topicId, String secretId, /** * Describe cls topicId by topic name */ - public String describeTopicIDByTopicName(String topicName, String logSetId, String tag, String secretId, - String secretKey, String endPoint, String region) { - ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region); + public String describeTopicIDByTopicName(String topicName, String logSetId, String secretId, String secretKey, + String region) { + ClsClient clsClient = getClsClient(secretId, secretKey, region); Filter[] filters = getDescribeFilters(topicName, logSetId); DescribeTopicsRequest req = new DescribeTopicsRequest(); req.setFilters(filters); @@ -154,10 +157,9 @@ public Filter[] getDescribeFilters(String topicName, String logSetId) { /** * Get cls topic index full text */ - public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, String endPoint, String region, - String topicId) { + public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, String region, String topicId) { - ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region); + ClsClient clsClient = getClsClient(secretId, secretKey, region); DescribeIndexRequest req = new DescribeIndexRequest(); req.setTopicId(topicId); try { @@ -170,9 +172,8 @@ public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, Str } } - public void updateTopicIndex(String tokenizer, String topicId, - String secretId, String secretKey, String endPoint, String region) { - ClsClient clsClient = getClsClient(secretId, secretKey, endPoint, region); + public void updateTopicIndex(String tokenizer, String topicId, String secretId, String secretKey, String region) { + ClsClient clsClient = getClsClient(secretId, secretKey, region); RuleInfo ruleInfo = new RuleInfo(); FullTextInfo fullTextInfo = new FullTextInfo(); fullTextInfo.setTokenizer(tokenizer); @@ -192,11 +193,11 @@ public void updateTopicIndex(String tokenizer, String topicId, } } - public ClsClient getClsClient(String secretId, String secretKey, String endPoint, String region) { + public ClsClient getClsClient(String secretId, String secretKey, String region) { Credential cred = new Credential(secretId, secretKey); HttpProfile httpProfile = new HttpProfile(); - httpProfile.setEndpoint(endPoint); + httpProfile.setEndpoint(endpoint); ClientProfile clientProfile = new ClientProfile(); clientProfile.setHttpProfile(httpProfile); @@ -215,11 +216,13 @@ public CreateIndexRequest getCreateIndexRequest(String tokenizer, String topicId return req; } - public CreateTopicRequest getCreateTopicRequest(String tags, String logSetId, String topicName) { + public CreateTopicRequest getCreateTopicRequest(String tags, String logSetId, String topicName, + Integer storageDuration) { CreateTopicRequest req = new CreateTopicRequest(); req.setTags(convertTags(tags.split(InlongConstants.CENTER_LINE))); req.setLogsetId(logSetId); req.setTopicName(topicName); + req.setPeriod(storageDuration == null ? null : Long.valueOf(storageDuration)); return req; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java index c16391907a4..173a139758f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java @@ -86,7 +86,7 @@ private void createClsResource(SinkInfo sinkInfo) { // create topic index by tokenizer clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(), clsSinkDTO.getTopicId(), clsDataNode.getManageSecretId(), - clsDataNode.getManageSecretKey(), clsDataNode.getEndpoint(), clsDataNode.getRegion()); + clsDataNode.getManageSecretKey(), clsDataNode.getRegion()); // update set topic id into sink info updateSinkInfo(sinkInfo, clsSinkDTO); String info = "success to create cls resource"; @@ -104,14 +104,13 @@ private void createClsResource(SinkInfo sinkInfo) { private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) throws TencentCloudSDKException { String topicId = clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), - clsSinkDTO.getTag(), - clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getEndpoint(), + clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getRegion()); if (StringUtils.isBlank(topicId)) { // if topic don't exist, create topic in cls topicId = clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), - clsSinkDTO.getTag(), clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), - clsDataNode.getEndpoint(), + clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(), clsDataNode.getManageSecretId(), + clsDataNode.getManageSecretKey(), clsDataNode.getRegion()); } return topicId; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java index 6a1657b1ec0..dcad49ae908 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java @@ -46,6 +46,8 @@ import java.util.List; import java.util.Map; +import static org.apache.inlong.manager.common.consts.InlongConstants.PULSAR_TOPIC_FORMAT; + /** * Pulsar sink operator */ @@ -123,7 +125,9 @@ public Map parse2IdParams(StreamSinkEntity streamSink, List