Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](Export) Enhance removeOldExportJobs Logic #47604

Merged
merged 5 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public class Config extends ConfigBase {
"For ALTER, EXPORT jobs, remove the finished job if expired."})
public static int history_job_keep_max_second = 7 * 24 * 3600; // 7 days

@ConfField(mutable = true, masterOnly = true, description = {
"针对 EXPORT 作业,如果系统内 EXPORT 作业数量超过这个值,则会删除最老的记录。",
"For EXPORT jobs, If the number of EXPORT jobs in the system exceeds this value, "
+ "the oldest records will be deleted."})
public static int max_export_history_job_num = 1000;

@ConfField(description = {"事务的清理周期,单位为秒。每个周期内,将会清理已经结束的并且过期的历史事务信息",
"The clean interval of transaction, in seconds. "
+ "In each cycle, the expired history transaction will be cleaned"})
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.GroupCommitManager;
import org.apache.doris.load.Load;
Expand Down Expand Up @@ -2547,6 +2548,16 @@ public long saveExportJob(CountingDataOutputStream dos, long checksum) throws IO
long curTime = System.currentTimeMillis();
List<ExportJob> jobs = exportMgr.getJobs().stream().filter(t -> !t.isExpired(curTime))
.collect(Collectors.toList());
if (jobs.size() > Config.max_export_history_job_num) {
jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs));
Iterator<ExportJob> iterator = jobs.iterator();
while (jobs.size() > Config.max_export_history_job_num && iterator.hasNext()) {
ExportJob job = iterator.next();
if (job.getState() == ExportJobState.FINISHED || job.getState() == ExportJobState.CANCELLED) {
iterator.remove();
}
}
}
int size = jobs.size();
checksum ^= size;
dos.writeInt(size);
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -543,6 +544,23 @@ public void removeOldExportJobs() {
}
}
}

if (exportIdToJob.size() > Config.max_export_history_job_num) {
List<Map.Entry<Long, ExportJob>> jobList = new ArrayList<>(exportIdToJob.entrySet());
jobList.sort(Comparator.comparingLong(entry -> entry.getValue().getCreateTimeMs()));
while (exportIdToJob.size() > Config.max_export_history_job_num) {
// Remove the oldest job
Map.Entry<Long, ExportJob> oldestEntry = jobList.remove(0);
exportIdToJob.remove(oldestEntry.getKey());
Map<String, Long> labelJobs = dbTolabelToExportJobId.get(oldestEntry.getValue().getDbId());
if (labelJobs != null) {
labelJobs.remove(oldestEntry.getValue().getLabel());
if (labelJobs.isEmpty()) {
dbTolabelToExportJobId.remove(oldestEntry.getValue().getDbId());
}
}
}
}
} finally {
writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TableName;
import org.apache.doris.common.Config;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.MockedAuth;
Expand All @@ -31,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;

Expand Down Expand Up @@ -76,6 +79,49 @@ public void testShowExport() throws Exception {

}

@Test
public void testRemoveOldExportJobs() {
// Setup: Create jobs with different creation times
long currentTime = System.currentTimeMillis();
for (int i = 1; i <= 10; i++) {
ExportJob job = makeExportJob(i, "label" + i);
// Jobs created 1, 2...10 days ago
Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 24 * 3600 * 1000));
Deencapsulation.setField(job, "state", ExportJobState.FINISHED);
exportMgr.unprotectAddJob(job);
}

// Invoke the method
exportMgr.removeOldExportJobs();

// Assertions: Check the number of jobs remaining
List<ExportJob> remainingJobs = exportMgr.getJobs();
Assert.assertTrue(remainingJobs.size() <= Config.history_job_keep_max_second);
Assert.assertEquals(7, remainingJobs.size()); // Expecting 8 jobs to remain


for (int i = 11; i <= 1010; i++) {
ExportJob job = makeExportJob(i, "label" + i);
// Jobs created 0, 1, 2, 3, 4...1000 seconds ago
Deencapsulation.setField(job, "createTimeMs", currentTime - (i * 1000));
Deencapsulation.setField(job, "state", ExportJobState.FINISHED);
exportMgr.unprotectAddJob(job);
}

// Invoke the method
exportMgr.removeOldExportJobs();
// Assertions: Check the number of jobs remaining
remainingJobs = exportMgr.getJobs();
Assert.assertTrue(remainingJobs.size() <= Config.history_job_keep_max_second);
Assert.assertEquals(1000, remainingJobs.size()); // Expecting 1000 jobs to remain

// check the created time
remainingJobs.sort(Comparator.comparingLong(entry -> entry.getCreateTimeMs()));
for (int i = 0; i < remainingJobs.size(); ++i) {
Assert.assertEquals(1010 - i, remainingJobs.get(i).getId());
}
}

private ExportJob makeExportJob(long id, String label) {
ExportJob job1 = new ExportJob(id);
Deencapsulation.setField(job1, "label", label);
Expand Down
Loading