From e3f6b83c8b96f624b83565d11a443e6f5532f5f3 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 29 Aug 2019 16:58:15 -0500 Subject: [PATCH 1/4] [ML][Transforms] fixing stop on changes check bug --- .../xpack/core/indexing/AsyncTwoPhaseIndexer.java | 8 +++++++- .../dataframe/integration/DataFrameGetAndGetStatsIT.java | 2 -- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ac5bbb144940c..4a7c167820dde 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -158,7 +158,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { if (r) { nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); } else { - finishAndSetState(); + // If our state is INDEXING then transition to STARTED + // Calling checkState here covers the scenarios when + // the indexer was aborted or set to STOPPING between `onStart` being called + // and this branch being executed + if (checkState(getState())) { + finishAndSetState(); + } } }, this::finishWithFailure)); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 76bb90d300adb..3a7809125c7d9 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -27,7 +26,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.oneOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45610") public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_user"; From b31b5e0d766aeda596b31341e0214a3373ae43a7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 30 Aug 2019 08:14:05 -0500 Subject: [PATCH 2/4] Adding new method finishAndCheckState to cover race conditions in early terminations --- .../core/indexing/AsyncTwoPhaseIndexer.java | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 4a7c167820dde..9409376e4e011 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -159,12 +159,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); } else { // If our state is INDEXING then transition to STARTED - // Calling checkState here covers the scenarios when - // the indexer was aborted or set to STOPPING between `onStart` being called - // and this branch being executed - if (checkState(getState())) { - finishAndSetState(); - } + // Calling finishAndCheckState here covers the scenarios when + // The indexer was aborted or stopped between the transition to INDEXING and now + finishAndCheckState(); } }, this::finishWithFailure)); @@ -464,4 +461,39 @@ private boolean checkState(IndexerState currentState) { } } + /** + * This method is an atomic combination of `finishAndSetState` and `checkState` + */ + private void finishAndCheckState() { + AtomicBoolean callOnStop = new AtomicBoolean(false); + AtomicBoolean callOnAbort = new AtomicBoolean(false); + state.updateAndGet(prev -> { + callOnAbort.set(false); + callOnStop.set(false); + switch (prev) { + case INDEXING: + return IndexerState.STARTED; + case STOPPING: + callOnStop.set(true); + return IndexerState.STOPPED; + case ABORTING: + callOnAbort.set(true); + return IndexerState.ABORTING; + case STOPPED: + return IndexerState.STOPPED; + default: + // any other state is unanticipated at this point + throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); + } + }); + + if (callOnStop.get()) { + onStop(); + logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); + doSaveState(finishAndSetState(), getPosition(), () -> {}); + } else if (callOnAbort.get()) { + onAbort(); + } + } + } From 6520cacdfc4d2331ce2d0cc0a275def8aa228dac Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 30 Aug 2019 12:44:49 -0500 Subject: [PATCH 3/4] changing stopping conditions in `onStart` --- .../core/indexing/AsyncTwoPhaseIndexer.java | 46 +++---------------- 1 file changed, 7 insertions(+), 39 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 9409376e4e011..ca6aa5f822298 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -158,10 +158,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { if (r) { nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); } else { - // If our state is INDEXING then transition to STARTED - // Calling finishAndCheckState here covers the scenarios when - // The indexer was aborted or stopped between the transition to INDEXING and now - finishAndCheckState(); + // If we transition to `STOPPED` this means the previous state was `STOPPING`. + // This is because we are guaranteed to originally been in an INDEXING state if `stop` is called. + // Treat this like a `checkState` call that is transitioning to a STOPPED state. + if (finishAndSetState() == IndexerState.STOPPED) { + logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); + doSaveState(getState(), getPosition(), () -> {}); + } } }, this::finishWithFailure)); @@ -461,39 +464,4 @@ private boolean checkState(IndexerState currentState) { } } - /** - * This method is an atomic combination of `finishAndSetState` and `checkState` - */ - private void finishAndCheckState() { - AtomicBoolean callOnStop = new AtomicBoolean(false); - AtomicBoolean callOnAbort = new AtomicBoolean(false); - state.updateAndGet(prev -> { - callOnAbort.set(false); - callOnStop.set(false); - switch (prev) { - case INDEXING: - return IndexerState.STARTED; - case STOPPING: - callOnStop.set(true); - return IndexerState.STOPPED; - case ABORTING: - callOnAbort.set(true); - return IndexerState.ABORTING; - case STOPPED: - return IndexerState.STOPPED; - default: - // any other state is unanticipated at this point - throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); - } - }); - - if (callOnStop.get()) { - onStop(); - logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(finishAndSetState(), getPosition(), () -> {}); - } else if (callOnAbort.get()) { - onAbort(); - } - } - } From 442b707dcf11686e50e2a6fa25f85e4b850389c2 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 30 Aug 2019 15:14:31 -0500 Subject: [PATCH 4/4] allow indexer to finish when exiting early --- .../core/indexing/AsyncTwoPhaseIndexer.java | 10 +++----- .../transforms/DataFrameTransformTask.java | 24 ++++++++++++++++++- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ca6aa5f822298..52aa9304ce264 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -158,13 +158,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { if (r) { nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); } else { - // If we transition to `STOPPED` this means the previous state was `STOPPING`. - // This is because we are guaranteed to originally been in an INDEXING state if `stop` is called. - // Treat this like a `checkState` call that is transitioning to a STOPPED state. - if (finishAndSetState() == IndexerState.STOPPED) { - logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(getState(), getPosition(), () -> {}); - } + onFinish(ActionListener.wrap( + onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}), + onFinishFailure -> doSaveState(finishAndSetState(), position.get(), () -> {}))); } }, this::finishWithFailure)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 7a4162ad6e5ef..0515640f2baa2 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -640,6 +640,8 @@ static class ClientDataFrameIndexer extends DataFrameIndexer { private final DataFrameTransformTask transformTask; private final AtomicInteger failureCount; private volatile boolean auditBulkFailures = true; + // Indicates that the source has changed for the current run + private volatile boolean hasSourceChanged = true; // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); @@ -760,18 +762,26 @@ protected void onStart(long now, ActionListener listener) { if (transformTask.currentCheckpoint.get() > 0 && initialRun()) { sourceHasChanged(ActionListener.wrap( hasChanged -> { + hasSourceChanged = hasChanged; if (hasChanged) { transformTask.changesLastDetectedAt = Instant.now(); logger.debug("[{}] source has changed, triggering new indexer run.", transformId); changedSourceListener.onResponse(null); } else { + logger.trace("[{}] source has not changed, finish indexer early.", transformId); // No changes, stop executing listener.onResponse(false); } }, - listener::onFailure + failure -> { + // If we failed determining if the source changed, it's safer to assume there were changes. + // We should allow the failure path to complete as normal + hasSourceChanged = true; + listener.onFailure(failure); + } )); } else { + hasSourceChanged = true; changedSourceListener.onResponse(null); } } @@ -869,6 +879,13 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p next.run(); return; } + // This means that the indexer was triggered to discover changes, found none, and exited early. + // If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes. + // Allow the stop call path to continue + if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { + next.run(); + return; + } // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` // OR we called `doSaveState` manually as the indexer was not actively running. // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state @@ -959,6 +976,11 @@ protected void onFailure(Exception exc) { @Override protected void onFinish(ActionListener listener) { try { + // This indicates an early exit since no changes were found. + // So, don't treat this like a checkpoint being completed, as no work was done. + if (hasSourceChanged == false) { + listener.onResponse(null); + } // TODO: needs cleanup super is called with a listener, but listener.onResponse is called below // super.onFinish() fortunately ignores the listener super.onFinish(listener);