Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Search After job iterators #57875

Merged
merged 7 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.SearchAfterJobsIterator;
import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
Expand All @@ -30,6 +34,7 @@
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator;

import java.time.Clock;
import java.time.Duration;
Expand All @@ -38,9 +43,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
DeleteExpiredDataAction.Response> {
DeleteExpiredDataAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportDeleteExpiredDataAction.class);

Expand All @@ -51,22 +57,26 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
private final OriginSettingClient client;
private final ClusterService clusterService;
private final Clock clock;
private final JobConfigProvider jobConfigProvider;

@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService) {
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider) {
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
Clock.systemUTC());
jobConfigProvider, Clock.systemUTC());
}

TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) {
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
this.jobConfigProvider = jobConfigProvider;
}

@Override
Expand All @@ -78,22 +88,34 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
);

Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, listener, isTimedOutSupplier)
);
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());

if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
List<MlDataRemover> dataRemovers = createDataRemovers(client, auditor);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier)
);
} else {
jobConfigProvider.expandJobs(request.getJobId(), false, true, ActionListener.wrap(
jobBuilders -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
List<MlDataRemover> dataRemovers = createDataRemovers(jobs, auditor);
deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier);
}
);
},
listener::onFailure
));
}
}

private void deleteExpiredData(DeleteExpiredDataAction.Request request,
List<MlDataRemover> dataRemovers,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, request.getJobId(), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, request.getJobId(), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client)
);


Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
// If there is no throttle provided, default to none
float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond();
Expand All @@ -103,7 +125,7 @@ private void deleteExpiredData(DeleteExpiredDataAction.Request request,
// 1 million documents over 5000 seconds ~= 83 minutes.
// If we have > 5 data nodes, we don't set our throttling.
requestsPerSec = numberOfDatanodes < 5 ?
(float)(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes :
(float) (AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes :
Float.POSITIVE_INFINITY;
}
deleteExpiredData(dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true);
Expand All @@ -117,15 +139,15 @@ void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse ->
deleteExpiredData(
mlDataRemoversIterator,
requestsPerSecond,
listener,
isTimedOutSupplier,
booleanResponse
),
listener::onFailure);
booleanResponse ->
deleteExpiredData(
mlDataRemoversIterator,
requestsPerSecond,
listener,
isTimedOutSupplier,
booleanResponse
),
listener::onFailure);
// Removing expired ML data and artifacts requires multiple operations.
// These are queued up and executed sequentially in the action listener,
// the chained calls must all run the ML utility thread pool NOT the thread
Expand All @@ -142,4 +164,23 @@ void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
}
}

/**
 * Builds the ordered list of removers used when the request targets all jobs.
 * Jobs are supplied to the result/snapshot removers by wrapping a batched
 * search-after iterator, so they are pulled from the config index in pages
 * rather than loaded up front.
 */
private List<MlDataRemover> createDataRemovers(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
    MlDataRemover resultsRemover =
        new ExpiredResultsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), auditor, threadPool);
    MlDataRemover forecastsRemover = new ExpiredForecastsRemover(client, threadPool);
    MlDataRemover modelSnapshotsRemover =
        new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool);
    MlDataRemover unusedStateRemover = new UnusedStateRemover(client, clusterService);
    MlDataRemover emptyStateIndexRemover = new EmptyStateIndexRemover(client);
    return Arrays.asList(resultsRemover, forecastsRemover, modelSnapshotsRemover, unusedStateRemover, emptyStateIndexRemover);
}

/**
 * Builds the ordered list of removers for an explicit set of jobs (the
 * request named a job ID expression). The already-expanded jobs are handed
 * to the result/snapshot removers via an in-memory iterator.
 */
private List<MlDataRemover> createDataRemovers(List<Job> jobs, AnomalyDetectionAuditor auditor) {
    MlDataRemover resultsRemover =
        new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), auditor, threadPool);
    MlDataRemover forecastsRemover = new ExpiredForecastsRemover(client, threadPool);
    MlDataRemover modelSnapshotsRemover =
        new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool);
    MlDataRemover unusedStateRemover = new UnusedStateRemover(client, clusterService);
    MlDataRemover emptyStateIndexRemover = new EmptyStateIndexRemover(client);
    return Arrays.asList(resultsRemover, forecastsRemover, modelSnapshotsRemover, unusedStateRemover, emptyStateIndexRemover);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,64 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.utils.persistence.SearchAfterDocumentsIterator;

import java.io.IOException;
import java.io.InputStream;

