Skip to content

Commit

Permalink
Delete expired data by job (elastic#57337)
Browse files Browse the repository at this point in the history
Deleting expired data can take a long time leading to timeouts if there
are many jobs. Often the problem is due to a few large jobs which 
prevent the regular maintenance of the remaining jobs. This change adds
a job_id parameter to the delete expired data endpoint to help clean up
those problematic jobs.
  • Loading branch information
davidkyle committed Jun 8, 2020
1 parent 619e4f8 commit aea0d87
Show file tree
Hide file tree
Showing 18 changed files with 339 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ Deletes expired and unused machine learning data.
[[ml-delete-expired-data-request]]
==== {api-request-title}

`DELETE _ml/_delete_expired_data`
`DELETE _ml/_delete_expired_data` +

`DELETE _ml/_delete_expired_data/<job_id>`

[[ml-delete-expired-data-prereqs]]
==== {api-prereq-title}
Expand All @@ -27,6 +29,19 @@ Deletes all job results, model snapshots and forecast data that have exceeded
their `retention days` period. Machine learning state documents that are not
associated with any job are also deleted.

You can limit the request to a single or set of {anomaly-jobs} by using a job identifier,
a group name, a comma-separated list of jobs, or a wildcard expression.
You can delete expired data for all {anomaly-jobs} by using `_all`, by specifying
`*` as the `<job_id>`, or by omitting the `<job_id>`.

[[ml-delete-expired-data-path-parms]]
==== {api-path-parms-title}

`<job_id>`::
(Optional, string)
Identifier for an {anomaly-job}. It can be a job identifier, a group name, or a
wildcard expression.

[[ml-delete-expired-data-request-body]]
==== {api-request-body-title}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -46,10 +48,12 @@ public static class Request extends ActionRequest {
PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND);
PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())),
TIMEOUT);
PARSER.declareString(Request::setJobId, Job.ID);
}

private Float requestsPerSecond;
private TimeValue timeout;
private String jobId = Metadata.ALL;

public Request() {}

Expand All @@ -67,6 +71,9 @@ public Request(StreamInput in) throws IOException {
this.requestsPerSecond = null;
this.timeout = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO BWC for V_7_9_0
jobId = in.readString();
}
}

public Float getRequestsPerSecond() {
Expand All @@ -77,6 +84,10 @@ public TimeValue getTimeout() {
return timeout;
}

public String getJobId() {
return jobId;
}

public Request setRequestsPerSecond(Float requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
return this;
Expand All @@ -87,6 +98,11 @@ public Request setTimeout(TimeValue timeout) {
return this;
}

public Request setJobId(String jobId) {
this.jobId = jobId;
return this;
}

@Override
public ActionRequestValidationException validate() {
if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
Expand All @@ -103,12 +119,13 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(requestsPerSecond, request.requestsPerSecond)
&& Objects.equals(jobId, request.jobId)
&& Objects.equals(timeout, request.timeout);
}

@Override
public int hashCode() {
return Objects.hash(requestsPerSecond, timeout);
return Objects.hash(requestsPerSecond, timeout, jobId);
}

@Override
Expand All @@ -118,6 +135,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalFloat(requestsPerSecond);
out.writeOptionalTimeValue(timeout);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO BWC for V_7_9_0
out.writeString(jobId);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ public static String documentId(String jobId) {
return ANOMALY_DETECTOR_JOB_TYPE + "-" + jobId;
}

/**
* Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
*/
@Nullable
public static String extractJobIdFromDocumentId(String docId) {
String jobId = docId.replaceAll("^" + ANOMALY_DETECTOR_JOB_TYPE +"-", "");
return jobId.equals(docId) ? null : jobId;
}


/**
* Return the Job Id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@ public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializ

@Override
protected Request createTestInstance() {
return new Request(
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")
);
Request request = new Request();
if (randomBoolean()) {
request.setRequestsPerSecond(randomFloat());
}
if (randomBoolean()) {
request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "test"));
}
if (randomBoolean()) {
request.setJobId(randomAlphaOfLength(5));
}
return request;
}

@Override
Expand All @@ -31,6 +38,12 @@ protected Request mutateInstanceForVersion(Request instance, Version version) {
if (version.before(Version.V_7_8_0)) {
return new Request();
}
if (version.before(Version.V_8_0_0)) { // TODO make V_7_9_0
Request request = new Request();
request.setRequestsPerSecond(instance.getRequestsPerSecond());
request.setTimeout(instance.getTimeout());
return request;
}
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,15 @@ public void testCopyingJobDoesNotCauseStackOverflow() {
}
}

public void testDocumentId() {
String jobFoo = "foo";
assertEquals("anomaly_detector-" + jobFoo, Job.documentId(jobFoo));
assertEquals(jobFoo, Job.extractJobIdFromDocumentId(
Job.documentId(jobFoo)
));
assertNull(Job.extractJobIdFromDocumentId("some_other_type-foo"));
}

public static Job.Builder buildJobBuilder(String id, Date date) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(date);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ private void deleteExpiredData(DeleteExpiredDataAction.Request request,
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredResultsRemover(client, request.getJobId(), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, request.getJobId(), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
Expand All @@ -23,13 +23,17 @@

public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {

public BatchedJobsIterator(OriginSettingClient client, String index) {
private final String jobIdExpression;

public BatchedJobsIterator(OriginSettingClient client, String index, String jobIdExpression) {
super(client, index);
this.jobIdExpression = jobIdExpression;
}

@Override
protected QueryBuilder getQuery() {
return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
String [] tokens = Strings.tokenizeToStringArray(jobIdExpression, ",");
return JobConfigProvider.buildJobWildcardQuery(tokens, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public void expandJobsIds(String expression,
boolean allowMissingConfigs,
ActionListener<SortedSet<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(Job.ID.getPreferredName(), null);
Expand Down Expand Up @@ -579,7 +579,7 @@ public void expandJobsIds(String expression,
*/
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
Expand Down Expand Up @@ -773,7 +773,7 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO
}
}

