From c2b6fbbbfa8488f9dba36f2fcf41c2597c035ec8 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 5 Oct 2023 16:52:12 -0700 Subject: [PATCH] Correctly Set query status (#2227) (#2231) (#2232) * Correctly Set query status Previously, the query status was set to SUCCESS only when the EMR-S job status was 'SUCCESS'. However, for index queries, even when the EMR Job Run State is 'RUNNING', the result should indicate success. This PR addresses and resolves this inconsistency. Tests done: * Manual verification: Created a skipping index and confirmed the async query result is marked 'successful' instead of 'running'. * Updated relevant unit tests. * add unit test --------- (cherry picked from commit cd9d768760f1980b0e297e25d5018ecbfbd2220b) (cherry picked from commit 00ae8ce330d2b354de2c8e5849499dd627bfb4e9) Signed-off-by: Kaituo Li Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../ppl/admin/connectors/s3glue_connector.rst | 15 +++++--- .../dispatcher/SparkQueryDispatcher.java | 34 +++++++++++-------- .../response/JobExecutionResponseReader.java | 8 ++++- .../dispatcher/SparkQueryDispatcherTest.java | 9 ++--- ...AsyncQueryExecutionResponseReaderTest.java | 10 ++++++ 5 files changed, 52 insertions(+), 24 deletions(-) diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index ef27cf572a..e150357679 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -14,10 +14,18 @@ S3Glue Connector Introduction ============ +Properties in DataSource Configuration + +* name: A unique identifier for the data source within a domain. +* connector: Currently supports the following connectors: s3glue, spark, prometheus, and opensearch. +* resultIndex: Stores the results of queries executed on the data source. If unavailable, it defaults to .query_execution_result. + +Glue Connector +======================================================== + s3Glue connector provides a way to query s3 files using glue as metadata store and spark as execution engine. This page covers s3Glue datasource configuration and also how to query and s3Glue datasource. - Required resources for s3 Glue Connector =================================== * S3: This is where the data lies. @@ -27,8 +35,6 @@ Required resources for s3 Glue Connector We currently only support emr-serverless as spark execution engine and Glue as metadata store. we will add more support in future. -Glue Connector Properties in DataSource Configuration -======================================================== Glue Connector Properties. * ``glue.auth.type`` [Required] @@ -59,7 +65,8 @@ Glue datasource configuration:: "glue.indexstore.opensearch.auth" :"basicauth", "glue.indexstore.opensearch.auth.username" :"username" "glue.indexstore.opensearch.auth.password" :"password" - } + }, + "resultIndex": "query_execution_result" }] [{ diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index f5ef419294..dcce11fd55 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -63,34 +63,38 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) } } - // TODO : Fetch from Result Index and then make call to EMR Serverless. public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { - GetJobRunResult getJobRunResult = - emrServerlessClient.getJobRunResult( - asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); - String jobState = getJobRunResult.getJobRun().getState(); + // either empty json when the result is not available or data with status + // Fetch from Result Index JSONObject result = - (jobState.equals(JobRunState.SUCCESS.toString())) - ? jobExecutionResponseReader.getResultFromOpensearchIndex( - asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()) - : new JSONObject(); + jobExecutionResponseReader.getResultFromOpensearchIndex( + asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()); // if result index document has a status, we are gonna use the status directly; otherwise, we - // will use emr-s job status - // a job is successful does not mean there is no error in execution. For example, even if result - // index mapping - // is incorrect, we still write query result and let the job finish. + // will use emr-s job status. + // That a job is successful does not mean there is no error in execution. For example, even if + // result + // index mapping is incorrect, we still write query result and let the job finish. + // That a job is running does not mean the status is running. For example, index/streaming Query + // is a + // long-running job which runs forever. But we need to return success from the result index + // immediately. if (result.has(DATA_FIELD)) { JSONObject items = result.getJSONObject(DATA_FIELD); - // If items have STATUS_FIELD, use it; otherwise, use jobState - String status = items.optString(STATUS_FIELD, jobState); + // If items have STATUS_FIELD, use it; otherwise, mark failed + String status = items.optString(STATUS_FIELD, JobRunState.FAILED.toString()); result.put(STATUS_FIELD, status); // If items have ERROR_FIELD, use it; otherwise, set empty string String error = items.optString(ERROR_FIELD, ""); result.put(ERROR_FIELD, error); } else { + // make call to EMR Serverless when related result index documents are not available + GetJobRunResult getJobRunResult = + emrServerlessClient.getJobRunResult( + asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); + String jobState = getJobRunResult.getJobRun().getState(); result.put(STATUS_FIELD, jobState); result.put(ERROR_FIELD, ""); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index 5da0ef44fe..d3cbd68dce 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -16,6 +16,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; @@ -46,8 +47,14 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); ActionFuture searchResponseActionFuture; + JSONObject data = new JSONObject(); try { searchResponseActionFuture = client.search(searchRequest); + } catch (IndexNotFoundException e) { + // if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty + // json + LOG.info(resultIndex + " is not created yet."); + return data; } catch (Exception e) { throw new RuntimeException(e); } @@ -59,7 +66,6 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { + " index failed with status : " + searchResponse.status()); } else { - JSONObject data = new JSONObject(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { data.put(DATA_FIELD, searchHit.getSourceAsMap()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index c89c122d11..925e6f1a90 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -613,11 +613,14 @@ void testGetQueryResponse() { flintIndexMetadataReader); when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID)) .thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING))); + + // simulate result index is not created yet + when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) + .thenReturn(new JSONObject()); JSONObject result = sparkQueryDispatcher.getQueryResponse( new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null)); Assertions.assertEquals("PENDING", result.get("status")); - verifyNoInteractions(jobExecutionResponseReader); } @Test @@ -629,8 +632,6 @@ void testGetQueryResponseWithSuccess() { dataSourceUserAuthorizationHelper, jobExecutionResponseReader, flintIndexMetadataReader); - when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID)) - .thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.SUCCESS))); JSONObject queryResult = new JSONObject(); Map resultMap = new HashMap<>(); resultMap.put(STATUS_FIELD, "SUCCESS"); @@ -641,7 +642,6 @@ void testGetQueryResponseWithSuccess() { JSONObject result = sparkQueryDispatcher.getQueryResponse( new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null)); - verify(emrServerlessClient, times(1)).getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null); Assertions.assertEquals( new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet()); @@ -655,6 +655,7 @@ void testGetQueryResponseWithSuccess() { // We need similar. Assertions.assertTrue(dataJson.similar(result.get(DATA_FIELD))); Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD)); + verifyNoInteractions(emrServerlessClient); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java index 7d7ebd42b3..fefc951dd7 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; @@ -24,6 +25,7 @@ import org.opensearch.client.Client; import org.opensearch.common.action.ActionFuture; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -91,4 +93,12 @@ public void testSearchFailure() { RuntimeException.class, () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); } + + @Test + public void testIndexNotFoundException() { + when(client.search(any())).thenThrow(IndexNotFoundException.class); + JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + assertTrue( + jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty()); + } }