Skip to content

Commit

Permalink
Add wildcard query to BatchedJobsIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Jun 4, 2020
1 parent da7ae6d commit ecda5c1
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
Expand All @@ -40,7 +38,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
DeleteExpiredDataAction.Response> {
Expand All @@ -54,25 +51,21 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
private final OriginSettingClient client;
private final ClusterService clusterService;
private final Clock clock;
private final JobConfigProvider jobConfigProvider;

@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider) {
ActionFilters actionFilters, Client client, ClusterService clusterService) {
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
jobConfigProvider, Clock.systemUTC());
Clock.systemUTC());
}

TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider, Clock clock) {
ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.jobConfigProvider = jobConfigProvider;
this.clock = clock;
}

Expand All @@ -84,31 +77,20 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis())
);

jobConfigProvider.expandJobs(request.getJobId(), true, true, ActionListener.wrap(
jobBuilders -> {
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> {
List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
deleteExpiredData(request, jobs, listener, isTimedOutSupplier);
}
);
},
listener::onFailure
));


Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, listener, isTimedOutSupplier)
);
}

private void deleteExpiredData(DeleteExpiredDataAction.Request request,
List<Job> jobs,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, jobs, auditor, threadPool),
new ExpiredResultsRemover(client, request.getJobId(), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, jobs, threadPool),
new ExpiredModelSnapshotsRemover(client, request.getJobId(), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
Expand All @@ -23,13 +23,17 @@

public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {

public BatchedJobsIterator(OriginSettingClient client, String index) {
private final String jobIdExpression;

public BatchedJobsIterator(OriginSettingClient client, String index, String jobIdExpression) {
super(client, index);
this.jobIdExpression = jobIdExpression;
}

@Override
protected QueryBuilder getQuery() {
return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
String [] tokens = Strings.tokenizeToStringArray(jobIdExpression, ",");
return JobConfigProvider.buildJobWildcardQuery(tokens, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public void expandJobsIds(String expression,
boolean allowMissingConfigs,
ActionListener<SortedSet<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(Job.ID.getPreferredName(), null);
Expand Down Expand Up @@ -573,7 +573,7 @@ public void expandJobsIds(String expression,
*/
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
Expand Down Expand Up @@ -767,7 +767,7 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO
}
}

private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
public static QueryBuilder buildJobWildcardQuery(String [] tokens, boolean excludeDeleting) {
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
// match all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Removes job data that expired with respect to their retention period.
Expand All @@ -26,17 +31,24 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final List<Job> jobs;
private final String jobIdExpression;
protected final OriginSettingClient client;

AbstractExpiredJobDataRemover(List<Job> jobs) {
this.jobs = jobs;
AbstractExpiredJobDataRemover(String jobIdExpression, OriginSettingClient client) {
this.jobIdExpression = jobIdExpression;
this.client = client;
}

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, jobIdExpression, AnomalyDetectorsIndex.configIndexName());
return new WrappedBatchedJobsIterator(jobsIterator);
}

@Override
public void remove(float requestsPerSecond,
ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
removeData(new VolatileCursorIterator<>(jobs), requestsPerSecond, listener, isTimedOutSupplier);
removeData(newJobIterator(), requestsPerSecond, listener, isTimedOutSupplier);
}

private void removeData(Iterator<Job> jobIterator,
Expand Down Expand Up @@ -135,4 +147,50 @@ public boolean equals(Object other) {
this.cutoffEpochMs == that.cutoffEpochMs;
}
}

/**
* A wrapper around {@link BatchedJobsIterator} that allows iterating jobs one
* at a time from the batches returned by {@code BatchedJobsIterator}
*
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
private static class WrappedBatchedJobsIterator implements Iterator<Job> {
private final BatchedJobsIterator batchedIterator;
private VolatileCursorIterator<Job> currentBatch;

WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) {
this.batchedIterator = batchedIterator;
}

@Override
public boolean hasNext() {
return (currentBatch != null && currentBatch.hasNext()) || batchedIterator.hasNext();
}

/**
* Before BatchedJobsIterator has run a search it reports hasNext == true
* but the first search may return no results. In that case null is return
* and clients have to handle null.
*/
@Override
public Job next() {
if (currentBatch != null && currentBatch.hasNext()) {
return currentBatch.next();
}

// currentBatch is either null or all its elements have been iterated.
// get the next currentBatch
currentBatch = createBatchIteratorFromBatch(batchedIterator.next());

// BatchedJobsIterator.hasNext maybe true if searching the first time
// but no results are returned.
return currentBatch.hasNext() ? currentBatch.next() : null;
}

private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Builder> builders) {
List<Job> jobs = builders.stream().map(Job.Builder::build).collect(Collectors.toList());
return new VolatileCursorIterator<>(jobs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
*/
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;

private final OriginSettingClient client;
private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(OriginSettingClient client, List<Job> jobs, ThreadPool threadPool) {
super(jobs);
this.client = Objects.requireNonNull(client);
public ExpiredModelSnapshotsRemover(OriginSettingClient client, String jobIdExpression, ThreadPool threadPool) {
super(jobIdExpression, client);
this.threadPool = Objects.requireNonNull(threadPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand All @@ -68,14 +67,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {

private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class);

private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;

public ExpiredResultsRemover(OriginSettingClient client, List<Job> jobs,
public ExpiredResultsRemover(OriginSettingClient client, String jobIdExpression,
AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(jobs);
this.client = Objects.requireNonNull(client);
super(jobIdExpression, client);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.MlMetadata;
Expand All @@ -27,7 +26,6 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -55,9 +54,8 @@ public void setup() {
TransportService transportService = mock(TransportService.class);
Client client = mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
JobConfigProvider configProvider = mock(JobConfigProvider.class);
transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService,
new ActionFilters(Collections.emptySet()), client, clusterService, configProvider, Clock.systemUTC());
new ActionFilters(Collections.emptySet()), client, clusterService, Clock.systemUTC());
}

@After
Expand Down
Loading

0 comments on commit ecda5c1

Please sign in to comment.