From bbeda643a62c7e29caf59967538b837ae6d6495b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 5 Jun 2020 13:32:35 +0100 Subject: [PATCH] Delete expired data by job (#57337) Deleting expired data can take a long time leading to timeouts if there are many jobs. Often the problem is due to a few large jobs which prevent the regular maintenance of the remaining jobs. This change adds a job_id parameter to the delete expired data endpoint to help clean up those problematic jobs. --- .../apis/delete-expired-data.asciidoc | 17 ++- .../ml/action/DeleteExpiredDataAction.java | 22 ++- .../xpack/core/ml/job/config/Job.java | 9 ++ .../DeleteExpiredDataActionRequestTests.java | 21 ++- .../xpack/core/ml/job/config/JobTests.java | 9 ++ .../TransportDeleteExpiredDataAction.java | 4 +- .../job/persistence/BatchedJobsIterator.java | 10 +- .../ml/job/persistence/JobConfigProvider.java | 6 +- .../AbstractExpiredJobDataRemover.java | 83 +++++------ .../ExpiredModelSnapshotsRemover.java | 6 +- .../job/retention/ExpiredResultsRemover.java | 7 +- .../ml/job/retention/UnusedStateRemover.java | 10 +- .../ml/rest/RestDeleteExpiredDataAction.java | 36 ++++- .../AbstractExpiredJobDataRemoverTests.java | 21 +-- .../ExpiredModelSnapshotsRemoverTests.java | 2 +- .../retention/ExpiredResultsRemoverTests.java | 17 ++- .../api/ml.delete_expired_data.json | 24 ++++ .../test/ml/delete_expired_data.yml | 130 +++++++++++++++++- 18 files changed, 339 insertions(+), 95 deletions(-) diff --git a/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc b/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc index 44679911ab255..3b5d1aba4fe44 100644 --- a/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc @@ -11,7 +11,9 @@ Deletes expired and unused machine learning data. 
[[ml-delete-expired-data-request]] ==== {api-request-title} -`DELETE _ml/_delete_expired_data` +`DELETE _ml/_delete_expired_data` + + +`DELETE _ml/_delete_expired_data/<job_id>` [[ml-delete-expired-data-prereqs]] ==== {api-prereq-title} @@ -27,6 +29,19 @@ Deletes all job results, model snapshots and forecast data that have exceeded their `retention days` period. Machine learning state documents that are not associated with any job are also deleted. +You can limit the request to a single or set of {anomaly-jobs} by using a job identifier, +a group name, a comma-separated list of jobs, or a wildcard expression. +You can delete expired data for all {anomaly-jobs} by using `_all`, by specifying +`*` as the `<job_id>`, or by omitting the `<job_id>`. + +[[ml-delete-expired-data-path-parms]] +==== {api-path-parms-title} + +`<job_id>`:: +(Optional, string) +Identifier for an {anomaly-job}. It can be a job identifier, a group name, or a +wildcard expression. + [[ml-delete-expired-data-request-body]] ==== {api-request-body-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java index ad5b85757108c..717d091628bf0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -19,6 +20,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import
org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.job.config.Job; import java.io.IOException; import java.util.Objects; @@ -46,10 +48,12 @@ public static class Request extends ActionRequest { PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND); PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareString(Request::setJobId, Job.ID); } private Float requestsPerSecond; private TimeValue timeout; + private String jobId = Metadata.ALL; public Request() {} @@ -67,6 +71,9 @@ public Request(StreamInput in) throws IOException { this.requestsPerSecond = null; this.timeout = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO BWC for V_7_9_0 + jobId = in.readString(); + } } public Float getRequestsPerSecond() { @@ -77,6 +84,10 @@ public TimeValue getTimeout() { return timeout; } + public String getJobId() { + return jobId; + } + public Request setRequestsPerSecond(Float requestsPerSecond) { this.requestsPerSecond = requestsPerSecond; return this; @@ -87,6 +98,11 @@ public Request setTimeout(TimeValue timeout) { return this; } + public Request setJobId(String jobId) { + this.jobId = jobId; + return this; + } + @Override public ActionRequestValidationException validate() { if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) { @@ -103,12 +119,13 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; return Objects.equals(requestsPerSecond, request.requestsPerSecond) + && Objects.equals(jobId, request.jobId) && Objects.equals(timeout, request.timeout); } @Override public int hashCode() { - return Objects.hash(requestsPerSecond, timeout); + return Objects.hash(requestsPerSecond, timeout, jobId); } @Override @@ -118,6 +135,9 @@ public void writeTo(StreamOutput out) throws IOException { 
out.writeOptionalFloat(requestsPerSecond); out.writeOptionalTimeValue(timeout); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO BWC for V_7_9_0 + out.writeString(jobId); + } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index fdeebc840858a..5a256467124fd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -248,6 +248,15 @@ public static String documentId(String jobId) { return ANOMALY_DETECTOR_JOB_TYPE + "-" + jobId; } + /** + * Returns the job id from the doc id. Returns {@code null} if the doc id is invalid. + */ + @Nullable + public static String extractJobIdFromDocumentId(String docId) { + String jobId = docId.replaceAll("^" + ANOMALY_DETECTOR_JOB_TYPE +"-", ""); + return jobId.equals(docId) ? null : jobId; + } + /** * Return the Job Id. diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java index 91fc144a8d085..8ef90e522a950 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java @@ -15,10 +15,17 @@ public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializ @Override protected Request createTestInstance() { - return new Request( - randomBoolean() ? null : randomFloat(), - randomBoolean() ? 
null : TimeValue.parseTimeValue(randomTimeValue(), "test") - ); + Request request = new Request(); + if (randomBoolean()) { + request.setRequestsPerSecond(randomFloat()); + } + if (randomBoolean()) { + request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "test")); + } + if (randomBoolean()) { + request.setJobId(randomAlphaOfLength(5)); + } + return request; } @Override @@ -31,6 +38,12 @@ protected Request mutateInstanceForVersion(Request instance, Version version) { if (version.before(Version.V_7_8_0)) { return new Request(); } + if (version.before(Version.V_8_0_0)) { // TODO make V_7_9_0 + Request request = new Request(); + request.setRequestsPerSecond(instance.getRequestsPerSecond()); + request.setTimeout(instance.getTimeout()); + return request; + } return instance; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 94e3cae09e6d0..841c0ba2a1139 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -584,6 +584,15 @@ public void testCopyingJobDoesNotCauseStackOverflow() { } } + public void testDocumentId() { + String jobFoo = "foo"; + assertEquals("anomaly_detector-" + jobFoo, Job.documentId(jobFoo)); + assertEquals(jobFoo, Job.extractJobIdFromDocumentId( + Job.documentId(jobFoo) + )); + assertNull(Job.extractJobIdFromDocumentId("some_other_type-foo")); + } + public static Job.Builder buildJobBuilder(String id, Date date) { Job.Builder builder = new Job.Builder(id); builder.setCreateTime(date); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 4357fa438ba75..e6ab9b0e82b63 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -88,9 +88,9 @@ private void deleteExpiredData(DeleteExpiredDataAction.Request request, Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( - new ExpiredResultsRemover(client, auditor, threadPool), + new ExpiredResultsRemover(client, request.getJobId(), auditor, threadPool), new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, threadPool), + new ExpiredModelSnapshotsRemover(client, request.getJobId(), threadPool), new UnusedStateRemover(client, clusterService), new EmptyStateIndexRemover(client) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java index f933769c9454f..ef121dd71b74d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java @@ -7,13 +7,13 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.SearchHit; import 
org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator; @@ -23,13 +23,17 @@ public class BatchedJobsIterator extends BatchedDocumentsIterator { - public BatchedJobsIterator(OriginSettingClient client, String index) { + private final String jobIdExpression; + + public BatchedJobsIterator(OriginSettingClient client, String index, String jobIdExpression) { super(client, index); + this.jobIdExpression = jobIdExpression; } @Override protected QueryBuilder getQuery() { - return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE); + String [] tokens = Strings.tokenizeToStringArray(jobIdExpression, ","); + return JobConfigProvider.buildJobWildcardQuery(tokens, true); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 02281cee8ea49..80d9fde310edb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -512,7 +512,7 @@ public void expandJobsIds(String expression, boolean allowMissingConfigs, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting)); sourceBuilder.sort(Job.ID.getPreferredName()); sourceBuilder.fetchSource(false); sourceBuilder.docValueField(Job.ID.getPreferredName(), null); @@ -573,7 +573,7 @@ public void expandJobsIds(String expression, */ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener> listener) { String [] 
tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting)); sourceBuilder.sort(Job.ID.getPreferredName()); SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) @@ -767,7 +767,7 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO } } - private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) { + public static QueryBuilder buildJobWildcardQuery(String [] tokens, boolean excludeDeleting) { QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE); if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) { // match all diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 322466b23dd6c..65d43434cb431 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -31,9 +31,11 @@ */ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { - private final OriginSettingClient client; + private final String jobIdExpression; + protected final OriginSettingClient client; - AbstractExpiredJobDataRemover(OriginSettingClient client) { + AbstractExpiredJobDataRemover(String jobIdExpression, OriginSettingClient client) { + this.jobIdExpression = jobIdExpression; this.client = client; } @@ -85,7 +87,7 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, } private WrappedBatchedJobsIterator newJobIterator() { - BatchedJobsIterator 
jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); + BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName(), jobIdExpression); return new WrappedBatchedJobsIterator(jobsIterator); } @@ -112,8 +114,44 @@ static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { } /** - * BatchedJobsIterator efficiently returns batches of jobs using a scroll - * search but AbstractExpiredJobDataRemover works with one job at a time. + * The latest time that cutoffs are measured from is not wall clock time, + * but some other reference point that makes sense for the type of data + * being removed. This class groups the cutoff time with it's "latest" + * reference point. + */ + protected static final class CutoffDetails { + + public final long latestTimeMs; + public final long cutoffEpochMs; + + public CutoffDetails(long latestTimeMs, long cutoffEpochMs) { + this.latestTimeMs = latestTimeMs; + this.cutoffEpochMs = cutoffEpochMs; + } + + @Override + public int hashCode() { + return Objects.hash(latestTimeMs, cutoffEpochMs); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof CutoffDetails == false) { + return false; + } + CutoffDetails that = (CutoffDetails) other; + return this.latestTimeMs == that.latestTimeMs && + this.cutoffEpochMs == that.cutoffEpochMs; + } + } + + /** + * A wrapper around {@link BatchedJobsIterator} that allows iterating jobs one + * at a time from the batches returned by {@code BatchedJobsIterator} + * * This class abstracts away the logic of pulling one job at a time from * multiple batches. */ @@ -155,39 +193,4 @@ private VolatileCursorIterator createBatchIteratorFromBatch(Deque(jobs); } } - - /** - * The latest time that cutoffs are measured from is not wall clock time, - * but some other reference point that makes sense for the type of data - * being removed. 
This class groups the cutoff time with it's "latest" - * reference point. - */ - protected static final class CutoffDetails { - - public final long latestTimeMs; - public final long cutoffEpochMs; - - public CutoffDetails(long latestTimeMs, long cutoffEpochMs) { - this.latestTimeMs = latestTimeMs; - this.cutoffEpochMs = cutoffEpochMs; - } - - @Override - public int hashCode() { - return Objects.hash(latestTimeMs, cutoffEpochMs); - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if (other instanceof CutoffDetails == false) { - return false; - } - CutoffDetails that = (CutoffDetails) other; - return this.latestTimeMs == that.latestTimeMs && - this.cutoffEpochMs == that.cutoffEpochMs; - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 0b963c216e0ae..3da00cfe81251 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -63,12 +63,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover */ private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; - private final OriginSettingClient client; private final ThreadPool threadPool; - public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threadPool) { - super(client); - this.client = Objects.requireNonNull(client); + public ExpiredModelSnapshotsRemover(OriginSettingClient client, String jobIdExpression, ThreadPool threadPool) { + super(jobIdExpression, client); this.threadPool = Objects.requireNonNull(threadPool); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 13a5823483c1e..aa04d3f56f643 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -67,13 +67,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class); - private final OriginSettingClient client; private final AnomalyDetectionAuditor auditor; private final ThreadPool threadPool; - public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) { - super(client); - this.client = Objects.requireNonNull(client); + public ExpiredResultsRemover(OriginSettingClient client, String jobIdExpression, + AnomalyDetectionAuditor auditor, ThreadPool threadPool) { + super(jobIdExpression, client); this.auditor = Objects.requireNonNull(auditor); this.threadPool = Objects.requireNonNull(threadPool); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 723902073916d..d0b2de120af33 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.dataframe.StoredProgress; -import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; import 
org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator; @@ -110,10 +109,11 @@ private Set getAnomalyDetectionJobIds() { // and remove cluster service as a member all together. jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet()); - BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); - while (jobsIterator.hasNext()) { - Deque jobs = jobsIterator.next(); - jobs.stream().map(Job.Builder::getId).forEach(jobIds::add); + DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, AnomalyDetectorsIndex.configIndexName(), + QueryBuilders.termQuery(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE)); + while (iterator.hasNext()) { + Deque docIds = iterator.next(); + docIds.stream().map(Job::extractJobIdFromDocumentId).filter(Objects::nonNull).forEach(jobIds::add); } return jobIds; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java index dca079e3c556e..0c906cad82f14 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java @@ -6,10 +6,12 @@ package org.elasticsearch.xpack.ml.rest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.MachineLearning; import java.io.IOException; @@ -22,7 +24,8 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler { @Override public List 
routes() { - return Collections.emptyList(); + return Collections.singletonList( + new Route(DELETE, MachineLearning.BASE_PATH + "_delete_expired_data/{" + Job.ID.getPreferredName() + "}")); } @Override @@ -41,9 +44,34 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - DeleteExpiredDataAction.Request request = restRequest.hasContent() ? - DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null) : - new DeleteExpiredDataAction.Request(); + DeleteExpiredDataAction.Request request; + if (restRequest.hasContent()) { + request = DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null); + } else { + request = new DeleteExpiredDataAction.Request(); + + String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName()); + if (perSecondParam != null) { + try { + request.setRequestsPerSecond(Float.parseFloat(perSecondParam)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse float parameter [" + + DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName() + + "] with value [" + perSecondParam + "]", e); + } + } + + String timeoutParam = restRequest.param(DeleteExpiredDataAction.Request.TIMEOUT.getPreferredName()); + if (timeoutParam != null) { + request.setTimeout(restRequest.paramAsTime(timeoutParam, null)); + } + } + + String jobId = restRequest.param(Job.ID.getPreferredName()); + if (Strings.isNullOrEmpty(jobId) == false) { + request.setJobId(jobId); + } + return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 
a2e573ac855c2..beb312ba1adc6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -51,8 +51,8 @@ private static class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDat private int getRetentionDaysCallCount = 0; - ConcreteExpiredJobDataRemover(OriginSettingClient client) { - super(client); + ConcreteExpiredJobDataRemover(String jobId, OriginSettingClient client) { + super(jobId, client); } @Override @@ -101,17 +101,6 @@ static SearchResponse createSearchResponseFromHits(List hits) { return searchResponse; } - @SuppressWarnings("unchecked") - static void givenJobs(Client client, List jobs) throws IOException { - SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); - - doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(response); - return null; - }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); - } - private static SearchResponse createSearchResponse(List toXContents, int totalHits) throws IOException { SearchHit[] hitsArray = new SearchHit[toXContents.size()]; for (int i = 0; i < toXContents.size(); i++) { @@ -131,7 +120,7 @@ public void testRemoveGivenNoJobs() throws IOException { mockSearchResponse(response); TestListener listener = new TestListener(); - ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient); remover.remove(1.0f,listener, () -> false); listener.waitToCompletion(); @@ -170,7 +159,7 @@ public void testRemoveGivenMultipleBatches() throws IOException { }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); TestListener listener = new TestListener(); - 
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient); remover.remove(1.0f,listener, () -> false); listener.waitToCompletion(); @@ -194,7 +183,7 @@ public void testRemoveGivenTimeOut() throws IOException { mockSearchResponse(response); TestListener listener = new TestListener(); - ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient); remover.remove(1.0f,listener, () -> attemptsLeft.getAndDecrement() <= 0); listener.waitToCompletion(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 7a644a5fe6503..93f97b3c10421 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -256,7 +256,7 @@ private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { return null; } ).when(executor).execute(any()); - return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool); + return new ExpiredModelSnapshotsRemover(originSettingClient, "*", threadPool); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 92b9e85f661c3..8977c9c189dcd 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -63,7 +63,7 @@ public void setUpTests() { public void testRemove_GivenNoJobs() throws IOException { givenDBQRequestsSucceed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); + givenJobs(client, Collections.emptyList()); createExpiredResultsRemover().remove(1.0f, listener, () -> false); @@ -73,7 +73,7 @@ public void testRemove_GivenNoJobs() throws IOException { public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { givenDBQRequestsSucceed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, + givenJobs(client, Arrays.asList( JobTests.buildJobBuilder("foo").build(), JobTests.buildJobBuilder("bar").build() @@ -153,6 +153,17 @@ public void testCalcCutoffEpochMs() { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime))); } + @SuppressWarnings("unchecked") + static void givenJobs(Client client, List jobs) throws IOException { + SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); + + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); + } + private void givenDBQRequestsSucceed() { givenDBQRequest(true); } @@ -208,6 +219,6 @@ private ExpiredResultsRemover createExpiredResultsRemover() { } ).when(executor).execute(any()); - return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool); + return new ExpiredResultsRemover(originSettingClient, "*", mock(AnomalyDetectionAuditor.class), threadPool); } } diff --git 
a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json index 4c55f853a742d..6b6a1ec03e80d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json @@ -7,6 +7,18 @@ "stability":"stable", "url":{ "paths":[ + { + "path":"/_ml/_delete_expired_data/{job_id}", + "methods":[ + "DELETE" + ], + "parts":{ + "job_id":{ + "type":"string", + "description":"The ID of the job(s) to perform expired data hygiene for" + } + } + }, { "path":"/_ml/_delete_expired_data", "methods":[ @@ -15,6 +27,18 @@ } ] }, + "params":{ + "requests_per_second":{ + "type":"number", + "required":false, + "description":"The desired requests per second for the deletion processes." + }, + "timeout":{ + "type":"time", + "required":false, + "description":"How long can the underlying delete processes run until they are canceled" + } + }, "body":{ "description":"deleting expired data parameters" } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml index 319c564b26699..ddba4476822ba 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml @@ -3,12 +3,11 @@ setup: features: headers - do: headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" ml.put_job: - job_id: delete-expired-data + job_id: delete-expired-data-a body: > { - "job_id": "delete-expired-data", "description":"Analysis of response time by airline", "analysis_config" : { "bucket_span" : "1h", @@ -18,7 +17,30 @@ setup: "field_delimiter":",", "time_field":"time", "time_format":"yyyy-MM-dd HH:mm:ssX" - } + }, + "results_retention_days" : 1, + "model_snapshot_retention_days" : 1 + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" + ml.put_job: + job_id: delete-expired-data-b + body: > + { + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span" : "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "field_delimiter":",", + "time_field":"time", + "time_format":"yyyy-MM-dd HH:mm:ssX" + }, + "results_retention_days" : 1, + "model_snapshot_retention_days" : 1 } --- @@ -34,3 +56,103 @@ setup: body: > { "timeout": "10h", "requests_per_second": 100000.0 } - match: { deleted: true} +--- +"Test delete expired data with path parameters": + - do: + ml.delete_expired_data: + timeout: "10h" + requests_per_second: 100000.0 + - match: { deleted: true} + +--- +"Test delete expired data with job id": + - do: + headers: + Content-Type: application/json + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" + index: + index: .ml-anomalies-shared + id: "delete-expired-data-a_model_snapshot_inactive-snapshot" + body: > + { + "job_id": "delete-expired-data-a", + "timestamp": "2020-05-01T00:00:00Z", + "snapshot_id": "inactive-snapshot", + "description": "first", + "latest_record_time_stamp": "2020-05-01T00:00:00Z", + "latest_result_time_stamp": "2020-05-01T00:00:00Z", + "snapshot_doc_count": 1 + } + + - do: + headers: + Content-Type: application/json + Authorization: "Basic 
eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" + index: + index: .ml-anomalies-shared + id: "delete-expired-data-a_model_snapshot_active-snapshot" + body: > + { + "job_id": "delete-expired-data-a", + "timestamp": "2020-05-10T00:00:00Z", + "snapshot_id": "active-snapshot", + "description": "second", + "latest_record_time_stamp": "2020-05-10T00:00:00Z", + "latest_result_time_stamp": "2020-05-10T00:00:00Z", + "snapshot_doc_count": 1, + "model_size_stats": { + "job_id" : "delete-expired-data-a", + "result_type" : "model_size_stats", + "model_bytes" : 0 + }, + "quantiles": { + "job_id": "delete-expired-data-a", + "timestamp": 1, + "quantile_state": "quantiles-1" + } + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" + indices.refresh: + index: [.ml-anomalies-shared] + + - do: + ml.get_model_snapshots: + job_id: delete-expired-data-a + - match: { count: 2 } + +# make the above document the current snapshot + - do: + ml.revert_model_snapshot: + job_id: delete-expired-data-a + snapshot_id: active-snapshot + + - do: + ml.get_model_snapshots: + job_id: delete-expired-data-a + - match: { count: 2 } + + - do: + ml.delete_expired_data: + job_id: delete-expired-data-b + + - do: + ml.get_model_snapshots: + job_id: delete-expired-data-a + - match: { count: 2 } + + - do: + ml.delete_expired_data: + job_id: delete-expired-data-a + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" + indices.refresh: {} + + - do: + ml.get_model_snapshots: + job_id: delete-expired-data-a + - match: { count: 1 }