diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 24f9bd48682557..a4b75be28bea53 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -190,6 +190,12 @@ public class Config extends ConfigBase { "For ALTER, EXPORT jobs, remove the finished job if expired."}) public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days + @ConfField(mutable = true, masterOnly = true, description = { + "针对 EXPORT 作业,如果系统内 EXPORT 作业数量超过这个值,则会删除最老的记录。", + "For EXPORT jobs, If the number of EXPORT jobs in the system exceeds this value, " + + "the oldest records will be deleted."}) + public static int max_export_history_job_num = 1000; + @ConfField(description = {"事务的清理周期,单位为秒。每个周期内,将会清理已经结束的并且过期的历史事务信息", "The clean interval of transaction, in seconds. " + "In each cycle, the expired history transaction will be cleaned"}) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index fdfd1e3e9e2e24..5ca0961ca6a8ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -165,6 +165,7 @@ import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.GroupCommitManager; import org.apache.doris.load.Load; @@ -2547,6 +2548,16 @@ public long saveExportJob(CountingDataOutputStream dos, long checksum) throws IO long curTime = System.currentTimeMillis(); List jobs = exportMgr.getJobs().stream().filter(t -> !t.isExpired(curTime)) .collect(Collectors.toList()); + if (jobs.size() > Config.max_export_history_job_num) { + jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs)); + Iterator iterator = jobs.iterator(); + while (jobs.size() > Config.max_export_history_job_num && iterator.hasNext()) { + ExportJob job = iterator.next(); + if (job.getState() == ExportJobState.FINISHED || job.getState() == ExportJobState.CANCELLED) { + iterator.remove(); + } + } + } int size = jobs.size(); checksum ^= size; dos.writeInt(size); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index ed6ee29bb9c969..06386698cda28a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -54,6 +54,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -543,6 +544,23 @@ public void removeOldExportJobs() { } } } + + if (exportIdToJob.size() > Config.max_export_history_job_num) { + List> jobList = new ArrayList<>(exportIdToJob.entrySet()); + jobList.sort(Comparator.comparingLong(entry -> entry.getValue().getCreateTimeMs())); + while (exportIdToJob.size() > Config.max_export_history_job_num) { + // Remove the oldest job + Map.Entry oldestEntry = jobList.remove(0); + exportIdToJob.remove(oldestEntry.getKey()); + Map labelJobs = dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId()); + if (labelJobs != null) { + labelJobs.remove(oldestEntry.getValue().getLabel()); + if (labelJobs.isEmpty()) { + dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId()); + } + } + } + } } finally { writeUnlock(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java index 448e7608a7bb3c..38549537fbc82f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java @@ -19,9 +19,11 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.TableName; +import org.apache.doris.common.Config; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.MockedAuth; @@ -31,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -76,6 +79,49 @@ public void testShowExport() throws Exception { } + @Test + public void testRemoveOldExportJobs() { + // Setup: Create jobs with different creation times + long currentTime = System.currentTimeMillis(); + for (int i = 1; i <= 10; i++) { + ExportJob job = makeExportJob(i, "label" + i); + // Jobs created 1, 2...10 days ago + Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 24 * 3600 * 1000)); + Deencapsulation.setField(job, "state", ExportJobState.FINISHED); + exportMgr.unprotectAddJob(job); + } + + // Invoke the method + exportMgr.removeOldExportJobs(); + + // Assertions: Check the number of jobs remaining + List remainingJobs = exportMgr.getJobs(); + Assert.assertTrue(remainingJobs.size() <= Config.history_job_keep_max_second); + Assert.assertEquals(7, remainingJobs.size()); // Expecting 8 jobs to remain + + + for (int i = 11; i <= 1010; i++) { + ExportJob job = makeExportJob(i, "label" + i); + // Jobs created 0, 1, 2, 3, 4...1000 seconds ago + Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 1000)); + Deencapsulation.setField(job, "state", ExportJobState.FINISHED); + exportMgr.unprotectAddJob(job); + } + + // Invoke the method + exportMgr.removeOldExportJobs(); + // Assertions: Check the number of jobs remaining + remainingJobs = exportMgr.getJobs(); + Assert.assertTrue(remainingJobs.size() <= Config.history_job_keep_max_second); + Assert.assertEquals(1000, remainingJobs.size()); // Expecting 1000 jobs to remain + + // check the created time + remainingJobs.sort(Comparator.comparingLong(entry -> entry.getCreateTimeMs())); + for (int i = 0; i < remainingJobs.size(); ++i) { + Assert.assertEquals(1010 - i, remainingJobs.get(i).getId()); + } + } + private ExportJob makeExportJob(long id, String label) { ExportJob job1 = new ExportJob(id); Deencapsulation.setField(job1, "label", label);