Skip to content

Commit

Permalink
[ML][Transforms] fixing stop on changes check bug (elastic#46162)
Browse files Browse the repository at this point in the history
* [ML][Transforms] fixing stop on changes check bug

* Adding new method finishAndCheckState to cover race conditions in early terminations

* changing stopping conditions in `onStart`

* allow indexer to finish when exiting early
  • Loading branch information
benwtrent committed Sep 3, 2019
1 parent 11d8b40 commit b112518
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
if (r) {
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} else {
finishAndSetState();
onFinish(ActionListener.wrap(
onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}),
onFinishFailure -> doSaveState(finishAndSetState(), position.get(), () -> {})));
}
},
this::finishWithFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
Expand All @@ -27,7 +26,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.oneOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45610")
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
private final DataFrameTransformTask transformTask;
private final AtomicInteger failureCount;
private volatile boolean auditBulkFailures = true;
// Indicates that the source has changed for the current run
private volatile boolean hasSourceChanged = true;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
Expand Down Expand Up @@ -760,18 +762,26 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
if (transformTask.currentCheckpoint.get() > 0 && initialRun()) {
sourceHasChanged(ActionListener.wrap(
hasChanged -> {
hasSourceChanged = hasChanged;
if (hasChanged) {
transformTask.changesLastDetectedAt = Instant.now();
logger.debug("[{}] source has changed, triggering new indexer run.", transformId);
changedSourceListener.onResponse(null);
} else {
logger.trace("[{}] source has not changed, finish indexer early.", transformId);
// No changes, stop executing
listener.onResponse(false);
}
},
listener::onFailure
failure -> {
// If we failed determining if the source changed, it's safer to assume there were changes.
// We should allow the failure path to complete as normal
hasSourceChanged = true;
listener.onFailure(failure);
}
));
} else {
hasSourceChanged = true;
changedSourceListener.onResponse(null);
}
}
Expand Down Expand Up @@ -869,6 +879,13 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p
next.run();
return;
}
// This means that the indexer was triggered to discover changes, found none, and exited early.
// If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes.
// Allow the stop call path to continue
if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) {
next.run();
return;
}
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
// OR we called `doSaveState` manually as the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
Expand Down Expand Up @@ -959,6 +976,11 @@ protected void onFailure(Exception exc) {
@Override
protected void onFinish(ActionListener<Void> listener) {
try {
// This indicates an early exit since no changes were found.
// So, don't treat this like a checkpoint being completed, as no work was done.
if (hasSourceChanged == false) {
listener.onResponse(null);
}
// TODO: needs cleanup super is called with a listener, but listener.onResponse is called below
// super.onFinish() fortunately ignores the listener
super.onFinish(listener);
Expand Down

0 comments on commit b112518

Please sign in to comment.