Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Transforms] fixing stop on changes check bug #46162

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
if (r) {
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} else {
finishAndSetState();
onFinish(ActionListener.wrap(
onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}),
onFinishFailure -> doSaveState(finishAndSetState(), position.get(), () -> {})));
}
},
this::finishWithFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
Expand All @@ -27,7 +26,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.oneOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45610")
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
private final DataFrameTransformTask transformTask;
private final AtomicInteger failureCount;
private volatile boolean auditBulkFailures = true;
// Indicates that the source has changed for the current run
private volatile boolean hasSourceChanged = true;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
Expand Down Expand Up @@ -760,18 +762,26 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
if (transformTask.currentCheckpoint.get() > 0 && initialRun()) {
sourceHasChanged(ActionListener.wrap(
hasChanged -> {
hasSourceChanged = hasChanged;
if (hasChanged) {
transformTask.changesLastDetectedAt = Instant.now();
logger.debug("[{}] source has changed, triggering new indexer run.", transformId);
changedSourceListener.onResponse(null);
} else {
logger.trace("[{}] source has not changed, finish indexer early.", transformId);
// No changes, stop executing
listener.onResponse(false);
}
},
listener::onFailure
failure -> {
// If we failed to determine whether the source changed, it is safer to assume that there were changes.
// We should allow the failure path to complete as normal.
hasSourceChanged = true;
listener.onFailure(failure);
}
));
} else {
hasSourceChanged = true;
changedSourceListener.onResponse(null);
}
}
Expand Down Expand Up @@ -869,6 +879,13 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p
next.run();
return;
}
// This means that the indexer was triggered to discover changes, found none, and exited early.
// If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes.
// Allow the stop call path to continue
if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) {
next.run();
return;
}
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
// OR we called `doSaveState` manually as the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
Expand Down Expand Up @@ -959,6 +976,11 @@ protected void onFailure(Exception exc) {
@Override
protected void onFinish(ActionListener<Void> listener) {
try {
// This indicates an early exit since no changes were found.
// So, don't treat this like a checkpoint being completed, as no work was done.
if (hasSourceChanged == false) {
listener.onResponse(null);
}
// TODO: needs cleanup; super is called with a listener, but listener.onResponse is also called below.
// Fortunately, super.onFinish() ignores the listener.
super.onFinish(listener);
Expand Down