Skip to content

Commit

Permalink
[INLONG-11760][Agent] Increase the number of global instances control (
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Feb 14, 2025
1 parent 82cc76e commit 92d7774
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class AgentConstants {
public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit";
public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100;

// pulsar sink config
public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class CommonConstants {
public static final String DEFAULT_PROXY_INLONG_STREAM_ID = "default_inlong_stream_id";

public static final String PROXY_TOTAL_ASYNC_PROXY_SIZE = "proxy.total.async.proxy.size";
public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE = 200 * 1024 * 1024;
public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB = 200 * 1024;

public static final String PROXY_ALIVE_CONNECTION_NUM = "proxy.alive.connection.num";
public static final int DEFAULT_PROXY_ALIVE_CONNECTION_NUM = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ public abstract class Task extends AbstractStateWrapper {
* is profile valid
*/
public abstract boolean isProfileValid(TaskProfile profile);

public abstract int getInstanceNum();
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private void createMessageSender() {
proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
proxyClientConfig.setMaxInFlightSizeInKb(
CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE / 1024);
CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB);
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setRequestTimeoutMs(30000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.store.InstanceStore;
Expand Down Expand Up @@ -81,6 +83,8 @@ public class InstanceManager extends AbstractDaemon {
private long auditVersion;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
protected TaskManager taskManager;
private final int globalInstanceLimit;

private class InstancePrintStat {

Expand Down Expand Up @@ -118,7 +122,9 @@ public String toString() {
/**
* Init task manager.
*/
public InstanceManager(String taskId, int instanceLimit, Store basicStore, TaskStore taskStore) {
public InstanceManager(TaskManager taskManager, String taskId, int instanceLimit, Store basicStore,
TaskStore taskStore) {
this.taskManager = taskManager;
this.taskId = taskId;
instanceStore = new InstanceStore(basicStore);
this.taskStore = taskStore;
Expand All @@ -127,6 +133,8 @@ public InstanceManager(String taskId, int instanceLimit, Store basicStore, TaskS
this.instanceLimit = instanceLimit;
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
globalInstanceLimit = agentConf.getInt(AgentConstants.AGENT_INSTANCE_LIMIT,
AgentConstants.DEFAULT_AGENT_INSTANCE_LIMIT);
}

public String getTaskId() {
Expand Down Expand Up @@ -292,8 +300,12 @@ private void dealWithActionQueue() {

private void dealWithAddActionQueue() {
while (isRunnable()) {
if (taskManager != null && taskManager.getInstanceNum() > globalInstanceLimit) {
LOGGER.error("global instance num {} over limit {}", taskManager.getInstanceNum(), globalInstanceLimit);
return;
}
if (instanceMap.size() > instanceLimit) {
LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
LOGGER.error("task {} instanceMap size {} over limit {}", taskId, instanceMap.size(), instanceLimit);
return;
}
InstanceAction action = addActionQueue.poll();
Expand Down Expand Up @@ -507,4 +519,9 @@ public long getFinishedInstanceCount() {
}
return count;
}

public int getInstanceNum() {

return instanceMap.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,14 @@ public Task getTask(String taskId) {
return taskMap.get(taskId);
}

public int getInstanceNum() {
int num = 0;
for (Task task : taskMap.values()) {
num += task.getInstanceNum();
}
return num;
}

public TaskProfile getTaskProfile(String taskId) {
return taskStore.getTask(taskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ private void doFlushOffset() {
}
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, lenToRelease);
if (info != null) {
LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(),
profile.getInstanceId());
LOGGER.info("save offset {} taskId {} instanceId {} ackInfoList {}", info.getOffset(), profile.getTaskId(),
profile.getInstanceId(), ackInfoList.size());
OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
info.getOffset(), profile.get(INODE_INFO));
offsetManager.setOffset(offsetProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Sender(InstanceProfile profile, String inlongGroupId, String sourcePath)
totalAsyncBufSize = profile
.getInt(
CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE_KB);
aliveConnectionNum = profile
.getInt(
CommonConstants.PROXY_ALIVE_CONNECTION_NUM, CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
Expand Down Expand Up @@ -203,7 +203,7 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) {
private void createMessageSender() throws Exception {
TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
managerAddr, inlongGroupId, authSecretId, authSecretKey);
proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize / 1024);
proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void init(Object srcManager, TaskProfile taskProfile, Store basicStore) t
this.taskProfile = taskProfile;
this.basicStore = basicStore;
auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
instanceManager = new InstanceManager(taskProfile.getTaskId(), getInstanceLimit(),
instanceManager = new InstanceManager(taskManager, taskProfile.getTaskId(), getInstanceLimit(),
basicStore, taskManager.getTaskStore());
try {
instanceManager.start();
Expand Down Expand Up @@ -163,4 +163,9 @@ protected boolean shouldAddAgain(String fileName, long lastModifyTime) {
protected boolean isFull() {
return instanceManager.isFull();
}

@Override
public int getInstanceNum() {
return instanceManager.getInstanceNum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public boolean isProfileValid(TaskProfile profile) {
return true;
}

@Override
public int getInstanceNum() {
return 0;
}

@Override
public void addCallbacks() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static void setup() {
Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
manager = new InstanceManager("1", 20, basicInstanceStore, taskStore);
manager = new InstanceManager(null, "1", 20, basicInstanceStore, taskStore);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public boolean isProfileValid(TaskProfile profile) {
return true;
}

@Override
public int getInstanceNum() {
return 0;
}

@Override
public void addCallbacks() {

Expand Down

0 comments on commit 92d7774

Please sign in to comment.