Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9286][Agent] Adjust the time offset calculation function #9288

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

public class DateTransUtils {

private static final Logger logger = LoggerFactory.getLogger(DateTransUtils.class);

// convert millSec to YYYMMDD by cycleUnit
public static String millSecConvertToTimeStr(long time, String cycleUnit) {
return millSecConvertToTimeStr(time, cycleUnit, TimeZone.getDefault());
}

// convert YYYMMDD to millSec by cycleUnit
public static long timeStrConvertTomillSec(String time, String cycleUnit)
throws ParseException {
return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault());
}

public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone)
throws ParseException {
long retTime = 0;
// SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
justinwwhuang marked this conversation as resolved.
Show resolved Hide resolved
SimpleDateFormat df = null;
if (cycleUnit.equals("Y") && time.length() == 4) {
df = new SimpleDateFormat("yyyy");
} else if (cycleUnit.equals("M") && time.length() == 6) {
df = new SimpleDateFormat("yyyyMM");
} else if (cycleUnit.equals("D") && time.length() == 8) {
df = new SimpleDateFormat("yyyyMMdd");
} else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) {
df = new SimpleDateFormat("yyyyMMddHH");
} else if (cycleUnit.contains("m") && time.length() == 12) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit);
throw new ParseException(time, 0);
}
try {
df.setTimeZone(timeZone);
retTime = df.parse(time).getTime();
if (cycleUnit.equals("10m")) {

}
} catch (ParseException e) {
logger.error("convert time string error. ", e);
}
return retTime;
}

// convert millSec to YYYMMDD by cycleUnit
public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZone tz) {
String retTime = null;

Calendar calendarInstance = Calendar.getInstance();
calendarInstance.setTimeInMillis(time);

Date dateTime = calendarInstance.getTime();
SimpleDateFormat df = null;
if ("Y".equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyy");
} else if ("M".equals(cycleUnit)) {
df = new SimpleDateFormat("yyyyMM");
} else if ("D".equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMdd");
} else if ("h".equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHH");
} else if (cycleUnit.contains("m")) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("cycleUnit {} can't parse!", cycleUnit);
df = new SimpleDateFormat("yyyyMMddHH");
}
df.setTimeZone(tz);
retTime = df.format(dateTime);

if (cycleUnit.contains("m")) {

int cycleNum = Integer.parseInt(cycleUnit.substring(0,
cycleUnit.length() - 1));
int mmTime = Integer.parseInt(retTime.substring(
retTime.length() - 2, retTime.length()));
String realMMTime = "";
if (cycleNum * (mmTime / cycleNum) <= 0) {
realMMTime = "0" + cycleNum * (mmTime / cycleNum);
} else {
realMMTime = "" + cycleNum * (mmTime / cycleNum);
}
retTime = retTime.substring(0, retTime.length() - 2) + realMMTime;
}

return retTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
import org.apache.inlong.agent.plugin.utils.file.Files;
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.utils.DateTransUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,20 +58,19 @@ public BasicFileInfo(String fileName, String dataTime) {

private static final Logger logger = LoggerFactory.getLogger(FileScanner.class);

public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, long failTime,
long recoverTime, boolean isRetry) {
public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, long startTime,
long endTime, boolean isRetry) {
String cycleUnit = conf.getCycleUnit();
if (!isRetry) {
failTime -= NewDateUtils.calcOffset(conf.getTimeOffset());
recoverTime -= NewDateUtils.calcOffset(conf.getTimeOffset());
startTime += NewDateUtils.calcOffset(conf.getTimeOffset());
endTime += NewDateUtils.calcOffset(conf.getTimeOffset());
}

String startTime = NewDateUtils.millSecConvertToTimeStr(failTime, cycleUnit);
String endTime = NewDateUtils.millSecConvertToTimeStr(recoverTime, cycleUnit);
String strStartTime = DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, cycleUnit);
logger.info("task {} this scan time is between {} and {}.",
new Object[]{conf.getTaskId(), startTime, endTime});
new Object[]{conf.getTaskId(), strStartTime, strEndTime});

return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, startTime, endTime);
return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, strStartTime, strEndTime);
}

/* Scan log files and create tasks between two times. */
Expand All @@ -89,10 +89,10 @@ public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
String dataTime = NewDateUtils.millSecConvertToTimeStr(time, cycleUnit);
String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit);
BasicFileInfo info = new BasicFileInfo(file, dataTime);
logger.info("scan new task fileName {} ,dataTime {}", file,
NewDateUtils.millSecConvertToTimeStr(time, cycleUnit));
DateTransUtils.millSecConvertToTimeStr(time, cycleUnit));
infos.add(info);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class LogFileCollectTask extends Task {
public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
private long lastPrintTime = 0;
private boolean retry;
private long startTime;
private long endTime;
Expand Down Expand Up @@ -221,6 +223,9 @@ public TaskProfile getProfile() {

@Override
public String getTaskId() {
if (taskProfile == null) {
return "";
}
return taskProfile.getTaskId();
}

Expand All @@ -234,6 +239,10 @@ public void run() {
Thread.currentThread().setName("directory-task-core-" + getTaskId());
running = true;
while (!isFinished()) {
if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
LOGGER.info("log file task running! taskId {}", getTaskId());
lastPrintTime = AgentUtils.getCurrentTime();
}
coreThreadUpdateTime = AgentUtils.getCurrentTime();
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
if (!initOK) {
Expand Down Expand Up @@ -274,7 +283,7 @@ private void runForNormal() {
private void scanExistingFile() {
originPatterns.forEach((originPattern) -> {
List<BasicFileInfo> fileInfos = scanExistingFileByPattern(originPattern);
LOGGER.info("scan {} get file count {}", originPattern, fileInfos.size());
LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size());
fileInfos.forEach((fileInfo) -> {
addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
});
Expand All @@ -299,7 +308,7 @@ private List<BasicFileInfo> scanExistingFileByPattern(String originPattern) {
long currentTime = System.currentTimeMillis();
// only scan two cycle, like two hours or two days
long offset = NewDateUtils.calcOffset("-2" + taskProfile.getCycleUnit());
startScanTime = currentTime - offset;
startScanTime = currentTime + offset;
endScanTime = currentTime;
}
return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern, startScanTime, endScanTime, retry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
import org.apache.inlong.agent.utils.DateTransUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -273,7 +274,7 @@ public void removeUselessWatchDirectories(String curDataTime)
logger.info("removeUselessWatchDirectories {}", curDataTime);

/* Calculate the data time which is 3 cycle units earlier than current task data time. */
long curDataTimeMillis = NewDateUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
long curDataTimeMillis = DateTransUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(curDataTimeMillis);
if ("D".equalsIgnoreCase(cycleUnit)) {
Expand Down
Loading