diff --git a/docs/changelog/93000.yaml b/docs/changelog/93000.yaml new file mode 100644 index 0000000000000..33c933e393b64 --- /dev/null +++ b/docs/changelog/93000.yaml @@ -0,0 +1,5 @@ +pr: 93000 +summary: Persist data counts and datafeed timing stats asynchronously +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index a5996f517a353..8f6b82c9d674e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -491,7 +491,7 @@ public void testGetDataCountsModelSizeAndTimingStatsWithSomeDocs() throws Except storedDataCounts.incrementInputBytes(1L); storedDataCounts.incrementMissingFieldCount(1L); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client(), resultsPersisterService, auditor); - jobDataCountsPersister.persistDataCounts(job.getId(), storedDataCounts); + jobDataCountsPersister.persistDataCounts(job.getId(), storedDataCounts, true); jobResultsPersister.commitWrites(job.getId(), JobResultsPersister.CommitType.RESULTS); setOrThrow.get(); @@ -1046,9 +1046,9 @@ private void indexScheduledEvents(List events) throws IOExceptio } } - private void indexDataCounts(DataCounts counts, String jobId) { + private void indexDataCounts(DataCounts counts, String jobId) throws InterruptedException { JobDataCountsPersister persister = new JobDataCountsPersister(client(), resultsPersisterService, auditor); - persister.persistDataCounts(jobId, counts); + persister.persistDataCounts(jobId, counts, true); } private void indexFilters(List filters) throws IOException { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 1c9c01e55cbe4..e9f857b2650af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -140,7 +140,7 @@ private void previewDatafeed( job, xContentRegistry, // Fake DatafeedTimingStatsReporter that does not have access to results index - new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy) -> {}), + new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy, listener1) -> {}), listener.delegateFailure( (l, dataExtractorFactory) -> isDateNanos( previewDatafeedConfig, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 4212e80d9dc06..ce5ab42fa2eea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -355,7 +355,7 @@ private void createDataExtractor( job, xContentRegistry, // Fake DatafeedTimingStatsReporter that does not have access to results index - new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), (ts, refreshPolicy) -> {}), + new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), (ts, refreshPolicy, listener1) -> {}), ActionListener.wrap( unused -> persistentTasksService.sendStartRequest( MlTasks.datafeedTaskId(params.getDatafeedId()), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index 93ba370e93b89..5fa73fab46647 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -8,38 +8,47 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import java.util.Objects; +import java.util.concurrent.CountDownLatch; /** - * {@link DatafeedTimingStatsReporter} class handles the logic of persisting {@link DatafeedTimingStats} if they changed significantly - * since the last time they were persisted. - * + * {@link DatafeedTimingStatsReporter} class handles the logic of persisting {@link DatafeedTimingStats} for a single job + * if they changed significantly since the last time they were persisted. + *

* This class is not thread-safe. */ public class DatafeedTimingStatsReporter { - private static final Logger LOGGER = LogManager.getLogger(DatafeedTimingStatsReporter.class); + private static final Logger logger = LogManager.getLogger(DatafeedTimingStatsReporter.class); /** Interface used for persisting current timing stats to the results index. */ @FunctionalInterface public interface DatafeedTimingStatsPersister { /** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */ - void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy); + void persistDatafeedTimingStats( + DatafeedTimingStats timingStats, + WriteRequest.RefreshPolicy refreshPolicy, + ActionListener listener + ); } /** Persisted timing stats. May be stale. */ - private DatafeedTimingStats persistedTimingStats; + private volatile DatafeedTimingStats persistedTimingStats; /** Current timing stats. */ - private volatile DatafeedTimingStats currentTimingStats; + private final DatafeedTimingStats currentTimingStats; /** Object used to persist current timing stats. */ private final DatafeedTimingStatsPersister persister; /** Whether or not timing stats will be persisted by the persister object. */ private volatile boolean allowedPersisting; + /** Records whether a persist is currently in progress. */ + private CountDownLatch persistInProgressLatch; public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) { Objects.requireNonNull(timingStats); @@ -81,9 +90,11 @@ public void reportDataCounts(DataCounts dataCounts) { /** Finishes reporting of timing stats. Makes timing stats persisted immediately. */ public void finishReporting() { - // Don't flush if current timing stats are identical to the persisted ones - if (currentTimingStats.equals(persistedTimingStats) == false) { - flush(WriteRequest.RefreshPolicy.IMMEDIATE); + try { + flush(WriteRequest.RefreshPolicy.IMMEDIATE, true); + } catch (InterruptedException e) { + logger.warn("[{}] interrupted while finishing reporting of datafeed timing stats", currentTimingStats.getJobId()); + Thread.currentThread().interrupt(); } } @@ -94,19 +105,50 @@ public void disallowPersisting() { private void flushIfDifferSignificantly() { if (differSignificantly(currentTimingStats, persistedTimingStats)) { - flush(WriteRequest.RefreshPolicy.NONE); + try { + flush(WriteRequest.RefreshPolicy.NONE, false); + } catch (InterruptedException e) { + assert false : "This should never happen when flush is called with mustWait set to false"; + Thread.currentThread().interrupt(); + } } } - private void flush(WriteRequest.RefreshPolicy refreshPolicy) { - persistedTimingStats = new DatafeedTimingStats(currentTimingStats); - if (allowedPersisting) { - try { - persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); - } catch (Exception ex) { - // Since persisting datafeed timing stats is not critical, we just log a warning here. - LOGGER.warn(() -> "[" + currentTimingStats.getJobId() + "] failed to report datafeed timing stats", ex); - } + private synchronized void flush(WriteRequest.RefreshPolicy refreshPolicy, boolean mustWait) throws InterruptedException { + String jobId = currentTimingStats.getJobId(); + if (allowedPersisting && mustWait && persistInProgressLatch != null) { + persistInProgressLatch.await(); + persistInProgressLatch = null; + } + // Don't persist if: + // 1. Persistence is disallowed + // 2. There is already a persist in progress + // 3. Current timing stats are identical to the persisted ones + if (allowedPersisting == false) { + logger.trace("[{}] not persisting datafeed timing stats as persistence is disallowed", jobId); + return; + } + if (persistInProgressLatch != null && persistInProgressLatch.getCount() > 0) { + logger.trace("[{}] not persisting datafeed timing stats as the previous persist is still in progress", jobId); + return; + } + if (currentTimingStats.equals(persistedTimingStats)) { + logger.trace("[{}] not persisting datafeed timing stats as they are identical to latest already persisted", jobId); + return; + } + final CountDownLatch thisPersistLatch = new CountDownLatch(1); + final DatafeedTimingStats thisPersistTimingStats = new DatafeedTimingStats(currentTimingStats); + persistInProgressLatch = thisPersistLatch; + persister.persistDatafeedTimingStats(thisPersistTimingStats, refreshPolicy, ActionListener.wrap(r -> { + persistedTimingStats = thisPersistTimingStats; + thisPersistLatch.countDown(); + }, e -> { + thisPersistLatch.countDown(); + // Since persisting datafeed timing stats is not critical, we just log a warning here. + logger.warn(() -> "[" + jobId + "] failed to report datafeed timing stats", e); + })); + if (mustWait) { + thisPersistLatch.await(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 2ad36e4123a36..aa07df14a10d4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -688,7 +688,7 @@ public void revertSnapshot( CheckedConsumer updateHandler = response -> { if (response) { ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSizeStats).setLogTime(new Date()).build(); - jobResultsPersister.persistModelSizeStats( + jobResultsPersister.persistModelSizeStatsWithoutRetries( revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(modelSizeStatsResponseHandler, actionListener::onFailure) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 04d6429d4ab39..c5c296374f65c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -23,14 +23,17 @@ import java.io.IOException; import java.time.Instant; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** - * Update a job's dataCounts - * i.e. the number of processed records, fields etc. + * Updates job data counts, i.e. the number of processed records, fields etc. + * One instance of this class handles updates for all jobs. */ public class JobDataCountsPersister { @@ -40,6 +43,8 @@ public class JobDataCountsPersister { private final Client client; private final AnomalyDetectionAuditor auditor; + private final Map ongoingPersists = new ConcurrentHashMap<>(); + public JobDataCountsPersister(Client client, ResultsPersisterService resultsPersisterService, AnomalyDetectionAuditor auditor) { this.resultsPersisterService = resultsPersisterService; this.client = client; @@ -52,12 +57,27 @@ private static XContentBuilder serialiseCounts(DataCounts counts) throws IOExcep } /** - * Update the job's data counts stats and figures. - * NOTE: This call is synchronous and pauses the calling thread. - * @param jobId Job to update - * @param counts The counts + * Update a job's data counts stats and figures. + * If the previous call for the same job is still in progress + * @param jobId Job to update. + * @param counts The counts. + * @param mustWait Whether to wait for the counts to be persisted. + * This will involve waiting for the supplied counts + * and also potentially the previous counts to be + * persisted if that previous persist is still ongoing. + * @return true if the counts were sent for persistence, or false + * if the previous persist was still in progress. */ - public void persistDataCounts(String jobId, DataCounts counts) { + public boolean persistDataCounts(String jobId, DataCounts counts, boolean mustWait) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch previousLatch = ongoingPersists.putIfAbsent(jobId, latch); + while (previousLatch != null) { + if (mustWait == false) { + return false; + } + previousLatch.await(); + previousLatch = ongoingPersists.putIfAbsent(jobId, latch); + } counts.setLogTime(Instant.now()); try { resultsPersisterService.indexWithRetry( @@ -69,21 +89,29 @@ public void persistDataCounts(String jobId, DataCounts counts) { DataCounts.documentId(jobId), true, () -> true, - retryMessage -> logger.debug("[{}] Job data_counts {}", jobId, retryMessage) + retryMessage -> logger.debug("[{}] Job data_counts {}", jobId, retryMessage), + ActionListener.wrap(r -> ongoingPersists.remove(jobId).countDown(), e -> { + ongoingPersists.remove(jobId).countDown(); + logger.error(() -> "[" + jobId + "] Failed persisting data_counts stats", e); + auditor.error(jobId, "Failed persisting data_counts stats: " + e.getMessage()); + }) ); - } catch (IOException ioe) { - logger.error(() -> "[" + jobId + "] Failed writing data_counts stats", ioe); - } catch (Exception ex) { - logger.error(() -> "[" + jobId + "] Failed persisting data_counts stats", ex); - auditor.error(jobId, "Failed persisting data_counts stats: " + ex.getMessage()); + } catch (IOException e) { + // An exception caught here basically means toXContent() failed, which should never happen + logger.error(() -> "[" + jobId + "] Failed writing data_counts stats", e); + return false; + } + if (mustWait) { + latch.await(); } + return true; } /** - * The same as {@link JobDataCountsPersister#persistDataCounts(String, DataCounts)} but done Asynchronously. - * + * Very similar to {@link JobDataCountsPersister#persistDataCounts(String, DataCounts, boolean)}. + *

* Two differences are: - * - The listener is notified on persistence failure + * - The caller is notified on persistence failure * - If the persistence fails, it is not automatically retried * @param jobId Job to update * @param counts The counts diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 57b906990df92..c2661fc933ca9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -350,7 +351,7 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(listener, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); + persistable.persistWithoutRetries(listener, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); }, listener::onFailure); // Step 1: Search for existing quantiles document in .ml-state* @@ -410,7 +411,7 @@ public void persistModelSizeStats(ModelSizeStats modelSizeStats, Supplier listener @@ -424,7 +425,7 @@ public void persistModelSizeStats( modelSizeStats.getId() ); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(listener, true); + persistable.persistWithoutRetries(listener, true); } /** @@ -486,8 +487,13 @@ public void commitWrites(String jobId, Set commitTypes) { * * @param timingStats datafeed timing stats to persist * @param refreshPolicy refresh policy to apply + * @param listener listener for response or error */ - public BulkResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) { + public void persistDatafeedTimingStats( + DatafeedTimingStats timingStats, + WriteRequest.RefreshPolicy refreshPolicy, + ActionListener listener + ) { String jobId = timingStats.getJobId(); logger.trace("[{}] Persisting datafeed timing stats", jobId); Persistable persistable = new Persistable( @@ -498,7 +504,7 @@ public BulkResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, DatafeedTimingStats.documentId(timingStats.getJobId()) ); persistable.setRefreshPolicy(refreshPolicy); - return persistable.persist(() -> true, true); + persistable.persist(() -> true, true, listener); } private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException { @@ -534,9 +540,15 @@ void setRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { } BulkResponse persist(Supplier shouldRetry, boolean requireAlias) { + final PlainActionFuture getResponseFuture = PlainActionFuture.newFuture(); + persist(shouldRetry, requireAlias, getResponseFuture); + return getResponseFuture.actionGet(); + } + + void persist(Supplier shouldRetry, boolean requireAlias, ActionListener listener) { logCall(); try { - return resultsPersisterService.indexWithRetry( + resultsPersisterService.indexWithRetry( jobId, indexName, object, @@ -545,20 +557,23 @@ BulkResponse persist(Supplier shouldRetry, boolean requireAlias) { id, requireAlias, shouldRetry, - retryMessage -> logger.debug("[{}] {} {}", jobId, id, retryMessage) + retryMessage -> logger.debug("[{}] {} {}", jobId, id, retryMessage), + listener ); } catch (IOException e) { logger.error(() -> format("[%s] Error writing [%s]", jobId, (id == null) ? "auto-generated ID" : id), e); IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder(); notCreatedResponse.setResult(Result.NOOP); - return new BulkResponse( - new BulkItemResponse[] { BulkItemResponse.success(0, DocWriteRequest.OpType.INDEX, notCreatedResponse.build()) }, - 0 + listener.onResponse( + new BulkResponse( + new BulkItemResponse[] { BulkItemResponse.success(0, DocWriteRequest.OpType.INDEX, notCreatedResponse.build()) }, + 0 + ) ); } } - void persist(ActionListener listener, boolean requireAlias) { + void persistWithoutRetries(ActionListener listener, boolean requireAlias) { logCall(); try (XContentBuilder content = toXContentBuilder(object, params)) { @@ -585,5 +600,4 @@ private void logCall() { } } } - } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 306425c50d3ca..610386e33bc2c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -20,7 +20,7 @@ /** * Status reporter for tracking counts of the good/bad records written to the API. * Call one of the reportXXX() methods to update the records counts. - * + *

* Stats are logged at specific stages *

    *
  1. Every 10,000 records for the first 100,000 records
  2. @@ -43,6 +43,8 @@ public class DataCountsReporter { private final DataCounts totalRecordStats; private volatile DataCounts incrementalRecordStats; + private DataCounts unreportedStats; + private long analyzedFieldsPerRecord = 1; private long lastRecordCountQuotient = 0; @@ -236,7 +238,32 @@ public void finishReporting() { totalRecordStats.setLastDataTimeStamp(now); diagnostics.flush(); retrieveDiagnosticsIntermediateResults(); - dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats()); + synchronized (this) { + unreportedStats = null; + } + DataCounts statsToReport = runningTotalStats(); + try { + if (dataCountsPersister.persistDataCounts(job.getId(), statsToReport, false) == false) { + synchronized (this) { + unreportedStats = statsToReport; + } + } + } catch (InterruptedException e) { + assert false : "This should never happen when persistDataCounts is called with mustWait set to false"; + Thread.currentThread().interrupt(); + } + } + + /** + * Report the most recent counts, if any, that {@link #finishReporting} failed to + * report when the previous persistence was in progress. + */ + public synchronized void writeUnreportedCounts() throws InterruptedException { + if (unreportedStats != null) { + boolean persisted = dataCountsPersister.persistDataCounts(job.getId(), unreportedStats, true); + assert persisted; + unreportedStats = null; + } } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index c21c3ddb6b9cb..1136f4c083cc0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -180,6 +180,7 @@ public void close() { try { future.get(); autodetectWorkerExecutor.shutdown(); + dataCountsReporter.writeUnreportedCounts(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { @@ -256,6 +257,11 @@ public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer handler) { submitOperation(() -> { String flushId = autodetectProcess.flushJob(params); + try { + dataCountsReporter.writeUnreportedCounts(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } return waitFlushToCompletion(flushId, params.isWaitForNormalization()); }, handler); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java index cf168e370424d..de418364732e0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -41,8 +41,6 @@ import java.io.IOException; import java.time.Duration; import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; @@ -58,20 +56,16 @@ public class ResultsPersisterService { /** * List of rest statuses that we consider irrecoverable */ - public static final Set IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList( - RestStatus.GONE, - RestStatus.NOT_IMPLEMENTED, - // Not found is returned when we require an alias but the index is NOT an alias. - RestStatus.NOT_FOUND, - RestStatus.BAD_REQUEST, - RestStatus.UNAUTHORIZED, - RestStatus.FORBIDDEN, - RestStatus.METHOD_NOT_ALLOWED, - RestStatus.NOT_ACCEPTABLE - ) - ) + public static final Set IRRECOVERABLE_REST_STATUSES = Set.of( + RestStatus.GONE, + RestStatus.NOT_IMPLEMENTED, + // Not found is returned when we require an alias but the index is NOT an alias. + RestStatus.NOT_FOUND, + RestStatus.BAD_REQUEST, + RestStatus.UNAUTHORIZED, + RestStatus.FORBIDDEN, + RestStatus.METHOD_NOT_ALLOWED, + RestStatus.NOT_ACCEPTABLE ); private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class); @@ -161,6 +155,25 @@ public BulkResponse indexWithRetry( return bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, retryMsgHandler); } + public void indexWithRetry( + String jobId, + String indexName, + ToXContent object, + ToXContent.Params params, + WriteRequest.RefreshPolicy refreshPolicy, + String id, + boolean requireAlias, + Supplier shouldRetry, + Consumer retryMsgHandler, + ActionListener finalListener + ) throws IOException { + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy); + try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) { + bulkRequest.add(new IndexRequest(indexName).id(id).source(content).setRequireAlias(requireAlias)); + } + bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, retryMsgHandler, finalListener); + } + public BulkResponse bulkIndexWithRetry( BulkRequest bulkRequest, String jobId, @@ -170,6 +183,25 @@ public BulkResponse bulkIndexWithRetry( return bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, retryMsgHandler, client::bulk); } + public void bulkIndexWithRetry( + BulkRequest bulkRequest, + String jobId, + Supplier shouldRetry, + Consumer retryMsgHandler, + ActionListener finalListener + ) { + if (isShutdown || isResetMode) { + finalListener.onFailure( + new ElasticsearchException( + "Bulk indexing has failed as {}", + isShutdown ? "node is shutting down." : "machine learning feature is being reset." + ) + ); + return; + } + bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, retryMsgHandler, client::bulk, finalListener); + } + public BulkResponse bulkIndexWithHeadersWithRetry( Map headers, BulkRequest bulkRequest, @@ -206,10 +238,22 @@ private BulkResponse bulkIndexWithRetry( isShutdown ? "node is shutting down." : "machine learning feature is being reset." ); } - final PlainActionFuture getResponse = PlainActionFuture.newFuture(); + final PlainActionFuture getResponseFuture = PlainActionFuture.newFuture(); + bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, retryMsgHandler, actionExecutor, getResponseFuture); + return getResponseFuture.actionGet(); + } + + private void bulkIndexWithRetry( + BulkRequest bulkRequest, + String jobId, + Supplier shouldRetry, + Consumer retryMsgHandler, + BiConsumer> actionExecutor, + ActionListener finalListener + ) { final Object key = new Object(); final ActionListener removeListener = ActionListener.runBefore( - getResponse, + finalListener, () -> onGoingRetryableBulkActions.remove(key) ); BulkRetryableAction bulkRetryableAction = new BulkRetryableAction( @@ -229,7 +273,6 @@ private BulkResponse bulkIndexWithRetry( ) ); } - return getResponse.actionGet(); } public SearchResponse searchWithRetry( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java index 35267efcf493d..54a791d0e577b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -23,6 +25,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -40,6 +44,12 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase { @Before public void setUpTests() { timingStatsPersister = mock(DatafeedTimingStatsPersister.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(mock(BulkResponse.class)); + return Void.TYPE; + }).when(timingStatsPersister).persistDatafeedTimingStats(any(), any(), any()); } public void testReportSearchDuration_Null() { @@ -59,7 +69,11 @@ public void testReportSearchDuration_Zero() { reporter.reportSearchDuration(TimeValue.ZERO); assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0))); - verify(timingStatsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.NONE); + verify(timingStatsPersister).persistDatafeedTimingStats( + eq(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0)), + eq(RefreshPolicy.NONE), + any() + ); verifyNoMoreInteractions(timingStatsPersister); } @@ -81,9 +95,9 @@ public void testReportSearchDuration() { InOrder inOrder = inOrder(timingStatsPersister); inOrder.verify(timingStatsPersister) - .persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.NONE); + .persistDatafeedTimingStats(eq(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0)), eq(RefreshPolicy.NONE), any()); inOrder.verify(timingStatsPersister) - .persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.NONE); + .persistDatafeedTimingStats(eq(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0)), eq(RefreshPolicy.NONE), any()); verifyNoMoreInteractions(timingStatsPersister); } @@ -112,7 +126,7 @@ public void testReportDataCounts() { InOrder inOrder = inOrder(timingStatsPersister); inOrder.verify(timingStatsPersister) - .persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.NONE); + .persistDatafeedTimingStats(eq(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0)), eq(RefreshPolicy.NONE), any()); verifyNoMoreInteractions(timingStatsPersister); } @@ -130,8 +144,9 @@ public void testFinishReporting_WithChange() { reporter.finishReporting(); verify(timingStatsPersister).persistDatafeedTimingStats( - new DatafeedTimingStats(JOB_ID, 0, 0, 0.0, new ExponentialAverageCalculationContext(0.0, TIMESTAMP, null)), - RefreshPolicy.IMMEDIATE + eq(new DatafeedTimingStats(JOB_ID, 0, 0, 0.0, new ExponentialAverageCalculationContext(0.0, TIMESTAMP, null))), + eq(RefreshPolicy.IMMEDIATE), + any() ); verifyNoMoreInteractions(timingStatsPersister); } @@ -205,11 +220,11 @@ public void testTimingStatsDifferSignificantly() { } public void testFinishReportingTimingStatsException() { - doThrow(new ElasticsearchException("BOOM")).when(timingStatsPersister).persistDatafeedTimingStats(any(), any()); + doThrow(new ElasticsearchException("BOOM")).when(timingStatsPersister).persistDatafeedTimingStats(any(), any(), any()); DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID)); try { - reporter.reportDataCounts(createDataCounts(0, TIMESTAMP)); + reporter.reportDataCounts(createDataCounts(0)); reporter.finishReporting(); } catch (ElasticsearchException ex) { fail("Should not have failed with: " + ex.getDetailedMessage()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 3d7f8a10c6ee0..1d556f7399a8a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -289,7 +289,11 @@ public void testPersistDatafeedTimingStats() { 666.0, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0) ); - persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE); + persister.persistDatafeedTimingStats( + timingStats, + WriteRequest.RefreshPolicy.IMMEDIATE, + ActionListener.wrap(r -> {}, e -> fail("unexpected exception " + e.getMessage())) + ); InOrder inOrder = inOrder(client); inOrder.verify(client).settings(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index 3784e3ae56239..51cf4f5acc7b9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -18,8 +18,8 @@ import org.mockito.Mockito; import java.time.Instant; -import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; @@ -37,10 +37,10 @@ public class DataCountsReporterTests extends ESTestCase { @Before public void setUpMocks() { - AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(List.of(new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(bucketSpan); acBuilder.setLatency(TimeValue.ZERO); - acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + acBuilder.setDetectors(List.of(new Detector.Builder("metric", "field").build())); Job.Builder builder = new Job.Builder("sr"); builder.setAnalysisConfig(acBuilder); @@ -153,7 +153,7 @@ public void testReportLatestTimeIncrementalStats() { assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); } - public void testReportRecordsWritten() { + public void testReportRecordsWritten() throws InterruptedException { DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister); dataCountsReporter.setAnalysedFieldsPerRecord(3); @@ -174,7 +174,7 @@ public void testReportRecordsWritten() { assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats()); - verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class)); + verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), eq(false)); } public void testReportRecordsWritten_Given9999Records() { @@ -262,7 +262,7 @@ public void testReportRecordsWritten_Given2_000_000Records() { assertEquals(20, dataCountsReporter.getLogStatusCallCount()); } - public void testFinishReporting() { + public void testFinishReporting() throws InterruptedException { DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister); dataCountsReporter.setAnalysedFieldsPerRecord(3); @@ -300,7 +300,7 @@ public void testFinishReporting() { ); dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); - verify(jobDataCountsPersister, times(1)).persistDataCounts(eq("sr"), eq(dc)); + verify(jobDataCountsPersister, times(1)).persistDataCounts(eq("sr"), eq(dc), eq(false)); assertEquals(dc, dataCountsReporter.incrementalStats()); }