From 21418a4a7f1b5553508c4031da89f288d41283a3 Mon Sep 17 00:00:00 2001
From: Kaituo Li <kaituo@amazon.com>
Date: Wed, 30 Dec 2020 17:15:54 -0800
Subject: [PATCH] Fix another case of the profile API returns prematurely
 (#353)

Testing done:
1. Manually verified the issue has been fixed.
2. Reproduced the issue using an unit test and verified fix using the test.
---
 .../ad/AnomalyDetectorProfileRunner.java      |  4 +--
 .../metrics/CardinalityProfileTests.java      | 26 ++++++++++++++++++-
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java
index 535deb68..264dce21 100644
--- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java
+++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java
@@ -188,7 +188,6 @@ private void prepareProfile(
                             CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId,
                             false
                         );
-
                     if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
                         GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId);
                         client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs));
@@ -459,8 +458,8 @@ private ActionListener<SearchResponse> onInittedEver(
                 processInitResponse(detector, profilesToCollect, totalUpdates, false, profileBuilder, listener);
             } else {
                 createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
+                listener.onResponse(profileBuilder.build());
             }
-            listener.onResponse(profileBuilder.build());
         }, exception -> {
             if (exception instanceof IndexNotFoundException) {
                 // anomaly result index is not created yet
@@ -554,7 +553,6 @@ private void processInitResponse(
             } else {
                 long intervalMins = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMinutes();
                 InitProgressProfile initProgress = computeInitProgressProfile(totalUpdates, intervalMins);
-
                 builder.initProgress(initProgress);
             }
         }
diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java
index 0be67df3..10d7e463 100644
--- a/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java
+++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.get.GetRequest;
@@ -111,6 +112,7 @@ private void setUpMultiEntityClientGet(DetectorStatus detectorStatus, JobStatus
             } else if (request.index().equals(DetectorInternalState.DETECTOR_STATE_INDEX)) {
                 switch (errorResultStatus) {
                     case NO_ERROR:
+                        listener.onResponse(null);
                         break;
                     case NULL_POINTER_EXCEPTION:
                         GetResponse response = mock(GetResponse.class);
@@ -237,7 +239,29 @@ public void testFailGetState() throws IOException, InterruptedException {
         assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS));
     }
 
-    public void testFaiConfirmInitted() throws IOException, InterruptedException {
+    public void testNoResultsNoError() throws IOException, InterruptedException {
+        setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NO_ERROR);
+        setUpMultiEntityClientSearch(ADResultStatus.NO_RESULT, CardinalityStatus.NORMAL);
+        setUpProfileAction();
+
+        final AtomicInteger called = new AtomicInteger(0);
+
+        runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> {
+            assertTrue(response.getInitProgress() != null);
+            called.getAndIncrement();
+        }, exception -> {
+            assertTrue("Should not reach here ", false);
+            called.getAndIncrement();
+        }), totalInitProgress);
+
+        while (called.get() == 0) {
+            Thread.sleep(100);
+        }
+        // should only call onResponse once
+        assertEquals(1, called.get());
+    }
+
+    public void testFailConfirmInitted() throws IOException, InterruptedException {
         setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NO_ERROR);
         setUpMultiEntityClientSearch(ADResultStatus.EXCEPTION, CardinalityStatus.NORMAL);
         setUpProfileAction();