From af0fe7459eadd13d71bfc273e3b09ea44e91d1bd Mon Sep 17 00:00:00 2001 From: BePPPower Date: Fri, 7 Feb 2025 18:11:02 +0800 Subject: [PATCH 1/5] 1 --- .../java/org/apache/doris/common/Config.java | 6 +++ .../java/org/apache/doris/load/ExportMgr.java | 18 ++++++++ .../doris/load/loadv2/ExportMgrTest.java | 46 +++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 24f9bd48682557..03277e6371af6f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -190,6 +190,12 @@ public class Config extends ConfigBase { "For ALTER, EXPORT jobs, remove the finished job if expired."}) public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days + @ConfField(mutable = true, masterOnly = true, description = { + "针对 EXPORT 作业,如果系统内 EXPORT 作业数量超过这个值,则会删除最老的记录。", + "For EXPORT jobs, If the number of EXPORT jobs in the system exceeds this value, " + + "the oldest records will be deleted."}) + public static int maximum_history_job_num = 1000; // 7 days + @ConfField(description = {"事务的清理周期,单位为秒。每个周期内,将会清理已经结束的并且过期的历史事务信息", "The clean interval of transaction, in seconds. " + "In each cycle, the expired history transaction will be cleaned"}) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index ed6ee29bb9c969..b8fec37e3b073d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -54,6 +54,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -543,6 +544,23 @@ public void removeOldExportJobs() { } } } + + int maximumHistoryJobNum = Config.maximum_history_job_num; + List> jobList = new ArrayList<>(exportIdToJob.entrySet()); + jobList.sort(Comparator.comparingLong(entry -> entry.getValue().getCreateTimeMs())); + + while (exportIdToJob.size() > maximumHistoryJobNum) { + // Remove the oldest job + Map.Entry oldestEntry = jobList.remove(0); + exportIdToJob.remove(oldestEntry.getKey()); + Map labelJobs = dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId()); + if (labelJobs != null) { + labelJobs.remove(oldestEntry.getValue().getLabel()); + if (labelJobs.isEmpty()) { + dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId()); + } + } + } } finally { writeUnlock(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java index 448e7608a7bb3c..721f34346e1b53 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java @@ -19,9 +19,11 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.TableName; +import org.apache.doris.common.Config; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.MockedAuth; @@ -31,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -76,6 +79,49 @@ public void testShowExport() throws Exception { } + @Test + public void testRemoveOldExportJobs() { + // Setup: Create jobs with different creation times + long currentTime = System.currentTimeMillis(); + for (int i = 1; i <= 10; i++) { + ExportJob job = makeExportJob(i, "label" + i); + // Jobs created 1, 2...10 days ago + Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 24 * 3600 * 1000)); + Deencapsulation.setField(job, "state", ExportJobState.FINISHED); + exportMgr.unprotectAddJob(job); + } + + // Invoke the method + exportMgr.removeOldExportJobs(); + + // Assertions: Check the number of jobs remaining + List remainingJobs = exportMgr.getJobs(); + Assert.assertTrue(remainingJobs.size() <= Config.maximum_history_job_num); + Assert.assertEquals(7, remainingJobs.size()); // Expecting 8 jobs to remain + + + for (int i = 11; i <= 1010; i++) { + ExportJob job = makeExportJob(i, "label" + i); + // Jobs created 0, 1, 2, 3, 4...1000 seconds ago + Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 1000)); + Deencapsulation.setField(job, "state", ExportJobState.FINISHED); + exportMgr.unprotectAddJob(job); + } + + // Invoke the method + exportMgr.removeOldExportJobs(); + // Assertions: Check the number of jobs remaining + remainingJobs = exportMgr.getJobs(); + Assert.assertTrue(remainingJobs.size() <= Config.maximum_history_job_num); + Assert.assertEquals(1000, remainingJobs.size()); // Expecting 1000 jobs to remain + + // check the created time + remainingJobs.sort(Comparator.comparingLong(entry -> entry.getCreateTimeMs())); + for (int i = 0; i < remainingJobs.size(); ++i) { + Assert.assertEquals(1010 - i, remainingJobs.get(i).getId()); + } + } + private ExportJob makeExportJob(long id, String label) { ExportJob job1 = new ExportJob(id); Deencapsulation.setField(job1, "label", label); From 9cb94c113129d5fcdba1a4c9054f7fa6a85b3103 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Mon, 10 Feb 2025 15:16:42 +0800 Subject: [PATCH 2/5] 2 --- .../java/org/apache/doris/common/Config.java | 2 +- .../java/org/apache/doris/load/ExportMgr.java | 26 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 03277e6371af6f..a4b75be28bea53 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -194,7 +194,7 @@ public class Config extends ConfigBase { "针对 EXPORT 作业,如果系统内 EXPORT 作业数量超过这个值,则会删除最老的记录。", "For EXPORT jobs, If the number of EXPORT jobs in the system exceeds this value, " + "the oldest records will be deleted."}) - public static int maximum_history_job_num = 1000; // 7 days + public static int max_export_history_job_num = 1000; @ConfField(description = {"事务的清理周期,单位为秒。每个周期内,将会清理已经结束的并且过期的历史事务信息", "The clean interval of transaction, in seconds. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index b8fec37e3b073d..06386698cda28a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -545,19 +545,19 @@ public void removeOldExportJobs() { } } - int maximumHistoryJobNum = Config.maximum_history_job_num; - List> jobList = new ArrayList<>(exportIdToJob.entrySet()); - jobList.sort(Comparator.comparingLong(entry -> entry.getValue().getCreateTimeMs())); - - while (exportIdToJob.size() > maximumHistoryJobNum) { - // Remove the oldest job - Map.Entry oldestEntry = jobList.remove(0); - exportIdToJob.remove(oldestEntry.getKey()); - Map labelJobs = dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId()); - if (labelJobs != null) { - labelJobs.remove(oldestEntry.getValue().getLabel()); - if (labelJobs.isEmpty()) { - dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId()); + if (exportIdToJob.size() > Config.max_export_history_job_num) { + List> jobList = new ArrayList<>(exportIdToJob.entrySet()); + jobList.sort(Comparator.comparingLong(entry -> entry.getValue().getCreateTimeMs())); + while (exportIdToJob.size() > Config.max_export_history_job_num) { + // Remove the oldest job + Map.Entry oldestEntry = jobList.remove(0); + exportIdToJob.remove(oldestEntry.getKey()); + Map labelJobs = dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId()); + if (labelJobs != null) { + labelJobs.remove(oldestEntry.getValue().getLabel()); + if (labelJobs.isEmpty()) { + dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId()); + } } } } From e4519fa55e2fb7d017a944268851b649b08b57e3 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Mon, 10 Feb 2025 16:11:31 +0800 Subject: [PATCH 3/5] 3 --- .../test/java/org/apache/doris/load/loadv2/ExportMgrTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java index 721f34346e1b53..38549537fbc82f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java @@ -96,7 +96,7 @@ public void testRemoveOldExportJobs() { // Assertions: Check the number of jobs remaining List remainingJobs = exportMgr.getJobs(); - Assert.assertTrue(remainingJobs.size() <= Config.maximum_history_job_num); + Assert.assertTrue(remainingJobs.size() <= Config.history_job_keep_max_second); Assert.assertEquals(7, remainingJobs.size()); // Expecting 8 jobs to remain @@ -112,7 +112,7 @@ public void testRemoveOldExportJobs() { exportMgr.removeOldExportJobs(); // Assertions: Check the number of jobs remaining remainingJobs = exportMgr.getJobs(); - Assert.assertTrue(remainingJobs.size() <= Config.maximum_history_job_num); + Assert.assertTrue(remainingJobs.size() <= Config.history_job_keep_max_second); Assert.assertEquals(1000, remainingJobs.size()); // Expecting 1000 jobs to remain // check the created time From ef9bdc53ad638ddf9142701f9ed4212ebf49f355 Mon Sep 17 00:00:00 2001 From: BePPPower Date: Fri, 14 Feb 2025 16:11:51 +0800 Subject: [PATCH 4/5] 4 --- fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index fdfd1e3e9e2e24..bf4cd024c4142f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2547,6 +2547,10 @@ public long saveExportJob(CountingDataOutputStream dos, long checksum) throws IO long curTime = System.currentTimeMillis(); List jobs = exportMgr.getJobs().stream().filter(t -> !t.isExpired(curTime)) .collect(Collectors.toList()); + jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs)); + while (jobs.size() > Config.max_export_history_job_num) { + jobs.remove(0); + } int size = jobs.size(); checksum ^= size; dos.writeInt(size); From 5d47c164d02c1562eb1c369cc115a0d7cd260e7b Mon Sep 17 00:00:00 2001 From: BePPPower Date: Mon, 17 Feb 2025 14:37:35 +0800 Subject: [PATCH 5/5] 5 --- .../src/main/java/org/apache/doris/catalog/Env.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index bf4cd024c4142f..5ca0961ca6a8ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -165,6 +165,7 @@ import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.GroupCommitManager; import org.apache.doris.load.Load; @@ -2547,9 +2548,15 @@ public long saveExportJob(CountingDataOutputStream dos, long checksum) throws IO long curTime = System.currentTimeMillis(); List jobs = exportMgr.getJobs().stream().filter(t -> !t.isExpired(curTime)) .collect(Collectors.toList()); - jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs)); - while (jobs.size() > Config.max_export_history_job_num) { - jobs.remove(0); + if (jobs.size() > Config.max_export_history_job_num) { + jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs)); + Iterator iterator = jobs.iterator(); + while (jobs.size() > Config.max_export_history_job_num && iterator.hasNext()) { + ExportJob job = iterator.next(); + if (job.getState() == ExportJobState.FINISHED || job.getState() == ExportJobState.CANCELLED) { + iterator.remove(); + } + } } int size = jobs.size(); checksum ^= size;