Skip to content

Commit

Permalink
[INLONG-8995][Manager] Add an interface for querying used task inform…
Browse files Browse the repository at this point in the history
…ation for agent (#8996)
  • Loading branch information
fuweng11 authored Sep 27, 2023
1 parent ae9990c commit 94728b6
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DataConfig {
private String snapshot;
private Integer syncSend;
private String syncPartitionKey;
private Integer state;
private String extParams;
/**
* The task version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ public enum SourceStatus {
TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP);

public static final Set<SourceStatus> NORMAL_STATUS_SET = Sets.newHashSet(
SOURCE_NORMAL, TO_BE_ISSUED_ADD, TO_BE_ISSUED_RETRY,
TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_ACTIVE, TO_BE_ISSUED_CHECK,
TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_ACTIVE,
BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP);

public static final Set<SourceStatus> STOP_STATUS_SET = Sets.newHashSet(SOURCE_STOP, SOURCE_FAILED,
TO_BE_ISSUED_STOP, BEEN_ISSUED_STOP);

private static final Map<SourceStatus, Set<SourceStatus>> SOURCE_STATE_AUTOMATON = Maps.newHashMap();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ public class FileSource extends StreamSource {
+ " Json format, set this parameter to json ")
private String dataContentStyle;

@ApiModelProperty("Cycle unit")
private String cycleUnit;

@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty("Start time")
private Long startTime;

@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ public class FileSourceDTO {
@ApiModelProperty("Column separator of data source ")
private String dataSeparator;

@ApiModelProperty("Cycle unit")
private String cycleUnit = "D";

@ApiModelProperty("Whether retry")
private Boolean retry = false;

@ApiModelProperty("Start time")
private Long startTime = 0L;

@ApiModelProperty("End time")
private Long endTime = 0L;

@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ public class FileSourceRequest extends SourceRequest {
+ " Json format, set this parameter to json ")
private String dataContentStyle;

@ApiModelProperty("Cycle unit")
private String cycleUnit;

@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty("Start time")
private Long startTime;

@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public interface AgentService {
*/
TaskResult getTaskResult(TaskRequest request);

TaskResult getExistTaskConfig(TaskRequest request);

/**
* Divide the agent into different groups, which collect different stream source tasks.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -119,6 +122,10 @@ public class AgentServiceImpl implements AgentService {
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
new CallerRunsPolicy());

@Getter
private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;

@Value("${source.update.enabled:false}")
private Boolean updateTaskTimeoutEnabled;
@Value("${source.update.before.seconds:60}")
Expand Down Expand Up @@ -149,6 +156,14 @@ public class AgentServiceImpl implements AgentService {
*/
@PostConstruct
private void startHeartbeatTask() {

// The expiry time of cluster info cache must be greater than taskCache cache
// because the eviction handler needs to query cluster info cache
long expireTime = 10 * 5;
taskCache = Caffeine.newBuilder()
.expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchTask);

if (updateTaskTimeoutEnabled) {
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("scheduled-source-timeout-%d")
Expand Down Expand Up @@ -267,6 +282,32 @@ public TaskResult getTaskResult(TaskRequest request) {
return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
}

@Override
public TaskResult getExistTaskConfig(TaskRequest request) {
LOGGER.debug("begin to get all exist task by request={}", request);
// Query pending special commands
List<DataConfig> runningTaskConfig = Lists.newArrayList();
List<StreamSourceEntity> sourceEntities = taskCache.get(request);
try {
List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
if (CollectionUtils.isEmpty(sourceEntities)) {
return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
}
for (StreamSourceEntity sourceEntity : sourceEntities) {
int op = getOp(sourceEntity.getStatus());
DataConfig dataConfig = getDataConfig(sourceEntity, op);
runningTaskConfig.add(dataConfig);
}
TaskResult taskResult = TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();

return taskResult;
} catch (Exception e) {
LOGGER.error("get all exist task failed:", e);
throw new BusinessException("get all exist task failed:" + e.getMessage());
}

}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
Expand Down Expand Up @@ -552,6 +593,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
String extParams = entity.getExtParams();
if (groupEntity != null && streamEntity != null) {
dataConfig.setState(
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0);
dataConfig.setSyncSend(streamEntity.getSyncSend());
if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
String dataSeparator = streamEntity.getDataSeparator();
Expand Down Expand Up @@ -683,4 +726,20 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}

private List<StreamSourceEntity> fetchTask(TaskRequest request) {
final String clusterName = request.getClusterName();
final String ip = request.getAgentIp();
final String uuid = request.getUuid();
List<StreamSourceEntity> normalSourceEntities = sourceMapper.selectByStatusAndCluster(
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
clusterName, ip, uuid);
List<StreamSourceEntity> taskLists = new ArrayList<>(normalSourceEntities);
List<StreamSourceEntity> stopSourceEntities = sourceMapper.selectByStatusAndCluster(
SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
clusterName, ip, uuid);
taskLists.addAll(stopSourceEntities);
LOGGER.debug("success to add task : {}", taskLists.size());
return taskLists;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public Response<TaskResult> reportAndGetTask(@RequestBody TaskRequest request) {
return Response.success(agentService.getTaskResult(request));
}

@PostMapping("/agent/getExistTaskConfig")
@ApiOperation(value = "Get all exist task config")
public Response<TaskResult> getExistTaskConfig(@RequestBody TaskRequest request) {
return Response.success(agentService.getExistTaskConfig(request));
}

@PostMapping("/agent/bindGroup")
@ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.")
public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {
Expand Down

0 comments on commit 94728b6

Please sign in to comment.