Skip to content

Commit

Permalink
[INLONG-9149][Agent] Add sender manager for file collect (#9150)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Oct 30, 2023
1 parent 119240c commit eb5f890
Show file tree
Hide file tree
Showing 7 changed files with 619 additions and 45 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,47 @@ public class AgentErrMsg {

public static final String CONFIG_SUCCESS = "SUCCESS";

// 数据源配置异常 */
public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR"
// data source config error */
public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-INLONG_AGENT|10001|ERROR"
+ "|ERROR_DATA_SOURCE_CONFIG|";

// 监控文件夹不存在 */
public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-TDAgent|11001|WARN"
// directory not found error */
public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-INLONG_AGENT|11001|WARN"
+ "|WARN_DIRECTORY_NOT_EXIST|";

// 监控文件夹时出错 */
public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR"
// watch directory error */
public static final String WATCH_DIR_ERROR = "ERROR-0-INLONG_AGENT|11002|ERROR"
+ "|ERROR_WATCH_DIR_ERROR|";

// 要读取的文件异常(不存在,rotate)
public static final String FILE_ERROR = "ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|";
// file error(not found,rotate)
public static final String FILE_ERROR = "ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|";

// 读取文件异常
public static final String FILE_OP_ERROR = "ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|";
// read file error
public static final String FILE_OP_ERROR = "ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|";

// 磁盘满
public static final String DISK_FULL = "ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|";
// disk full
public static final String DISK_FULL = "ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|";

// 内存溢出
public static final String OOM_ERROR = "ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|";
// out of memory
public static final String OOM_ERROR = "ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|";

// watcher异常
public static final String WATCHER_INVALID = "ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|";
// watcher error
public static final String WATCHER_INVALID = "ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|";

// 连不上tdmanager
public static final String CONNECT_TDM_ERROR = "ERROR-1-TDAgent|30002|ERROR"
+ "|ERROR_CANNOT_CONNECT_TO_TDM|";
// could not connect to manager
public static final String CONNECT_MANAGER_ERROR = "ERROR-1-INLONG_AGENT|30002|ERROR"
+ "|ERROR_CANNOT_CONNECT_TO_MANAGER|";

// 发送数据到tdbus失败
public static final String SEND_TO_BUS_ERROR = "ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|";
// send data to dataProxy failed
public static final String SEND_TO_BUS_ERROR = "ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|";

// 操作bdb异常
public static final String BDB_ERROR = "ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|";
// operate bdb error
public static final String BDB_ERROR = "ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|";

// 内部缓存满
public static final String MSG_BUFFER_FULL = "ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|";
// buffer full
public static final String MSG_BUFFER_FULL = "ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|";

// 监控到的事件不合法(任务已删除
public static final String FOUND_EVENT_INVALID = "ERROR-1-TDAgent|30003|ERROR"
// found event invalid(task has been delete
public static final String FOUND_EVENT_INVALID = "ERROR-1-INLONG_AGENT|30003|ERROR"
+ "|FOUND_EVENT_INVALID|";
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

/*
* This class is mainly used for scanning log file that we want to read. We use this class at
* tdagent recover process, the do and redo tasks and the current log file access when we deploy a
* inlong_agent recover process, the do and redo tasks and the current log file access when we deploy a
* new data source.
*/
public class FileScanner {
Expand Down Expand Up @@ -114,10 +114,6 @@ public static ArrayList<String> scanFile(TaskProfile conf, String originPattern,

private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, String secondDir,
String fileName, long depth, int maxFileNum) {

// logger.info("getUpdatedOrNewFiles: firstdir: {}, seconddir: {} filename: {}",
// new Object[]{firstDir, secondDir, fileName});

ArrayList<String> ret = new ArrayList<String>();
ArrayList<File> readyFiles = new ArrayList<File>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void registerRecursively(File dir, int beginIndex) throws IOException {
logger.info("Register a new directory: " + dirName);
} catch (IOException e) {
/**
* 捕获异常,不能注册的子目录就忽略。
* catch error,ignore the child directory that can not register
*/
logger.error("Register directory {} error, skip it. ", dirName, e);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class NewDateUtils {
public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
// data source config error */
public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR"
public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-INLONG_AGENT|10001|ERROR"
+ "|ERROR_DATA_SOURCE_CONFIG|";

/* Return the time in milliseconds for a data time. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Lon
private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId
dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId
dataConfig.setDataReportType(1); // 老字段 reportType
dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集
dataConfig.setTaskId(taskId); // 老字段 任务 id
dataConfig.setState(state.ordinal()); // 新增! 任务状态 1 正常 2 暂停
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
dataConfig.setDataReportType(1);
dataConfig.setTaskType(3);
dataConfig.setTaskId(taskId);
dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);// 正则
fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2 小时前的
fileTaskConfig.setMaxFileCount(100); // 最大文件数
fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时
fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("D");
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.sinks.filecollect;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.SendResult;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

@RunWith(PowerMockRunner.class)
@PrepareForTest(SenderManager.class)
@PowerMockIgnore({"javax.management.*"})
public class TestSenderManager {

private static final Logger LOGGER = LoggerFactory.getLogger(TestSenderManager.class);
private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader();
private static AgentBaseTestsHelper helper;
private static InstanceProfile profile;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new AgentThreadFactory("TestLogfileCollectTask"));

@BeforeClass
public static void setup() {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
profile = taskProfile.createInstanceProfile("", fileName,
"20230927");
}

@AfterClass
public static void teardown() throws Exception {
helper.teardownAgentHome();
}

@Test
public void testNormalAck() {
List<SendMessageCallback> cbList = new ArrayList<>();
try {
profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId()));
SenderManager senderManager = PowerMockito.spy(new SenderManager(profile, "inlongGroupId", "sourceName"));
PowerMockito.doNothing().when(senderManager, "createMessageSender", Mockito.anyString());

PowerMockito.doAnswer(invocation -> {
SendMessageCallback cb = invocation.getArgument(0);
cbList.add(cb);
return null;
}).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());

senderManager.Start();
Long packageIndex = 0L;
Long packageOffset = 100L;
List<byte[]> bodyList = new ArrayList<>();
bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
Integer resultBatchSize = 0;
for (int i = 0; i < bodyList.size(); i++) {
resultBatchSize += bodyList.get(i).length;
}
for (int i = 0; i < 10; i++) {
PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++, packageOffset, resultBatchSize, false);
SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList,
AgentUtils.getCurrentTime(), null, ackInfo);
senderManager.sendBatch(senderMessage);
packageOffset += 100;
}
Assert.assertTrue(cbList.size() == 10);
for (int i = 0; i < 5; i++) {
cbList.get(4 - i).onMessageAck(SendResult.OK);
}

await().atMost(2, TimeUnit.SECONDS).until(() -> !senderManager.sendFinished());
for (int i = 5; i < 10; i++) {
cbList.get(i).onMessageAck(SendResult.OK);
AgentUtils.silenceSleepInMs(10);
}
await().atMost(2, TimeUnit.SECONDS).until(() -> senderManager.sendFinished());
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue("testNormalAck failed", false);
}
}
}

0 comments on commit eb5f890

Please sign in to comment.