From 94728b66760364d10cf38945fd6b1d3aad828de3 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Wed, 27 Sep 2023 20:50:02 +0800 Subject: [PATCH] [INLONG-8995][Manager] Add an interface for querying used task information for agent (#8996) --- .../inlong/common/pojo/agent/DataConfig.java | 1 + .../manager/common/enums/SourceStatus.java | 10 ++++ .../manager/pojo/source/file/FileSource.java | 12 ++++ .../pojo/source/file/FileSourceDTO.java | 12 ++++ .../pojo/source/file/FileSourceRequest.java | 12 ++++ .../manager/service/core/AgentService.java | 2 + .../service/core/impl/AgentServiceImpl.java | 59 +++++++++++++++++++ .../controller/openapi/AgentController.java | 6 ++ 8 files changed, 114 insertions(+) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java index e71f132b580..975f74d128c 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java @@ -47,6 +47,7 @@ public class DataConfig { private String snapshot; private Integer syncSend; private String syncPartitionKey; + private Integer state; private String extParams; /** * The task version. diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java index 306a9e60ccb..35c9f1aac4d 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java @@ -84,6 +84,16 @@ public enum SourceStatus { TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE, TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP); + public static final Set NORMAL_STATUS_SET = Sets.newHashSet( + SOURCE_NORMAL, TO_BE_ISSUED_ADD, TO_BE_ISSUED_RETRY, + TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_ACTIVE, TO_BE_ISSUED_CHECK, + TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD, + BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_ACTIVE, + BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP); + + public static final Set STOP_STATUS_SET = Sets.newHashSet(SOURCE_STOP, SOURCE_FAILED, + TO_BE_ISSUED_STOP, BEEN_ISSUED_STOP); + private static final Map> SOURCE_STATE_AUTOMATON = Maps.newHashMap(); static { diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java index 51d7d01d3ba..70364737987 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java @@ -77,6 +77,18 @@ public class FileSource extends StreamSource { + " Json format, set this parameter to json ") private String dataContentStyle; + @ApiModelProperty("Cycle unit") + private String cycleUnit; + + @ApiModelProperty("Whether retry") + private Boolean retry; + + @ApiModelProperty("Start time") + private Long startTime; + + @ApiModelProperty("End time") + private Long endTime; + @ApiModelProperty("Metadata filters by label, special parameters for K8S") private Map filterMetaByLabels; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java index f3a52f4f85c..19b7018b2aa 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java @@ -80,6 +80,18 @@ public class FileSourceDTO { @ApiModelProperty("Column separator of data source ") private String dataSeparator; + @ApiModelProperty("Cycle unit") + private String cycleUnit = "D"; + + @ApiModelProperty("Whether retry") + private Boolean retry = false; + + @ApiModelProperty("Start time") + private Long startTime = 0L; + + @ApiModelProperty("End time") + private Long endTime = 0L; + @ApiModelProperty("Metadata filters by label, special parameters for K8S") private Map filterMetaByLabels; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java index 822655813c4..c27ec4e761e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java @@ -72,6 +72,18 @@ public class FileSourceRequest extends SourceRequest { + " Json format, set this parameter to json ") private String dataContentStyle; + @ApiModelProperty("Cycle unit") + private String cycleUnit; + + @ApiModelProperty("Whether retry") + private Boolean retry; + + @ApiModelProperty("Start time") + private Long startTime; + + @ApiModelProperty("End time") + private Long endTime; + @ApiModelProperty("Metadata filters by label, special parameters for K8S") private Map filterMetaByLabels; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java index 099ebca7207..2fd782dbe03 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java @@ -50,6 +50,8 @@ public interface AgentService { */ TaskResult getTaskResult(TaskRequest request); + TaskResult getExistTaskConfig(TaskRequest request); + /** * Divide the agent into different groups, which collect different stream source tasks. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index b20d76a8c0c..f8724045ac7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -58,10 +58,13 @@ import org.apache.inlong.manager.service.core.AgentService; import org.apache.inlong.manager.service.source.SourceSnapshotOperator; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; +import lombok.Getter; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -119,6 +122,10 @@ public class AgentServiceImpl implements AgentService { new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(), new CallerRunsPolicy()); + + @Getter + private LoadingCache> taskCache; + @Value("${source.update.enabled:false}") private Boolean updateTaskTimeoutEnabled; @Value("${source.update.before.seconds:60}") @@ -149,6 +156,14 @@ public class AgentServiceImpl implements AgentService { */ @PostConstruct private void startHeartbeatTask() { + + // The expiry time of cluster info cache must be greater than taskCache cache + // because the eviction handler needs to query cluster info cache + long expireTime = 10 * 5; + taskCache = Caffeine.newBuilder() + .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS) + .build(this::fetchTask); + if (updateTaskTimeoutEnabled) { ThreadFactory factory = new ThreadFactoryBuilder() .setNameFormat("scheduled-source-timeout-%d") @@ -267,6 +282,32 @@ public TaskResult getTaskResult(TaskRequest request) { return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build(); } + @Override + public TaskResult getExistTaskConfig(TaskRequest request) { + LOGGER.debug("begin to get all exist task by request={}", request); + // Query pending special commands + List runningTaskConfig = Lists.newArrayList(); + List sourceEntities = taskCache.get(request); + try { + List cmdConfigs = getAgentCmdConfigs(request); + if (CollectionUtils.isEmpty(sourceEntities)) { + return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build(); + } + for (StreamSourceEntity sourceEntity : sourceEntities) { + int op = getOp(sourceEntity.getStatus()); + DataConfig dataConfig = getDataConfig(sourceEntity, op); + runningTaskConfig.add(dataConfig); + } + TaskResult taskResult = TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build(); + + return taskResult; + } catch (Exception e) { + LOGGER.error("get all exist task failed:", e); + throw new BusinessException("get all exist task failed:" + e.getMessage()); + } + + } + @Override @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) { @@ -552,6 +593,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) { InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); String extParams = entity.getExtParams(); if (groupEntity != null && streamEntity != null) { + dataConfig.setState( + SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0); dataConfig.setSyncSend(streamEntity.getSyncSend()); if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) { String dataSeparator = streamEntity.getDataSeparator(); @@ -683,4 +726,20 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt return sourceGroups.stream().anyMatch(clusterNodeGroups::contains); } + private List fetchTask(TaskRequest request) { + final String clusterName = request.getClusterName(); + final String ip = request.getAgentIp(); + final String uuid = request.getUuid(); + List normalSourceEntities = sourceMapper.selectByStatusAndCluster( + SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()), + clusterName, ip, uuid); + List taskLists = new ArrayList<>(normalSourceEntities); + List stopSourceEntities = sourceMapper.selectByStatusAndCluster( + SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()), + clusterName, ip, uuid); + taskLists.addAll(stopSourceEntities); + LOGGER.debug("success to add task : {}", taskLists.size()); + return taskLists; + } + } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java index cded7e39622..77011bd2e17 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java @@ -70,6 +70,12 @@ public Response reportAndGetTask(@RequestBody TaskRequest request) { return Response.success(agentService.getTaskResult(request)); } + @PostMapping("/agent/getExistTaskConfig") + @ApiOperation(value = "Get all exist task config") + public Response getExistTaskConfig(@RequestBody TaskRequest request) { + return Response.success(agentService.getExistTaskConfig(request)); + } + @PostMapping("/agent/bindGroup") @ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.") public Response bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {