From 433e7b4c8a9576f5a59ccb54088532187a69b215 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Fri, 20 Oct 2023 01:29:11 -0700 Subject: [PATCH] Add missing tags and MV support Signed-off-by: Vamsi Manohar --- .../src/main/antlr/FlintSparkSqlExtensions.g4 | 34 +++ spark/src/main/antlr/SparkSqlBase.g4 | 5 + .../dispatcher/SparkQueryDispatcher.java | 33 ++- .../spark/dispatcher/model/IndexDetails.java | 145 +++++++++--- .../sql/spark/dispatcher/model/JobType.java | 37 +++ .../sql/spark/utils/SQLQueryUtils.java | 47 ++-- .../dispatcher/SparkQueryDispatcherTest.java | 222 ++++++++++++------ .../FlintIndexMetadataReaderImplTest.java | 58 ++--- .../sql/spark/flint/IndexDetailsTest.java | 13 +- .../sql/spark/utils/SQLQueryUtilsTest.java | 43 +++- 10 files changed, 455 insertions(+), 182 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index e8e0264f28..c4af2779d1 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -17,6 +17,7 @@ singleStatement statement : skippingIndexStatement | coveringIndexStatement + | materializedViewStatement ; skippingIndexStatement @@ -76,6 +77,39 @@ dropCoveringIndexStatement : DROP INDEX indexName ON tableName ; +materializedViewStatement + : createMaterializedViewStatement + | showMaterializedViewStatement + | describeMaterializedViewStatement + | dropMaterializedViewStatement + ; + +createMaterializedViewStatement + : CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier + AS query=materializedViewQuery + (WITH LEFT_PAREN propertyList RIGHT_PAREN)? + ; + +showMaterializedViewStatement + : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier + ; + +describeMaterializedViewStatement + : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier + ; + +dropMaterializedViewStatement + : DROP MATERIALIZED VIEW mvName=multipartIdentifier + ; + +/* + * Match all remaining tokens in non-greedy way + * so WITH clause won't be captured by this rule. + */ +materializedViewQuery + : .+? + ; + indexColTypeList : indexColType (COMMA indexColType)* ; diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4 index 4ac1ced5c4..533d851ba6 100644 --- a/spark/src/main/antlr/SparkSqlBase.g4 +++ b/spark/src/main/antlr/SparkSqlBase.g4 @@ -154,6 +154,7 @@ COMMA: ','; DOT: '.'; +AS: 'AS'; CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; @@ -161,14 +162,18 @@ DROP: 'DROP'; EXISTS: 'EXISTS'; FALSE: 'FALSE'; IF: 'IF'; +IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; +VIEW: 'VIEW'; +VIEWS: 'VIEWS'; WITH: 'WITH'; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 2bd1ae67b9..f5ed55afd5 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -39,8 +39,8 @@ import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; -import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.CreateSessionRequest; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; @@ -63,9 +63,8 @@ public class SparkQueryDispatcher { public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; - public static final String SCHEMA_TAG_KEY = "schema"; - public static final String TABLE_TAG_KEY = "table"; public static final String CLUSTER_NAME_TAG_KEY = "cluster"; + public static final String JOB_TYPE_TAG_KEY = "job_type"; private EMRServerlessClient emrServerlessClient; @@ -190,6 +189,8 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) { IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery()); + fillMissingDetails(dispatchQueryRequest, indexDetails); + if (indexDetails.isDropIndex()) { return handleDropIndexQuery(dispatchQueryRequest, indexDetails); } else { @@ -200,17 +201,29 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR } } + // TODO: Revisit this logic. + // Currently, Spark if datasource is not provided in query. + // Spark Assumes the datasource to be catalog. + // This is required to handle drop index case properly when datasource name is not provided. + private static void fillMissingDetails( + DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { + if (indexDetails.getFullyQualifiedTableName() != null + && indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) { + indexDetails + .getFullyQualifiedTableName() + .setDatasourceName(dispatchQueryRequest.getDatasource()); + } + } + private DispatchQueryResponse handleIndexQuery( DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { - FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; Map tags = getDefaultTagsForJobSubmission(dispatchQueryRequest); - tags.put(INDEX_TAG_KEY, indexDetails.getIndexName()); - tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName()); - tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName()); + tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName()); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); StartJobRequest startJobRequest = new StartJobRequest( dispatchQueryRequest.getQuery(), @@ -221,12 +234,12 @@ private DispatchQueryResponse handleIndexQuery( .dataSource( dataSourceService.getRawDataSourceMetadata( dispatchQueryRequest.getDatasource())) - .structuredStreaming(indexDetails.getAutoRefresh()) + .structuredStreaming(indexDetails.isAutoRefresh()) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) .build() .toString(), tags, - indexDetails.getAutoRefresh(), + indexDetails.isAutoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); return new DispatchQueryResponse(jobId, false, dataSourceMetadata.getResultIndex(), null); @@ -251,6 +264,7 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ session = createdSession.get(); } else { // create session if not exist + tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText()); session = sessionManager.createSession( new CreateSessionRequest( @@ -277,6 +291,7 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ dataSourceMetadata.getResultIndex(), session.getSessionId().getSessionId()); } else { + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); StartJobRequest startJobRequest = new StartJobRequest( dispatchQueryRequest.getQuery(), diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java index 1cc66da9fc..42e2905e67 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java @@ -5,56 +5,129 @@ package org.opensearch.sql.spark.dispatcher.model; -import lombok.AllArgsConstructor; -import lombok.Data; +import com.google.common.base.Preconditions; import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.spark.flint.FlintIndexType; /** Index details in an async query. */ -@Data -@AllArgsConstructor -@NoArgsConstructor +@Getter @EqualsAndHashCode public class IndexDetails { + + public static final String STRIP_CHARS = "`"; + private String indexName; private FullyQualifiedTableName fullyQualifiedTableName; // by default, auto_refresh = false; - private Boolean autoRefresh = false; + private boolean autoRefresh; private boolean isDropIndex; + // materialized view special case where + // table name and mv name are combined. + private String mvName; private FlintIndexType indexType; + private IndexDetails() {} + + public static IndexDetailsBuilder builder() { + return new IndexDetailsBuilder(); + } + + // Builder class + public static class IndexDetailsBuilder { + private final IndexDetails indexDetails; + + public IndexDetailsBuilder() { + indexDetails = new IndexDetails(); + } + + public IndexDetailsBuilder indexName(String indexName) { + indexDetails.indexName = indexName; + return this; + } + + public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) { + indexDetails.fullyQualifiedTableName = tableName; + return this; + } + + public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) { + indexDetails.autoRefresh = autoRefresh; + return this; + } + + public IndexDetailsBuilder isDropIndex(boolean isDropIndex) { + indexDetails.isDropIndex = isDropIndex; + return this; + } + + public IndexDetailsBuilder mvName(String mvName) { + indexDetails.mvName = mvName; + return this; + } + + public IndexDetailsBuilder indexType(FlintIndexType indexType) { + indexDetails.indexType = indexType; + return this; + } + + public IndexDetails build() { + Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null"); + switch (indexDetails.indexType) { + case COVERING: + Preconditions.checkNotNull( + indexDetails.indexName, "IndexName can't be null for Covering Index."); + Preconditions.checkNotNull( + indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index."); + break; + case SKIPPING: + Preconditions.checkNotNull( + indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index."); + break; + case MATERIALIZED_VIEW: + Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null"); + break; + } + + return indexDetails; + } + } + public String openSearchIndexName() { FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName(); - if (FlintIndexType.SKIPPING.equals(getIndexType())) { - String indexName = - "flint" - + "_" - + fullyQualifiedTableName.getDatasourceName() - + "_" - + fullyQualifiedTableName.getSchemaName() - + "_" - + fullyQualifiedTableName.getTableName() - + "_" - + getIndexType().getSuffix(); - return indexName.toLowerCase(); - } else if (FlintIndexType.COVERING.equals(getIndexType())) { - String indexName = - "flint" - + "_" - + fullyQualifiedTableName.getDatasourceName() - + "_" - + fullyQualifiedTableName.getSchemaName() - + "_" - + fullyQualifiedTableName.getTableName() - + "_" - + getIndexName() - + "_" - + getIndexType().getSuffix(); - return indexName.toLowerCase(); - } else { - throw new UnsupportedOperationException( - String.format("Unsupported Index Type : %s", getIndexType())); + String indexName = StringUtils.EMPTY; + switch (getIndexType()) { + case COVERING: + indexName = + "flint" + + "_" + + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS) + + "_" + + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS) + + "_" + + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS) + + "_" + + StringUtils.strip(getIndexName(), STRIP_CHARS) + + "_" + + getIndexType().getSuffix(); + break; + case SKIPPING: + indexName = + "flint" + + "_" + + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS) + + "_" + + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS) + + "_" + + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS) + + "_" + + getIndexType().getSuffix(); + break; + case MATERIALIZED_VIEW: + indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase(); + break; } + return indexName.toLowerCase(); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java new file mode 100644 index 0000000000..01f5f422e9 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/JobType.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher.model; + +public enum JobType { + INTERACTIVE("interactive"), + STREAMING("streaming"), + BATCH("batch"); + + private String text; + + JobType(String text) { + this.text = text; + } + + public String getText() { + return this.text; + } + + /** + * Get JobType from text. + * + * @param text text. + * @return JobType {@link JobType}. + */ + public static JobType fromString(String text) { + for (JobType JobType : JobType.values()) { + if (JobType.text.equalsIgnoreCase(text)) { + return JobType; + } + } + throw new IllegalArgumentException("No JobType with text " + text + " found"); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index f6b75d49ef..4816f1c2cd 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -52,7 +52,7 @@ public static IndexDetails extractIndexDetails(String sqlQuery) { flintSparkSqlExtensionsParser.statement(); FlintSQLIndexDetailsVisitor flintSQLIndexDetailsVisitor = new FlintSQLIndexDetailsVisitor(); statementContext.accept(flintSQLIndexDetailsVisitor); - return flintSQLIndexDetailsVisitor.getIndexDetails(); + return flintSQLIndexDetailsVisitor.getIndexDetailsBuilder().build(); } public static boolean isIndexQuery(String sqlQuery) { @@ -117,29 +117,29 @@ public Void visitCreateTableHeader(SqlBaseParser.CreateTableHeaderContext ctx) { public static class FlintSQLIndexDetailsVisitor extends FlintSparkSqlExtensionsBaseVisitor { - @Getter private final IndexDetails indexDetails; + @Getter private final IndexDetails.IndexDetailsBuilder indexDetailsBuilder; public FlintSQLIndexDetailsVisitor() { - this.indexDetails = new IndexDetails(); + this.indexDetailsBuilder = new IndexDetails.IndexDetailsBuilder(); } @Override public Void visitIndexName(FlintSparkSqlExtensionsParser.IndexNameContext ctx) { - indexDetails.setIndexName(ctx.getText()); + indexDetailsBuilder.indexName(ctx.getText()); return super.visitIndexName(ctx); } @Override public Void visitTableName(FlintSparkSqlExtensionsParser.TableNameContext ctx) { - indexDetails.setFullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); + indexDetailsBuilder.fullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); return super.visitTableName(ctx); } @Override public Void visitCreateSkippingIndexStatement( FlintSparkSqlExtensionsParser.CreateSkippingIndexStatementContext ctx) { - indexDetails.setDropIndex(false); - indexDetails.setIndexType(FlintIndexType.SKIPPING); + indexDetailsBuilder.isDropIndex(false); + indexDetailsBuilder.indexType(FlintIndexType.SKIPPING); visitPropertyList(ctx.propertyList()); return super.visitCreateSkippingIndexStatement(ctx); } @@ -147,28 +147,47 @@ public Void visitCreateSkippingIndexStatement( @Override public Void visitCreateCoveringIndexStatement( FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext ctx) { - indexDetails.setDropIndex(false); - indexDetails.setIndexType(FlintIndexType.COVERING); + indexDetailsBuilder.isDropIndex(false); + indexDetailsBuilder.indexType(FlintIndexType.COVERING); visitPropertyList(ctx.propertyList()); return super.visitCreateCoveringIndexStatement(ctx); } + @Override + public Void visitCreateMaterializedViewStatement( + FlintSparkSqlExtensionsParser.CreateMaterializedViewStatementContext ctx) { + indexDetailsBuilder.isDropIndex(false); + indexDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexDetailsBuilder.mvName(ctx.mvName.getText()); + visitPropertyList(ctx.propertyList()); + return super.visitCreateMaterializedViewStatement(ctx); + } + @Override public Void visitDropCoveringIndexStatement( FlintSparkSqlExtensionsParser.DropCoveringIndexStatementContext ctx) { - indexDetails.setDropIndex(true); - indexDetails.setIndexType(FlintIndexType.COVERING); + indexDetailsBuilder.isDropIndex(true); + indexDetailsBuilder.indexType(FlintIndexType.COVERING); return super.visitDropCoveringIndexStatement(ctx); } @Override public Void visitDropSkippingIndexStatement( FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext ctx) { - indexDetails.setDropIndex(true); - indexDetails.setIndexType(FlintIndexType.SKIPPING); + indexDetailsBuilder.isDropIndex(true); + indexDetailsBuilder.indexType(FlintIndexType.SKIPPING); return super.visitDropSkippingIndexStatement(ctx); } + @Override + public Void visitDropMaterializedViewStatement( + FlintSparkSqlExtensionsParser.DropMaterializedViewStatementContext ctx) { + indexDetailsBuilder.isDropIndex(true); + indexDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexDetailsBuilder.mvName(ctx.mvName.getText()); + return super.visitDropMaterializedViewStatement(ctx); + } + @Override public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext ctx) { if (ctx != null) { @@ -180,7 +199,7 @@ public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext // https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 to unescape string literal if (propertyKey(property.key).toLowerCase(Locale.ROOT).contains("auto_refresh")) { if (propertyValue(property.value).toLowerCase(Locale.ROOT).contains("true")) { - indexDetails.setAutoRefresh(true); + indexDetailsBuilder.autoRefresh(true); } } }); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 15211dec01..02e53f8b58 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -65,6 +65,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.session.SessionManager; @@ -119,6 +120,7 @@ void testDispatchSelectQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("job_type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -173,6 +175,7 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("job_type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -228,6 +231,7 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("job_type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -358,10 +362,9 @@ void testDispatchSelectQueryFailedCreateSession() { void testDispatchIndexQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); - tags.put("table", "http_logs"); - tags.put("index", "elb_and_requestUri"); + tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("schema", "default"); + tags.put("job_type", JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -419,7 +422,7 @@ void testDispatchWithPPLQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - + tags.put("job_type", JobType.BATCH.getText()); String query = "source = my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -474,7 +477,7 @@ void testDispatchQueryWithoutATableAndDataSourceName() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - + tags.put("job_type", JobType.BATCH.getText()); String query = "show tables"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -528,11 +531,9 @@ void testDispatchQueryWithoutATableAndDataSourceName() { void testDispatchIndexQueryWithoutADatasourceName() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); - tags.put("table", "http_logs"); - tags.put("index", "elb_and_requestUri"); + tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("schema", "default"); - + tags.put("job_type", JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -585,6 +586,65 @@ void testDispatchIndexQueryWithoutADatasourceName() { verifyNoInteractions(flintIndexMetadataReader); } + @Test + void testDispatchMaterializedViewQuery() { + HashMap tags = new HashMap<>(); + tags.put("datasource", "my_glue"); + tags.put("index", "flint_mv_1"); + tags.put("cluster", TEST_CLUSTER_NAME); + tags.put("job_type", JobType.STREAMING.getText()); + String query = + "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + + " (auto_refresh = true)"; + String sparkSubmitParameters = + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })); + when(emrServerlessClient.startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + true, + any()))) + .thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); + StartJobRequest expected = + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + true, + null); + Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); + verifyNoInteractions(flintIndexMetadataReader); + } + @Test void testDispatchWithWrongURI() { when(dataSourceService.getRawDataSourceMetadata("my_glue")) @@ -836,13 +896,15 @@ void testGetQueryResponseOfDropIndex() { @Test void testDropIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP INDEX size_year ON my_glue.default.http_logs"; - when(flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - "size_year", - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.COVERING))) + IndexDetails indexDetails = + IndexDetails.builder() + .indexName("size_year") + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.COVERING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); // auto_refresh == true @@ -871,15 +933,7 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException { TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)) - .getFlintIndexMetadata( - new IndexDetails( - "size_year", - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.COVERING)); - + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -889,13 +943,14 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException { @Test void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - when(flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING))) + IndexDetails indexDetails = + IndexDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.SKIPPING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); @@ -922,14 +977,7 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)) - .getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING)); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -940,13 +988,14 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio void testDropSkippingIndexQueryAutoRefreshFalse() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - when(flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING))) + IndexDetails indexDetails = + IndexDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.SKIPPING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); @@ -967,14 +1016,7 @@ void testDropSkippingIndexQueryAutoRefreshFalse() TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)) - .getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING)); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -985,13 +1027,14 @@ void testDropSkippingIndexQueryAutoRefreshFalse() void testDropSkippingIndexQueryDeleteIndexException() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - when(flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING))) + IndexDetails indexDetails = + IndexDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.SKIPPING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); @@ -1013,14 +1056,7 @@ void testDropSkippingIndexQueryDeleteIndexException() TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)) - .getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("my_glue.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING)); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus()); @@ -1030,6 +1066,52 @@ void testDropSkippingIndexQueryDeleteIndexException() Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); } + @Test + void testDropMVQuery() throws ExecutionException, InterruptedException { + String query = "DROP MATERIALIZED VIEW mv_1"; + IndexDetails indexDetails = + IndexDetails.builder() + .mvName("mv_1") + .isDropIndex(true) + .fullyQualifiedTableName(null) + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); + // auto_refresh == true + when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); + + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) + .thenReturn( + new CancelJobRunResult() + .withJobRunId(EMR_JOB_ID) + .withApplicationId(EMRS_APPLICATION_ID)); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + when(acknowledgedResponse.isAcknowledged()).thenReturn(true); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + @Test void testDispatchQueryWithExtraSparkSubmitParameters() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index b0c8491b0b..3cc40e0df5 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -44,12 +44,12 @@ void testGetJobIdFromFlintSkippingIndexMetadata() { FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - null, - new FullyQualifiedTableName("mys3.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING)); + IndexDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.SKIPPING) + .build()); Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); } @@ -64,12 +64,13 @@ void testGetJobIdFromFlintCoveringIndexMetadata() { FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - "cv1", - new FullyQualifiedTableName("mys3.default.http_logs"), - false, - true, - FlintIndexType.COVERING)); + IndexDetails.builder() + .indexName("cv1") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.COVERING) + .build()); Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); } @@ -86,34 +87,17 @@ void testGetJobIDWithNPEException() { IllegalArgumentException.class, () -> flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - "cv1", - new FullyQualifiedTableName("mys3.default.http_logs"), - false, - true, - FlintIndexType.COVERING))); + IndexDetails.builder() + .indexName("cv1") + .fullyQualifiedTableName( + new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.COVERING) + .build())); Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); } - @SneakyThrows - @Test - void testGetJobIdFromUnsupportedIndex() { - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - UnsupportedOperationException unsupportedOperationException = - Assertions.assertThrows( - UnsupportedOperationException.class, - () -> - flintIndexMetadataReader.getFlintIndexMetadata( - new IndexDetails( - "cv1", - new FullyQualifiedTableName("mys3.default.http_logs"), - false, - true, - FlintIndexType.MATERIALIZED_VIEW))); - Assertions.assertEquals( - "Unsupported Index Type : MATERIALIZED_VIEW", unsupportedOperationException.getMessage()); - } - @SneakyThrows public void mockNodeClientIndicesMappings(String indexName, String mappings) { GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java index 46fa4f7dbe..cf6b5f8f2b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java @@ -16,12 +16,13 @@ public class IndexDetailsTest { public void skippingIndexName() { assertEquals( "flint_mys3_default_http_logs_skipping_index", - new IndexDetails( - "invalid", - new FullyQualifiedTableName("mys3.default.http_logs"), - false, - true, - FlintIndexType.SKIPPING) + IndexDetails.builder() + .indexName("invalid") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .isDropIndex(true) + .indexType(FlintIndexType.SKIPPING) + .build() .openSearchIndexName()); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index af892fa097..01759c2bdd 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.spark.utils; import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.index; +import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.mv; import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.skippingIndex; import lombok.Getter; @@ -112,50 +113,67 @@ void testExtractionFromFlintIndexQueries() { Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); } + @Test + void testExtractionFromFlintMVQuery() { + String createCoveredIndexQuery = + "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + + " (auto_refresh = true)"; + Assertions.assertTrue(SQLQueryUtils.isIndexQuery(createCoveredIndexQuery)); + IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); + FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertNull(fullyQualifiedTableName); + Assertions.assertEquals("mv_1", indexDetails.getMvName()); + } + /** https://github.com/opensearch-project/sql/issues/2206 */ @Test void testAutoRefresh() { Assertions.assertFalse( - SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).getAutoRefresh()); + SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).isAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "false").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "true").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("\"auto_refresh\"", "true").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("\"auto_refresh\"", "\"true\"").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "1").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails(skippingIndex().withProperty("interval", "1").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); - Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).getAutoRefresh()); + Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).isAutoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "false").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "true").getQuery()) - .getAutoRefresh()); + .isAutoRefresh()); + + Assertions.assertTrue( + SQLQueryUtils.extractIndexDetails(mv().withProperty("auto_refresh", "true").getQuery()) + .isAutoRefresh()); } @Getter @@ -176,6 +194,11 @@ public static IndexQuery index() { "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, " + "l_quantity)"); } + public static IndexQuery mv() { + return new IndexQuery( + "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs"); + } + public IndexQuery withProperty(String key, String value) { query = String.format("%s with (%s = %s)", query, key, value); return this;