Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-8995][Manager] Add an interface for querying used task information for agent #8996

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DataConfig {
private String snapshot;
private Integer syncSend;
private String syncPartitionKey;
// Task state flag: set to 1 when the source status is in SourceStatus.NORMAL_STATUS_SET,
// otherwise 0 (see AgentServiceImpl#getDataConfig)
private Integer state;
private String extParams;
/**
* The task version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ public enum SourceStatus {
TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP);

// Statuses treated as "running/usable" when reporting task state to the agent:
// the normal state plus every to-be-issued / been-issued command except STOP
public static final Set<SourceStatus> NORMAL_STATUS_SET = Sets.newHashSet(
SOURCE_NORMAL, TO_BE_ISSUED_ADD, TO_BE_ISSUED_RETRY,
TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_ACTIVE, TO_BE_ISSUED_CHECK,
TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_ACTIVE,
BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP);

// Statuses treated as stopped/failed, including pending and issued STOP commands
public static final Set<SourceStatus> STOP_STATUS_SET = Sets.newHashSet(SOURCE_STOP, SOURCE_FAILED,
TO_BE_ISSUED_STOP, BEEN_ISSUED_STOP);

private static final Map<SourceStatus, Set<SourceStatus>> SOURCE_STATE_AUTOMATON = Maps.newHashMap();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ public class FileSource extends StreamSource {
+ " Json format, set this parameter to json ")
private String dataContentStyle;

@ApiModelProperty("Cycle unit")
private String cycleUnit;

@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty("Start time")
private Long startTime;

@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ public class FileSourceDTO {
@ApiModelProperty("Column separator of data source ")
private String dataSeparator;

@ApiModelProperty("Cycle unit")
private String cycleUnit = "D";

@ApiModelProperty("Whether retry")
private Boolean retry = false;

@ApiModelProperty("Start time")
private Long startTime = 0L;

@ApiModelProperty("End time")
private Long endTime = 0L;

@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ public class FileSourceRequest extends SourceRequest {
+ " Json format, set this parameter to json ")
private String dataContentStyle;

@ApiModelProperty("Cycle unit")
private String cycleUnit;

@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty("Start time")
private Long startTime;

@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public interface AgentService {
*/
TaskResult getTaskResult(TaskRequest request);

/**
 * Get all exist task configs for the agent identified by the given request.
 *
 * @param request task request from the agent
 * @return task result containing the exist task configs and command configs
 */
TaskResult getExistTaskConfig(TaskRequest request);

/**
* Divide the agent into different groups, which collect different stream source tasks.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -119,6 +122,10 @@ public class AgentServiceImpl implements AgentService {
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
new CallerRunsPolicy());

// Cache of the source tasks assigned to an agent, keyed by the task request;
// loaded lazily by fetchTask and exposed via the Lombok-generated getter
@Getter
private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;

// Whether the scheduled task that times out stale sources is enabled (default: disabled)
@Value("${source.update.enabled:false}")
private Boolean updateTaskTimeoutEnabled;
@Value("${source.update.before.seconds:60}")
Expand Down Expand Up @@ -149,6 +156,14 @@ public class AgentServiceImpl implements AgentService {
*/
@PostConstruct
private void startHeartbeatTask() {

// The expiry time of cluster info cache must be greater than taskCache cache
// because the eviction handler needs to query cluster info cache
long expireTime = 10 * 5;
taskCache = Caffeine.newBuilder()
.expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchTask);

if (updateTaskTimeoutEnabled) {
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("scheduled-source-timeout-%d")
Expand Down Expand Up @@ -267,6 +282,32 @@ public TaskResult getTaskResult(TaskRequest request) {
return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
}

/**
 * Get all exist task configs (running and stopped) for the agent described by the request.
 *
 * <p>Source tasks are served from {@code taskCache} (loaded by {@code fetchTask} on a miss);
 * pending command configs are queried fresh on every call.
 *
 * @param request the task request carrying the cluster name, agent ip and uuid
 * @return task result holding the task configs and command configs
 * @throws BusinessException if loading the tasks or querying the commands fails
 */
@Override
public TaskResult getExistTaskConfig(TaskRequest request) {
    LOGGER.debug("begin to get all exist task by request={}", request);
    List<DataConfig> runningTaskConfig = Lists.newArrayList();
    try {
        // Load inside the try so a cache-loader failure is also wrapped as a
        // BusinessException instead of escaping to the caller unhandled
        List<StreamSourceEntity> sourceEntities = taskCache.get(request);
        // Query pending special commands
        List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
        if (CollectionUtils.isNotEmpty(sourceEntities)) {
            for (StreamSourceEntity sourceEntity : sourceEntities) {
                int op = getOp(sourceEntity.getStatus());
                runningTaskConfig.add(getDataConfig(sourceEntity, op));
            }
        }
        return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
    } catch (Exception e) {
        LOGGER.error("get all exist task failed:", e);
        throw new BusinessException("get all exist task failed:" + e.getMessage());
    }
}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
Expand Down Expand Up @@ -552,6 +593,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
String extParams = entity.getExtParams();
if (groupEntity != null && streamEntity != null) {
dataConfig.setState(
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0);
dataConfig.setSyncSend(streamEntity.getSyncSend());
if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
String dataSeparator = streamEntity.getDataSeparator();
Expand Down Expand Up @@ -683,4 +726,20 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}

/**
 * Load every stream source task bound to the requesting agent's cluster node:
 * first the tasks in a normal status, then the stopped/failed ones.
 * Serves as the loader function for {@code taskCache}.
 */
private List<StreamSourceEntity> fetchTask(TaskRequest request) {
    String clusterName = request.getClusterName();
    String agentIp = request.getAgentIp();
    String agentUuid = request.getUuid();
    List<StreamSourceEntity> taskLists = new ArrayList<>();
    // Running / to-be-issued tasks
    taskLists.addAll(sourceMapper.selectByStatusAndCluster(
            SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
            clusterName, agentIp, agentUuid));
    // Stopped / failed tasks
    taskLists.addAll(sourceMapper.selectByStatusAndCluster(
            SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
            clusterName, agentIp, agentUuid));
    LOGGER.debug("success to add task : {}", taskLists.size());
    return taskLists;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public Response<TaskResult> reportAndGetTask(@RequestBody TaskRequest request) {
return Response.success(agentService.getTaskResult(request));
}

@PostMapping("/agent/getExistTaskConfig")
@ApiOperation(value = "Get all exist task config")
public Response<TaskResult> getExistTaskConfig(@RequestBody TaskRequest request) {
    // Delegate to the agent service and wrap the result in a success response
    TaskResult taskResult = agentService.getExistTaskConfig(request);
    return Response.success(taskResult);
}

@PostMapping("/agent/bindGroup")
@ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.")
public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {
Expand Down