private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
public static QueryBuilder buildJobWildcardQuery(String [] tokens, boolean excludeDeleting) {
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
// match all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final OriginSettingClient client;
private final String jobIdExpression;
protected final OriginSettingClient client;

AbstractExpiredJobDataRemover(OriginSettingClient client) {
AbstractExpiredJobDataRemover(String jobIdExpression, OriginSettingClient client) {
this.jobIdExpression = jobIdExpression;
this.client = client;
}

Expand Down Expand Up @@ -85,7 +87,7 @@ private void removeData(WrappedBatchedJobsIterator jobIterator,
}

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName(), jobIdExpression);
return new WrappedBatchedJobsIterator(jobsIterator);
}

Expand All @@ -112,8 +114,44 @@ static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
}

/**
* BatchedJobsIterator efficiently returns batches of jobs using a scroll
* search but AbstractExpiredJobDataRemover works with one job at a time.
* The latest time that cutoffs are measured from is not wall clock time,
* but some other reference point that makes sense for the type of data
* being removed. This class groups the cutoff time with it's "latest"
* reference point.
*/
protected static final class CutoffDetails {

public final long latestTimeMs;
public final long cutoffEpochMs;

public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
this.latestTimeMs = latestTimeMs;
this.cutoffEpochMs = cutoffEpochMs;
}

@Override
public int hashCode() {
return Objects.hash(latestTimeMs, cutoffEpochMs);
}

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof CutoffDetails == false) {
return false;
}
CutoffDetails that = (CutoffDetails) other;
return this.latestTimeMs == that.latestTimeMs &&
this.cutoffEpochMs == that.cutoffEpochMs;
}
}

/**
* A wrapper around {@link BatchedJobsIterator} that allows iterating jobs one
* at a time from the batches returned by {@code BatchedJobsIterator}
*
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
Expand Down Expand Up @@ -155,39 +193,4 @@ private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Build
return new VolatileCursorIterator<>(jobs);
}
}

/**
* The latest time that cutoffs are measured from is not wall clock time,
* but some other reference point that makes sense for the type of data
* being removed. This class groups the cutoff time with it's "latest"
* reference point.
*/
protected static final class CutoffDetails {

public final long latestTimeMs;
public final long cutoffEpochMs;

public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
this.latestTimeMs = latestTimeMs;
this.cutoffEpochMs = cutoffEpochMs;
}

@Override
public int hashCode() {
return Objects.hash(latestTimeMs, cutoffEpochMs);
}

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof CutoffDetails == false) {
return false;
}
CutoffDetails that = (CutoffDetails) other;
return this.latestTimeMs == that.latestTimeMs &&
this.cutoffEpochMs == that.cutoffEpochMs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
*/
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;

private final OriginSettingClient client;
private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
public ExpiredModelSnapshotsRemover(OriginSettingClient client, String jobIdExpression, ThreadPool threadPool) {
super(jobIdExpression, client);
this.threadPool = Objects.requireNonNull(threadPool);
}

Expand Down
Loading

0 comments on commit aea0d87

Please sign in to comment.