[ML] Check dest index is empty when starting DF analytics (#45094)
If one tries to start a DF analytics job that has already run,
the task fails after reindexing the dest index from the source
index. The results of the prior run are then gone, yet the task
state is not properly set to `failed` with the failure reason.

This commit improves the behavior in this scenario. First, we now
set the task state to `failed` on a number of failures that were
previously missed. Second, a validation is added that if the
destination index exists, it must be empty.
dimitris-athanasiou authored Aug 1, 2019
1 parent 7c43894 commit 530ccc0
Showing 5 changed files with 98 additions and 22 deletions.
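At its core, the new check is a size-0 search against the dest index, where a missing index counts as empty and any hit count above zero blocks the start request. The following is a minimal, hypothetical distillation of that pattern (`DestIndexChecks` and its method are illustrative names only); the real implementation is `checkDestIndexIsEmptyIfExists` in the diff below.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexNotFoundException;

final class DestIndexChecks {

    /** Calls back with true when destIndex is missing or contains no documents. */
    static void checkDestIndexIsEmpty(Client client, String destIndex, ActionListener<Boolean> listener) {
        SearchRequest search = new SearchRequest(destIndex);
        search.source().size(0);                 // only the hit count is needed, no documents
        search.allowPartialSearchResults(false); // a partial count could miss existing docs
        client.search(search, ActionListener.wrap(
            response -> listener.onResponse(response.getHits().getTotalHits().value == 0),
            e -> {
                if (e instanceof IndexNotFoundException) {
                    listener.onResponse(true);   // a dest index that does not exist yet is fine
                } else {
                    listener.onFailure(e);
                }
            }
        ));
    }
}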
1 change: 1 addition & 0 deletions x-pack/plugin/ml/qa/ml-with-security/build.gradle
@@ -142,6 +142,7 @@ integTest.runner {
'ml/start_data_frame_analytics/Test start given missing source index',
'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
'ml/start_data_frame_analytics/Test start given dest index is not empty',
'ml/start_stop_datafeed/Test start datafeed job, but not open',
'ml/start_stop_datafeed/Test start non existing datafeed',
'ml/start_stop_datafeed/Test stop non existing datafeed',
@@ -7,13 +7,16 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -31,6 +34,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -44,6 +48,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -178,26 +183,57 @@ public void onFailure(Exception e) {
}

private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
// Validate mappings can be merged
ActionListener<DataFrameAnalyticsConfig> firstValidationListener = ActionListener.wrap(
// Step 4. Validate mappings can be merged
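        // Note: the step listeners are declared in reverse order, since each must be constructed before the listener that hands off to it.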
ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
mappings -> finalListener.onResponse(config), finalListener::onFailure)),
finalListener::onFailure
);

// Validate source and dest; check data extraction is possible
// Step 3. Validate dest index is empty
ActionListener<DataFrameAnalyticsConfig> toValidateDestEmptyListener = ActionListener.wrap(
config -> checkDestIndexIsEmptyIfExists(config, toValidateMappingsListener),
finalListener::onFailure
);

// Step 2. Validate source and dest; check data extraction is possible
ActionListener<DataFrameAnalyticsConfig> getConfigListener = ActionListener.wrap(
config -> {
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener);
DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, toValidateDestEmptyListener);
},
finalListener::onFailure
);

// First, get the config
// Step 1. Get the config
configProvider.get(id, getConfigListener);
}

private void checkDestIndexIsEmptyIfExists(DataFrameAnalyticsConfig config, ActionListener<DataFrameAnalyticsConfig> listener) {
String destIndex = config.getDest().getIndex();
SearchRequest destEmptySearch = new SearchRequest(destIndex);
destEmptySearch.source().size(0);
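        // partial results are not tolerated; an under-counted total could wrongly pass the emptiness check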
destEmptySearch.allowPartialSearchResults(false);
ClientHelper.executeWithHeadersAsync(config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE,
destEmptySearch, ActionListener.wrap(
searchResponse -> {
if (searchResponse.getHits().getTotalHits().value > 0) {
listener.onFailure(ExceptionsHelper.badRequestException("dest index [{}] must be empty", destIndex));
} else {
listener.onResponse(config);
}
},
e -> {
if (e instanceof IndexNotFoundException) {
listener.onResponse(config);
} else {
listener.onFailure(e);
}
}
)
);
}

private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task,
TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
AnalyticsPredicate predicate = new AnalyticsPredicate();
@@ -373,6 +409,15 @@ private void cancelReindexingTask(String reason, TimeValue timeout) {
LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId());
}
}

public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
updatePersistentTaskState(newTaskState, ActionListener.wrap(
            updatedTask -> LOGGER.info("[{}] Successfully updated task state to [{}]", getParams().getId(), state),
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]",
getParams().getId(), state), e)
));
}
}

static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) {
@@ -63,7 +63,7 @@ public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProv
public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) {
ActionListener<DataFrameAnalyticsConfig> reindexingStateListener = ActionListener.wrap(
config -> reindexDataframeAndStartAnalysis(task, config),
task::markAsFailed
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);

// With config in hand, determine action to take
@@ -129,7 +129,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
task.setReindexingTaskId(null);
startAnalytics(task, config, false);
},
task::markAsFailed
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);

// Refresh to ensure copied index is fully searchable
@@ -140,7 +140,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
RefreshAction.INSTANCE,
new RefreshRequest(config.getDest().getIndex()),
refreshListener),
task::markAsFailed
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);

// Reindex
@@ -196,15 +196,15 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
error -> {
if (error != null) {
task.markAsFailed(error);
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
} else {
task.markAsCompleted();
}
}),
task::markAsFailed
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
));
},
task::markAsFailed
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);

// TODO This could fail with errors. In that case we get stuck with the copied index.
@@ -8,7 +8,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.client.Client;
@@ -17,7 +16,6 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
@@ -114,7 +112,7 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
finishHandler.accept(null);
} else {
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
}
}
}
@@ -176,14 +174,6 @@ private Consumer<String> onProcessCrash(DataFrameAnalyticsTask task) {
};
}

private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason);
task.updatePersistentTaskState(newTaskState, ActionListener.wrap(
updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state),
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e)
));
}

@Nullable
public Integer getProgressPercent(long allocationId) {
ProcessContext processContext = processContextByAllocation.get(allocationId);
@@ -72,3 +72,43 @@
{
"id": "body_id"
}
---
"Test start given dest index is not empty":

- do:
index:
index: non-empty-source
refresh: ""
body: >
{
"numeric": 42.0
}
- do:
index:
index: non-empty-dest
refresh: ""
body: >
{
"numeric": 42.0
}
- do:
ml.put_data_frame_analytics:
id: "start_given_empty_dest_index"
body: >
{
"source": {
"index": "non-empty-source"
},
"dest": {
"index": "non-empty-dest"
},
"analysis": {"outlier_detection":{}}
}
- do:
catch: /dest index \[non-empty-dest\] must be empty/
ml.start_data_frame_analytics:
id: "start_given_empty_dest_index"