public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {
public class SearchAfterJobsIterator extends SearchAfterDocumentsIterator<Job.Builder> {

private final String jobIdExpression;
private String lastJobId;

public BatchedJobsIterator(OriginSettingClient client, String index, String jobIdExpression) {
super(client, index);
this.jobIdExpression = jobIdExpression;
public SearchAfterJobsIterator(OriginSettingClient client) {
super(client, AnomalyDetectorsIndex.configIndexName());
}

@Override
protected QueryBuilder getQuery() {
String [] tokens = Strings.tokenizeToStringArray(jobIdExpression, ",");
return JobConfigProvider.buildJobWildcardQuery(tokens, true);
return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
}

/** Jobs are paged through sorted by job ID, which pairs with {@code searchAfterFields()}. */
@Override
protected FieldSortBuilder sortField() {
    FieldSortBuilder sortByJobId = new FieldSortBuilder(Job.ID.getPreferredName());
    return sortByJobId;
}

/**
 * The search_after values for the next page: the last job ID seen, or
 * {@code null} before any page has been fetched.
 */
@Override
protected Object[] searchAfterFields() {
    return lastJobId == null ? null : new Object[] { lastJobId };
}

/** Records the job ID of the page's last hit so the next search starts after it. */
@Override
protected void extractSearchAfterFields(SearchHit lastSearchHit) {
    String lastDocId = lastSearchHit.getId();
    lastJobId = Job.extractJobIdFromDocumentId(lastDocId);
}

@Override
protected Job.Builder map(SearchHit hit) {
try (InputStream stream = hit.getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return Job.LENIENT_PARSER.apply(parser, null);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse job document [" + hit.getId() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Removes job data that expired with respect to their retention period.
Expand All @@ -31,22 +25,22 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final String jobIdExpression;
protected final OriginSettingClient client;
private final Iterator<Job> jobIterator;

AbstractExpiredJobDataRemover(String jobIdExpression, OriginSettingClient client) {
this.jobIdExpression = jobIdExpression;
AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator) {
this.client = client;
this.jobIterator = jobIterator;
}

@Override
public void remove(float requestsPerSecond,
ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
removeData(newJobIterator(), requestsPerSecond, listener, isTimedOutSupplier);
removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier);
}

private void removeData(WrappedBatchedJobsIterator jobIterator,
private void removeData(Iterator<Job> jobIterator,
float requestsPerSecond,
ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
Expand Down Expand Up @@ -86,11 +80,6 @@ private void removeData(WrappedBatchedJobsIterator jobIterator,
));
}

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName(), jobIdExpression);
return new WrappedBatchedJobsIterator(jobsIterator);
}

abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDetails> listener);

abstract Long getRetentionDays(Job job);
Expand Down Expand Up @@ -147,50 +136,4 @@ public boolean equals(Object other) {
this.cutoffEpochMs == that.cutoffEpochMs;
}
}

/**
* A wrapper around {@link BatchedJobsIterator} that allows iterating jobs one
* at a time from the batches returned by {@code BatchedJobsIterator}
*
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
private static class WrappedBatchedJobsIterator implements Iterator<Job> {
// Source of batches (one search page per call to next()).
private final BatchedJobsIterator batchedIterator;
// The batch currently being drained one job at a time; null until the first next().
private VolatileCursorIterator<Job> currentBatch;

WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) {
this.batchedIterator = batchedIterator;
}

// True if the current batch still has jobs, or another batch can be fetched.
@Override
public boolean hasNext() {
return (currentBatch != null && currentBatch.hasNext()) || batchedIterator.hasNext();
}

/**
 * Returns the next job, fetching a new batch from the underlying
 * {@link BatchedJobsIterator} when the current one is exhausted.
 *
 * Before BatchedJobsIterator has run a search it reports hasNext == true
 * but the first search may return no results. In that case null is returned
 * and clients have to handle null.
 */
@Override
public Job next() {
if (currentBatch != null && currentBatch.hasNext()) {
return currentBatch.next();
}

// currentBatch is either null or all its elements have been iterated.
// get the next currentBatch
currentBatch = createBatchIteratorFromBatch(batchedIterator.next());

// BatchedJobsIterator.hasNext may be true if searching the first time
// but no results are returned.
return currentBatch.hasNext() ? currentBatch.next() : null;
}

// Builds the Job.Builder batch into Jobs and wraps them in a single-pass iterator.
private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Builder> builders) {
List<Job> jobs = builders.stream().map(Job.Builder::build).collect(Collectors.toList());
return new VolatileCursorIterator<>(jobs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover

private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(OriginSettingClient client, String jobIdExpression, ThreadPool threadPool) {
super(jobIdExpression, client);
/**
 * @param client      client used to issue the deletion-related requests
 * @param jobIterator supplies the jobs whose expired model snapshots should be removed;
 *                    iteration is driven by the superclass
 * @param threadPool  must be non-null; fail fast otherwise
 */
public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator<Job> jobIterator, ThreadPool threadPool) {
super(client, jobIterator);
this.threadPool = Objects.requireNonNull(threadPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand All @@ -70,9 +71,9 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;

public ExpiredResultsRemover(OriginSettingClient client, String jobIdExpression,
public ExpiredResultsRemover(OriginSettingClient client, Iterator<Job> jobIterator,
AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(jobIdExpression, client);
super(client, jobIterator);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}
Expand Down
Loading