From 6c3fa87b9acaaddc65b05f3a44a5f189f4becc08 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 28 Aug 2023 10:46:40 -0700
Subject: [PATCH 01/11] Refactor index builder

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   | 41 +++++--------------
 .../flint/spark/FlintSparkIndexBuilder.scala  | 32 +++++++++++++++
 2 files changed, 43 insertions(+), 30 deletions(-)
 create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index c4f9080bb..036a20a36 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -42,8 +42,7 @@ class FlintSpark(val spark: SparkSession) {
     FlintSparkConf(
       Map(
         DOC_ID_COLUMN_NAME.optionKey -> ID_COLUMN,
-        IGNORE_DOC_ID_COLUMN.optionKey -> "true"
-      ).asJava)
+        IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)
 
   /** Flint client for low-level index operation */
   private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
@@ -57,8 +56,8 @@ class FlintSpark(val spark: SparkSession) {
    * @return
    *   index builder
    */
-  def skippingIndex(): IndexBuilder = {
-    new IndexBuilder(this)
+  def skippingIndex(): SkippingIndexBuilder = {
+    new SkippingIndexBuilder(this)
   }
 
   /**
@@ -240,17 +239,8 @@ object FlintSpark {
   /**
    * Helper class for index class construct. For now only skipping index supported.
    */
-  class IndexBuilder(flint: FlintSpark) {
-    var tableName: String = ""
-    var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq()
-
-    lazy val allColumns: Map[String, Column] = {
-      flint.spark.catalog
-        .listColumns(tableName)
-        .collect()
-        .map(col => (col.name, col))
-        .toMap
-    }
+  class SkippingIndexBuilder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
+    private var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq()
 
     /**
      * Configure which source table the index is based on.
@@ -260,7 +250,7 @@ object FlintSpark {
      * @return
      *   index builder
      */
-    def onTable(tableName: String): IndexBuilder = {
+    def onTable(tableName: String): SkippingIndexBuilder = {
       this.tableName = tableName
       this
     }
@@ -273,7 +263,7 @@ object FlintSpark {
      * @return
      *   index builder
      */
-    def addPartitions(colNames: String*): IndexBuilder = {
+    def addPartitions(colNames: String*): SkippingIndexBuilder = {
       require(tableName.nonEmpty, "table name cannot be empty")
 
       colNames
@@ -291,7 +281,7 @@ object FlintSpark {
      * @return
      *   index builder
      */
-    def addValueSet(colName: String): IndexBuilder = {
+    def addValueSet(colName: String): SkippingIndexBuilder = {
       require(tableName.nonEmpty, "table name cannot be empty")
 
       val col = findColumn(colName)
@@ -307,24 +297,15 @@ object FlintSpark {
      * @return
      *   index builder
      */
-    def addMinMax(colName: String): IndexBuilder = {
+    def addMinMax(colName: String): SkippingIndexBuilder = {
       val col = findColumn(colName)
       indexedColumns =
         indexedColumns :+ MinMaxSkippingStrategy(columnName = col.name, columnType = col.dataType)
       this
     }
 
-    /**
-     * Create index.
-     */
-    def create(): Unit = {
-      flint.createIndex(new FlintSparkSkippingIndex(tableName, indexedColumns))
-    }
-
-    private def findColumn(colName: String): Column =
-      allColumns.getOrElse(
-        colName,
-        throw new IllegalArgumentException(s"Column $colName does not exist"))
+    override def buildIndex(): FlintSparkIndex =
+      new FlintSparkSkippingIndex(tableName, indexedColumns)
 
     private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
       require(
         indexedColumns.forall(_.columnName != indexedCol.columnName),
         s"${indexedCol.columnName} is already indexed")
 
       indexedColumns = indexedColumns :+ indexedCol
     }
  }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
new file mode 100644
index 000000000..e4350f802
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import org.apache.spark.sql.catalog.Column
+
+abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
+
+  protected var tableName: String = ""
+
+  lazy val allColumns: Map[String, Column] = {
+    require(tableName.nonEmpty, "Source table name is not provided")
+
+    flint.spark.catalog
+      .listColumns(tableName)
+      .collect()
+      .map(col => (col.name, col))
+      .toMap
+  }
+
+  def create(): Unit = flint.createIndex(buildIndex())
+
+  protected def buildIndex(): FlintSparkIndex
+
+  protected def findColumn(colName: String): Column =
+    allColumns.getOrElse(
+      colName,
+      throw new IllegalArgumentException(s"Column $colName does not exist"))
+}
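[Editor's note] The refactor above keeps the fluent API intact while moving the shared pieces (table name, column lookup, create()) into the new FlintSparkIndexBuilder base class. A minimal usage sketch of the builder after this patch; this is not part of the diff, and the table/column names are illustrative only:

    val flint = new FlintSpark(spark)

    flint
      .skippingIndex()              // SkippingIndexBuilder, now a FlintSparkIndexBuilder
      .onTable("default.test")      // source table must be set before adding columns
      .addPartitions("year")        // partition skipping strategy
      .addValueSet("name")          // value set skipping strategy
      .addMinMax("age")             // min-max skipping strategy
      .create()                     // create() is inherited from the base builder
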
From d627f382051626e47736f5562b343a30103acc85 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 28 Aug 2023 11:09:56 -0700
Subject: [PATCH 02/11] Add covering index builder and empty impl

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   | 100 +++---------------
 .../flint/spark/FlintSparkIndexBuilder.scala  |  14 +++
 .../covering/FlintSparkCoveringIndex.scala    |  97 +++++++++++++++++
 .../skipping/FlintSparkSkippingIndex.scala    |  85 ++++++++++++++-
 .../FlintSparkCoveringIndexITSuite.scala      |  58 ++++++++++
 .../flint/spark/FlintSparkIndexSuite.scala    |  28 +++++
 6 files changed, 294 insertions(+), 88 deletions(-)
 create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 036a20a36..e581a5217 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -12,10 +12,10 @@ import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.FlintSpark._
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
-import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
@@ -25,12 +25,10 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.SaveMode._
-import org.apache.spark.sql.catalog.Column
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
 import org.apache.spark.sql.streaming.OutputMode.Append
-import org.apache.spark.sql.streaming.StreamingQuery
 
 /**
  * Flint Spark integration API entrypoint.
@@ -56,8 +54,18 @@ class FlintSpark(val spark: SparkSession) {
    * @return
    *   index builder
    */
-  def skippingIndex(): SkippingIndexBuilder = {
-    new SkippingIndexBuilder(this)
+  def skippingIndex(): FlintSparkSkippingIndex.Builder = {
+    new FlintSparkSkippingIndex.Builder(this)
+  }
+
+  /**
+   * Create index builder for creating index with fluent API.
+   *
+   * @return
+   *   index builder
+   */
+  def coveringIndex(): FlintSparkCoveringIndex.Builder = {
+    new FlintSparkCoveringIndex.Builder(this)
   }
 
   /**
@@ -235,84 +243,4 @@ object FlintSpark {
     type RefreshMode = Value
     val FULL, INCREMENTAL = Value
   }
-
-  /**
-   * Helper class for index class construct. For now only skipping index supported.
-   */
-  class SkippingIndexBuilder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
-    private var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq()
-
-    /**
-     * Configure which source table the index is based on.
-     *
-     * @param tableName
-     *   full table name
-     * @return
-     *   index builder
-     */
-    def onTable(tableName: String): SkippingIndexBuilder = {
-      this.tableName = tableName
-      this
-    }
-
-    /**
-     * Add partition skipping indexed columns.
-     *
-     * @param colNames
-     *   indexed column names
-     * @return
-     *   index builder
-     */
-    def addPartitions(colNames: String*): SkippingIndexBuilder = {
-      require(tableName.nonEmpty, "table name cannot be empty")
-
-      colNames
-        .map(findColumn)
-        .map(col => PartitionSkippingStrategy(columnName = col.name, columnType = col.dataType))
-        .foreach(addIndexedColumn)
-      this
-    }
-
-    /**
-     * Add value set skipping indexed column.
-     *
-     * @param colName
-     *   indexed column name
-     * @return
-     *   index builder
-     */
-    def addValueSet(colName: String): SkippingIndexBuilder = {
-      require(tableName.nonEmpty, "table name cannot be empty")
-
-      val col = findColumn(colName)
-      addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
-      this
-    }
-
-    /**
-     * Add min max skipping indexed column.
-     *
-     * @param colName
-     *   indexed column name
-     * @return
-     *   index builder
-     */
-    def addMinMax(colName: String): SkippingIndexBuilder = {
-      val col = findColumn(colName)
-      indexedColumns =
-        indexedColumns :+ MinMaxSkippingStrategy(columnName = col.name, columnType = col.dataType)
-      this
-    }
-
-    override def buildIndex(): FlintSparkIndex =
-      new FlintSparkSkippingIndex(tableName, indexedColumns)
-
-    private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
-      require(
-        indexedColumns.forall(_.columnName != indexedCol.columnName),
-        s"${indexedCol.columnName} is already indexed")
-
-      indexedColumns = indexedColumns :+ indexedCol
-    }
-  }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index e4350f802..ceb6dae8b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -7,10 +7,18 @@ package org.opensearch.flint.spark
 
 import org.apache.spark.sql.catalog.Column
 
+/**
+ * Flint Spark index builder base class.
+ *
+ * @param flint
+ *   Flint Spark API entrypoint
+ */
 abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
 
+  /** Source table name */
   protected var tableName: String = ""
 
+  /** All columns of the given source table */
   lazy val allColumns: Map[String, Column] = {
     require(tableName.nonEmpty, "Source table name is not provided")
 
@@ -21,8 +29,14 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
       .toMap
   }
 
+  /**
+   * Create Flint index.
+   */
   def create(): Unit = flint.createIndex(buildIndex())
 
+  /**
+   * Build method for concrete builder class to implement
+   */
   protected def buildIndex(): FlintSparkIndex
 
   protected def findColumn(colName: String): Column =
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
new file mode 100644
index 000000000..fa9cb93ad
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -0,0 +1,97 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.covering
+
+import org.opensearch.flint.core.metadata.FlintMetadata
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
+
+import org.apache.spark.sql.DataFrame
+
+class FlintSparkCoveringIndex(
+    override val name: String,
+    val tableName: String,
+    val indexedColumns: Map[String, String])
+    extends FlintSparkIndex {
+
+  require(indexedColumns.nonEmpty, "indexed columns must not be empty")
+
+  override val kind: String = COVERING_INDEX_TYPE
+
+  override def metadata(): FlintMetadata = {
+    new FlintMetadata(s"""{
+        |   "_meta": {
+        |     "kind": "$kind",
+        |     "indexedColumns": $indexedColumns,
+        |     "source": "$tableName"
+        |   },
+        |   "properties": $getSchema
+        | }
+        |""".stripMargin)
+  }
+
+  override def build(df: DataFrame): DataFrame = {
+    null
+  }
+
+  private def getSchema: String = {
+    ""
+  }
+}
+
+object FlintSparkCoveringIndex {
+
+  /** Covering index type name */
+  val COVERING_INDEX_TYPE = "covering"
+
+  /** Builder class for covering index build */
+  class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
+    private var indexName: String = ""
+    private var indexedColumns: Map[String, String] = Map()
+
+    /**
+     * Set covering index name.
+     *
+     * @param indexName
+     *   index name
+     * @return
+     *   index builder
+     */
+    def indexName(indexName: String): Builder = {
+      this.indexName = indexName
+      this
+    }
+
+    /**
+     * Configure which source table the index is based on.
+     *
+     * @param tableName
+     *   full table name
+     * @return
+     *   index builder
+     */
+    def onTable(tableName: String): Builder = {
+      this.tableName = tableName
+      this
+    }
+
+    /**
+     * Add indexed column name.
+     *
+     * @param colName
+     *   column name
+     * @return
+     *   index builder
+     */
+    def addIndexColumn(colName: String): Builder = {
+      indexedColumns += (colName -> findColumn(colName).dataType)
+      this
+    }
+
+    override protected def buildIndex(): FlintSparkIndex =
+      new FlintSparkCoveringIndex(indexName, tableName, indexedColumns)
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 14d0a0090..fe1a2cb87 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -9,16 +9,19 @@ import org.json4s._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
+import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
+import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
 import org.apache.spark.sql.flint.datatype.FlintDataType
 import org.apache.spark.sql.functions.{col, input_file_name, sha1}
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Flint skipping index in Spark.
@@ -113,4 +116,82 @@ object FlintSparkSkippingIndex {
 
     s"flint_${tableName.replace(".", "_")}_skipping_index"
   }
+
+  /** Builder class for skipping index build */
+  class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
+    private var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq()
+
+    /**
+     * Configure which source table the index is based on.
+     *
+     * @param tableName
+     *   full table name
+     * @return
+     *   index builder
+     */
+    def onTable(tableName: String): Builder = {
+      this.tableName = tableName
+      this
+    }
+
+    /**
+     * Add partition skipping indexed columns.
+     *
+     * @param colNames
+     *   indexed column names
+     * @return
+     *   index builder
+     */
+    def addPartitions(colNames: String*): Builder = {
+      require(tableName.nonEmpty, "table name cannot be empty")
+
+      colNames
+        .map(findColumn)
+        .map(col => PartitionSkippingStrategy(columnName = col.name, columnType = col.dataType))
+        .foreach(addIndexedColumn)
+      this
+    }
+
+    /**
+     * Add value set skipping indexed column.
+     *
+     * @param colName
+     *   indexed column name
+     * @return
+     *   index builder
+     */
+    def addValueSet(colName: String): Builder = {
+      require(tableName.nonEmpty, "table name cannot be empty")
+
+      val col = findColumn(colName)
+      addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
+      this
+    }
+
+    /**
+     * Add min max skipping indexed column.
+     *
+     * @param colName
+     *   indexed column name
+     * @return
+     *   index builder
+     */
+    def addMinMax(colName: String): Builder = {
+      val col = findColumn(colName)
+      indexedColumns =
+        indexedColumns :+ MinMaxSkippingStrategy(columnName = col.name, columnType = col.dataType)
+      this
+    }
+
+    override def buildIndex(): FlintSparkIndex =
+      new FlintSparkSkippingIndex(tableName, indexedColumns)
+
+    private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
+      require(
+        indexedColumns.forall(_.columnName != indexedCol.columnName),
+        s"${indexedCol.columnName} is already indexed")
+
+      indexedColumns = indexedColumns :+ indexedCol
+    }
+  }
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
new file mode 100644
index 000000000..95d36c6a5
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+class FlintSparkCoveringIndexITSuite extends FlintSparkIndexSuite {
+
+  /** Test table and index name */
+  private val testTable = "default.test"
+  private val testIndex = "test"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(s"""
+        | CREATE TABLE $testTable
+        | (
+        |   name STRING,
+        |   age INT,
+        |   address STRING
+        | )
+        | USING CSV
+        | OPTIONS (
+        |  header 'false',
+        |  delimiter '\t'
+        | )
+        | PARTITIONED BY (
+        |    year INT,
+        |    month INT
+        | )
+        |""".stripMargin)
+
+    sql(s"""
+        | INSERT INTO $testTable
+        | PARTITION (year=2023, month=4)
+        | VALUES ('Hello', 30, 'Seattle')
+        | """.stripMargin)
+
+    sql(s"""
+        | INSERT INTO $testTable
+        | PARTITION (year=2023, month=5)
+        | VALUES ('World', 25, 'Portland')
+        | """.stripMargin)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+
+    // Delete all test indices
+    flint.deleteIndex(testIndex)
+  }
+
+  test("create covering index with metadata successfully") {
+
+  }
+}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala
new file mode 100644
index 000000000..e6facd1bd
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import org.opensearch.flint.OpenSearchSuite
+
+import org.apache.spark.FlintSuite
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
+import org.apache.spark.sql.streaming.StreamTest
+
+trait FlintSparkIndexSuite
+    extends QueryTest
+    with FlintSuite
+    with OpenSearchSuite
+    with StreamTest {
+
+  /** Flint Spark high level API being tested */
+  lazy protected val flint: FlintSpark = {
+    setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
+    setFlintSparkConf(HOST_PORT, openSearchPort)
+    setFlintSparkConf(REFRESH_POLICY, "true")
+    new FlintSpark(spark)
+  }
+}
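[Editor's note] With the covering index builder in place (its build() and schema are still stubs at this point), creating a covering index is expected to look like the sketch below. This is not part of the diff; the index and table names are illustrative:

    flint
      .coveringIndex()
      .indexName("test")            // covering index name
      .onTable("default.test")      // source table
      .addIndexColumn("name")       // one indexed column per call at this stage
      .addIndexColumn("age")
      .create()
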
From aab7c88e2a5767efdfe578a33dc8c6da59996e87 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 29 Aug 2023 11:02:13 -0700
Subject: [PATCH 03/11] Implement covering index metadata

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   |  9 ++++
 .../covering/FlintSparkCoveringIndex.scala    | 49 ++++++++++++++---
 .../skipping/FlintSparkSkippingIndex.scala    |  3 ++
 .../FlintSparkSkippingIndexSuite.scala        | 11 +++++
 .../FlintSparkCoveringIndexITSuite.scala      | 43 ++++++++++++--
 .../FlintSparkSkippingIndexITSuite.scala      | 23 ++-------
 ...IndexSuite.scala => FlintSparkSuite.scala} |  5 +-
 7 files changed, 114 insertions(+), 29 deletions(-)
 rename integ-test/src/test/scala/org/opensearch/flint/spark/{FlintSparkIndexSuite.scala => FlintSparkSuite.scala} (87%)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index e581a5217..8058f9bff 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -15,6 +15,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
@@ -206,6 +207,7 @@ class FlintSpark(val spark: SparkSession) {
    */
   private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
     val meta = parse(metadata.getContent) \ "_meta"
+    val indexName = (meta \ "name").extract[String]
     val tableName = (meta \ "source").extract[String]
     val indexType = (meta \ "kind").extract[String]
     val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
@@ -229,6 +231,13 @@ class FlintSpark(val spark: SparkSession) {
           }
         }
         new FlintSparkSkippingIndex(tableName, strategies)
+      case COVERING_INDEX_TYPE =>
+        new FlintSparkCoveringIndex(
+          indexName,
+          tableName,
+          indexedColumns.arr.map { obj =>
+            ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
+          }.toMap)
     }
   }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index fa9cb93ad..f2a390436 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -5,12 +5,28 @@
 
 package org.opensearch.flint.spark.covering
 
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.JsonAST.{JArray, JObject, JString}
+import org.json4s.native.JsonMethods.{compact, parse, render}
+import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 
 import org.apache.spark.sql.DataFrame
-
+import org.apache.spark.sql.flint.datatype.FlintDataType
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Flint covering index in Spark.
+ *
+ * @param name
+ *   index name
+ * @param tableName
+ *   source table name
+ * @param indexedColumns
+ *   indexed column list
+ */
 class FlintSparkCoveringIndex(
     override val name: String,
     val tableName: String,
@@ -19,13 +35,17 @@ class FlintSparkCoveringIndex(
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
 
+  /** Required by json4s write function */
+  implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
   override val kind: String = COVERING_INDEX_TYPE
 
   override def metadata(): FlintMetadata = {
     new FlintMetadata(s"""{
         |   "_meta": {
+        |     "name": "$name",
         |     "kind": "$kind",
-        |     "indexedColumns": $indexedColumns,
+        |     "indexedColumns": $getMetaInfo,
         |     "source": "$tableName"
         |   },
         |   "properties": $getSchema
@@ -34,11 +54,24 @@ class FlintSparkCoveringIndex(
   }
 
   override def build(df: DataFrame): DataFrame = {
+    // TODO: implement later
     null
   }
 
+  private def getMetaInfo: String = {
+    val objects = indexedColumns.map { case (colName, colVal) =>
+      JObject("columnName" -> JString(colName), "columnType" -> JString(colVal))
+    }.toList
+    Serialization.write(JArray(objects))
+  }
+
   private def getSchema: String = {
-    ""
+    val catalogDDL =
+      indexedColumns
+        .map { case (colName, colType) => s"$colName $colType not null" }
+        .mkString(",")
+    val properties = FlintDataType.serialize(StructType.fromDDL(catalogDDL))
+    compact(render(parse(properties) \ "properties"))
   }
 }
@@ -81,13 +114,15 @@ object FlintSparkCoveringIndex {
     /**
      * Add indexed column name.
      *
-     * @param colName
-     *   column name
+     * @param colNames
+     *   column names
      * @return
      *   index builder
      */
-    def addIndexColumn(colName: String): Builder = {
-      indexedColumns += (colName -> findColumn(colName).dataType)
+    def addIndexColumn(colNames: String*): Builder = {
+      colNames.foreach(colName => {
+        indexedColumns += (colName -> findColumn(colName).dataType)
+      })
       this
     }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index fe1a2cb87..4c0d3983c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.types.StructType
  *
  * @param tableName
  *   source table name
+ * @param indexedColumns
+ *   indexed column list
  */
 class FlintSparkSkippingIndex(
     tableName: String,
@@ -49,6 +51,7 @@ class FlintSparkSkippingIndex(
   override def metadata(): FlintMetadata = {
     new FlintMetadata(s"""{
         |   "_meta": {
+        |     "name": "${name()}",
         |     "kind": "$SKIPPING_INDEX_TYPE",
         |     "indexedColumns": $getMetaInfo,
         |     "source": "$tableName"
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
index e95ac6f05..8f9383f7d 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
@@ -44,6 +44,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -69,6 +70,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -96,6 +98,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -121,6 +124,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -146,6 +150,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -171,6 +176,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -196,6 +202,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -221,6 +228,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -246,6 +254,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -272,6 +281,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
@@ -299,6 +309,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [{}],
         |     "source": "default.test"
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 95d36c6a5..244362160 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -5,11 +5,15 @@
 
 package org.opensearch.flint.spark
 
-class FlintSparkCoveringIndexITSuite extends FlintSparkIndexSuite {
+import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
+import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
   /** Test table and index name */
-  private val testTable = "default.test"
-  private val testIndex = "test"
+  private val testTable = "default.test_ci"
+  private val testIndex = "ci"
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -53,6 +57,39 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkIndexSuite {
   }
 
   test("create covering index with metadata successfully") {
+    flint
+      .coveringIndex()
+      .indexName(testIndex)
+      .onTable(testTable)
+      .addIndexColumn("name", "age")
+      .create()
 
+    val index = flint.describeIndex(testIndex)
+    index shouldBe defined
+    index.get.metadata().getContent should matchJson(s"""{
+        |   "_meta": {
+        |     "name": "ci",
+        |     "kind": "covering",
+        |     "indexedColumns": [
+        |     {
+        |        "columnName": "name",
+        |        "columnType": "string"
+        |     },
+        |     {
+        |        "columnName": "age",
+        |        "columnType": "int"
+        |     }],
+        |     "source": "default.test_ci"
+        |   },
+        |   "properties": {
+        |     "name": {
+        |       "type": "keyword"
+        |     },
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
   }
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 7f139e539..8c9336ca0 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -6,7 +6,6 @@
 package org.opensearch.flint.spark
 
 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
-import org.opensearch.flint.OpenSearchSuite
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex
@@ -15,27 +14,13 @@ import org.scalatest.matchers.{Matcher, MatchResult}
 import org.scalatest.matchers.must.Matchers._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
-import org.apache.spark.FlintSuite
-import org.apache.spark.sql.{Column, QueryTest, Row}
+import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.flint.config.FlintSparkConf._
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.streaming.StreamTest
-
-class FlintSparkSkippingIndexITSuite
-    extends QueryTest
-    with FlintSuite
-    with OpenSearchSuite
-    with StreamTest {
-
-  /** Flint Spark high level API being tested */
-  lazy val flint: FlintSpark = {
-    setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
-    setFlintSparkConf(HOST_PORT, openSearchPort)
-    setFlintSparkConf(REFRESH_POLICY, "true")
-    new FlintSpark(spark)
-  }
+
+class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
   /** Test table and index name */
   private val testTable = "default.test"
@@ -96,6 +81,7 @@ class FlintSparkSkippingIndexITSuite
     index shouldBe defined
     index.get.metadata().getContent should matchJson(s"""{
         |   "_meta": {
+        |     "name": "flint_default_test_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [
         |     {
@@ -449,6 +435,7 @@ class FlintSparkSkippingIndexITSuite
     index.get.metadata().getContent should matchJson(
       s"""{
         |   "_meta": {
+        |     "name": "flint_default_data_type_table_skipping_index",
         |     "kind": "skipping",
         |     "indexedColumns": [
         |     {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
similarity index 87%
rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala
rename to integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index e6facd1bd..9169c9434 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -12,7 +12,10 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
 import org.apache.spark.sql.streaming.StreamTest
 
-trait FlintSparkIndexSuite
+/**
+ * Flint Spark suite trait that initializes [[FlintSpark]] API instance.
+ */
+trait FlintSparkSuite
     extends QueryTest
     with FlintSuite
     with OpenSearchSuite
From a7d7b74854b80fc5f3afd6a0dce0709d87e49e2a Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 29 Aug 2023 13:40:30 -0700
Subject: [PATCH 04/11] Implement covering index build

Signed-off-by: Chen Dai
---
 .../covering/FlintSparkCoveringIndex.scala    |  4 +-
 .../FlintSparkCoveringIndexITSuite.scala      | 37 +++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index f2a390436..54d55b630 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -54,8 +54,8 @@ class FlintSparkCoveringIndex(
   }
 
   override def build(df: DataFrame): DataFrame = {
-    // TODO: implement later
-    null
+    val colNames = indexedColumns.keys.toSeq
+    df.select(colNames.head, colNames.tail: _*)
   }
 
   private def getMetaInfo: String = {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 244362160..d6f223ce8 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -6,9 +6,12 @@
 package org.opensearch.flint.spark
 
 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
+import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
+import org.apache.spark.sql.Row
+
 class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
   /** Test table and index name */
@@ -92,4 +95,38 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
         |""".stripMargin)
   }
+
+  test("full refresh covering index successfully") {
+    flint
+      .coveringIndex()
+      .indexName(testIndex)
+      .onTable(testTable)
+      .addIndexColumn("name", "age")
+      .create()
+
+    flint.refreshIndex(testIndex, FULL)
+
+    val indexData = flint.queryIndex(testIndex)
+    checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
+  }
+
+  test("incremental refresh covering index successfully") {
+    flint
+      .coveringIndex()
+      .indexName(testIndex)
+      .onTable(testTable)
+      .addIndexColumn("name", "age")
+      .create()
+
+    val jobId = flint.refreshIndex(testIndex, INCREMENTAL)
+    jobId shouldBe defined
+
+    val job = spark.streams.get(jobId.get)
+    failAfter(streamingTimeout) {
+      job.processAllAvailable()
+    }
+
+    val indexData = flint.queryIndex(testIndex)
+    checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
+  }
 }
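[Editor's note] Now that build() projects the indexed columns from the source DataFrame, a covering index can actually be refreshed. A hedged sketch of the flow exercised by the new integration tests above (index and table names taken from the test suite):

    flint
      .coveringIndex()
      .indexName("ci")
      .onTable("default.test_ci")
      .addIndexColumn("name", "age")   // varargs since patch 03
      .create()

    flint.refreshIndex("ci", FULL)                      // one-shot batch refresh
    val jobId = flint.refreshIndex("ci", INCREMENTAL)   // streaming job id, if any
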
From cca6b660c3635ead9ab40d9a9e3b1f0ad7a03758 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 29 Aug 2023 16:00:58 -0700
Subject: [PATCH 05/11] Refactor IT class

Signed-off-by: Chen Dai
---
 .../flint/spark/FlintSparkIndexBuilder.scala  |  2 +-
 .../covering/FlintSparkCoveringIndex.scala    |  4 +--
 .../FlintSparkCoveringIndexITSuite.scala      | 30 +---------------
 .../FlintSparkSkippingIndexITSuite.scala      | 30 +---------------
 .../flint/spark/FlintSparkSuite.scala         | 35 +++++++++++++++++++
 5 files changed, 40 insertions(+), 61 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index ceb6dae8b..740c02e1d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -19,7 +19,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
   protected var tableName: String = ""
 
   /** All columns of the given source table */
-  lazy val allColumns: Map[String, Column] = {
+  lazy protected val allColumns: Map[String, Column] = {
     require(tableName.nonEmpty, "Source table name is not provided")
 
     flint.spark.catalog
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 54d55b630..4feb23f45 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -59,8 +59,8 @@ class FlintSparkCoveringIndex(
   }
 
   private def getMetaInfo: String = {
-    val objects = indexedColumns.map { case (colName, colVal) =>
-      JObject("columnName" -> JString(colName), "columnType" -> JString(colVal))
+    val objects = indexedColumns.map { case (colName, colType) =>
+      JObject("columnName" -> JString(colName), "columnType" -> JString(colType))
     }.toList
     Serialization.write(JArray(objects))
   }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index d6f223ce8..599a0a1c8 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -21,35 +21,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    sql(s"""
-        | CREATE TABLE $testTable
-        | (
-        |   name STRING,
-        |   age INT,
-        |   address STRING
-        | )
-        | USING CSV
-        | OPTIONS (
-        |  header 'false',
-        |  delimiter '\t'
-        | )
-        | PARTITIONED BY (
-        |    year INT,
-        |    month INT
-        | )
-        |""".stripMargin)
-
-    sql(s"""
-        | INSERT INTO $testTable
-        | PARTITION (year=2023, month=4)
-        | VALUES ('Hello', 30, 'Seattle')
-        | """.stripMargin)
-
-    sql(s"""
-        | INSERT INTO $testTable
-        | PARTITION (year=2023, month=5)
-        | VALUES ('World', 25, 'Portland')
-        | """.stripMargin)
+    createPartitionedTable(testTable)
   }
 
   override def afterEach(): Unit = {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 8c9336ca0..a06ebd1e1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -29,35 +29,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    sql(s"""
-        | CREATE TABLE $testTable
-        | (
-        |   name STRING,
-        |   age INT,
-        |   address STRING
-        | )
-        | USING CSV
-        | OPTIONS (
-        |  header 'false',
-        |  delimiter '\t'
-        | )
-        | PARTITIONED BY (
-        |    year INT,
-        |    month INT
-        | )
-        |""".stripMargin)
-
-    sql(s"""
-        | INSERT INTO $testTable
-        | PARTITION (year=2023, month=4)
-        | VALUES ('Hello', 30, 'Seattle')
-        | """.stripMargin)
-
-    sql(s"""
-        | INSERT INTO $testTable
-        | PARTITION (year=2023, month=5)
-        | VALUES ('World', 25, 'Portland')
-        | """.stripMargin)
+    createPartitionedTable(testTable)
   }
 
   override def afterEach(): Unit = {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 9169c9434..3ee6deda1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -28,4 +28,39 @@ trait FlintSparkSuite
     setFlintSparkConf(REFRESH_POLICY, "true")
     new FlintSpark(spark)
   }
+
+  protected def createPartitionedTable(testTable: String): Unit = {
+    sql(
+      s"""
+        | CREATE TABLE $testTable
+        | (
+        |   name STRING,
+        |   age INT,
+        |   address STRING
+        | )
+        | USING CSV
+        | OPTIONS (
+        |  header 'false',
+        |  delimiter '\t'
+        | )
+        | PARTITIONED BY (
+        |    year INT,
+        |    month INT
+        | )
+        |""".stripMargin)
+
+    sql(
+      s"""
+        | INSERT INTO $testTable
+        | PARTITION (year=2023, month=4)
+        | VALUES ('Hello', 30, 'Seattle')
+        | """.stripMargin)
+
+    sql(
+      s"""
+        | INSERT INTO $testTable
+        | PARTITION (year=2023, month=5)
+        | VALUES ('World', 25, 'Portland')
+        | """.stripMargin)
+  }
 }
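[Editor's note] After this refactor, any new suite can reuse the shared fixture instead of duplicating the CREATE TABLE/INSERT boilerplate. An illustrative sketch only; the suite and table names here are hypothetical:

    class MyIndexITSuite extends FlintSparkSuite {

      private val testTable = "default.my_test"

      override def beforeAll(): Unit = {
        super.beforeAll()
        createPartitionedTable(testTable)   // shared fixture from FlintSparkSuite
      }
    }
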
From 428559b1476ac1b02fd768a5e394ae1df1dd4664 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 30 Aug 2023 13:34:22 -0700
Subject: [PATCH 06/11] Rename build method

Signed-off-by: Chen Dai
---
 .../spark/covering/FlintSparkCoveringIndex.scala     |  5 +++--
 .../flint/spark/FlintSparkCoveringIndexITSuite.scala | 12 ++++++------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 4feb23f45..c8ae710e4 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -58,6 +58,7 @@ class FlintSparkCoveringIndex(
     df.select(colNames.head, colNames.tail: _*)
   }
 
+  // TODO: refactor all these once Flint metadata spec finalized
   private def getMetaInfo: String = {
     val objects = indexedColumns.map { case (colName, colType) =>
       JObject("columnName" -> JString(colName), "columnType" -> JString(colType))
@@ -93,7 +94,7 @@ object FlintSparkCoveringIndex {
      * @return
      *   index builder
      */
-    def indexName(indexName: String): Builder = {
+    def name(indexName: String): Builder = {
       this.indexName = indexName
       this
     }
@@ -119,7 +120,7 @@ object FlintSparkCoveringIndex {
      * @return
      *   index builder
      */
-    def addIndexColumn(colNames: String*): Builder = {
+    def addIndexColumns(colNames: String*): Builder = {
       colNames.foreach(colName => {
         indexedColumns += (colName -> findColumn(colName).dataType)
       })
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 599a0a1c8..b8be04392 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -34,9 +34,9 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
   test("create covering index with metadata successfully") {
     flint
       .coveringIndex()
-      .indexName(testIndex)
+      .name(testIndex)
       .onTable(testTable)
-      .addIndexColumn("name", "age")
+      .addIndexColumns("name", "age")
       .create()
 
     val index = flint.describeIndex(testIndex)
@@ -71,9 +71,9 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
   test("full refresh covering index successfully") {
     flint
       .coveringIndex()
-      .indexName(testIndex)
+      .name(testIndex)
      .onTable(testTable)
-      .addIndexColumn("name", "age")
+      .addIndexColumns("name", "age")
       .create()
 
     flint.refreshIndex(testIndex, FULL)
@@ -85,9 +85,9 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
   test("incremental refresh covering index successfully") {
     flint
       .coveringIndex()
-      .indexName(testIndex)
+      .name(testIndex)
       .onTable(testTable)
-      .addIndexColumn("name", "age")
+      .addIndexColumns("name", "age")
       .create()
 
     val jobId = flint.refreshIndex(testIndex, INCREMENTAL)
From 27628d498f9664dfe03fb3dbe5bfc616184506d7 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 30 Aug 2023 13:51:40 -0700
Subject: [PATCH 07/11] Add more IT

Signed-off-by: Chen Dai
---
 .../spark/FlintSparkCoveringIndexITSuite.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index b8be04392..c685474d0 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -101,4 +101,20 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
     val indexData = flint.queryIndex(testIndex)
     checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
   }
+
+  test("can have multiple covering indexes on a table") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    flint
+      .coveringIndex()
+      .name(testIndex + "_address")
+      .onTable(testTable)
+      .addIndexColumns("address")
+      .create()
+  }
 }
From 41ca8ad6ae5606563d82eefc16062787a1119c6e Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 30 Aug 2023 16:49:09 -0700
Subject: [PATCH 08/11] Add UT for new covering index class

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   |  4 +-
 .../covering/FlintSparkCoveringIndex.scala    | 50 ++++++++++++++---
 .../FlintSparkCoveringIndexSuite.scala        | 38 ++++++++++++++
 .../FlintSparkCoveringIndexITSuite.scala      | 26 ++++++----
 4 files changed, 99 insertions(+), 19 deletions(-)
 create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 8058f9bff..29d744b8c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -15,7 +15,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
-import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{parseFlintIndexName, COVERING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
@@ -233,7 +233,7 @@ class FlintSpark(val spark: SparkSession) {
       new FlintSparkSkippingIndex(tableName, strategies)
       case COVERING_INDEX_TYPE =>
         new FlintSparkCoveringIndex(
-          indexName,
+          parseFlintIndexName(indexName, tableName),
           tableName,
           indexedColumns.arr.map { obj =>
             ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index c8ae710e4..7e70b3923 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -11,7 +11,7 @@ import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
-import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.flint.datatype.FlintDataType
@@ -20,7 +20,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * Flint covering index in Spark.
  *
- * @param name
+ * @param indexName
  *   index name
  * @param tableName
  *   source table name
@@ -28,9 +28,9 @@ import org.apache.spark.sql.types.StructType
  *   indexed column list
  */
 class FlintSparkCoveringIndex(
-    override val name: String,
-    val tableName: String,
-    val indexedColumns: Map[String, String])
+    indexName: String,
+    tableName: String,
+    indexedColumns: Map[String, String])
     extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -40,10 +40,12 @@ class FlintSparkCoveringIndex(
 
   override val kind: String = COVERING_INDEX_TYPE
 
+  override def name(): String = getFlintIndexName(indexName, tableName)
+
   override def metadata(): FlintMetadata = {
     new FlintMetadata(s"""{
         |   "_meta": {
-        |     "name": "$name",
+        |     "name": "${name()}",
         |     "kind": "$kind",
         |     "indexedColumns": $getMetaInfo,
         |     "source": "$tableName"
@@ -81,6 +83,42 @@ object FlintSparkCoveringIndex {
   /** Covering index type name */
   val COVERING_INDEX_TYPE = "covering"
 
+  /**
+   * Get Flint index name which follows the convention: "flint_" prefix + source table name +
+   * given index name + "_index" suffix.
+   *
+   * This helps identify the Flint index because Flint index is not registered to Spark Catalog
+   * for now.
+   *
+   * @param tableName
+   *   full table name
+   * @param indexName
+   *   index name specified by user
+   * @return
+   *   Flint covering index name
+   */
+  def getFlintIndexName(indexName: String, tableName: String): String = {
+    require(tableName.contains("."), "Full table name database.table is required")
+
+    s"flint_${tableName.replace(".", "_")}_${indexName}_index"
+  }
+
+  /**
+   * Parse original index name out of Flint index name.
+   *
+   * @param flintIndexName
+   *   Flint index name
+   * @param tableName
+   *   source table name
+   * @return
+   *   original index name specified by user
+   */
+  def parseFlintIndexName(flintIndexName: String, tableName: String): String = {
+    flintIndexName.substring(
+      s"flint_${tableName.replace(".", "_")}_".length,
+      flintIndexName.length - "_index".length)
+  }
+
   /** Builder class for covering index build */
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
     private var indexName: String = ""
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
new file mode 100644
index 000000000..334511b82
--- /dev/null
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.covering
+
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.parseFlintIndexName
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import org.apache.spark.FlintSuite
+
+class FlintSparkCoveringIndexSuite extends FlintSuite {
+
+  test("get covering index name") {
+    val index = new FlintSparkCoveringIndex("ci", "default.test", Map("name" -> "string"))
+    index.name() shouldBe "flint_default_test_ci_index"
+  }
+
+  test("parse covering index name") {
+    val flintIndexName = "flint_default_ci_test_name_and_age_index"
+    val indexName = parseFlintIndexName(flintIndexName, "default.ci_test")
+    indexName shouldBe "name_and_age"
+  }
+
+  test("should fail if get index name without full table name") {
+    val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string"))
+    assertThrows[IllegalArgumentException] {
+      index.name()
+    }
+  }
+
+  test("should fail if no indexed column given") {
+    assertThrows[IllegalArgumentException] {
+      new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
+    }
+  }
+}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index c685474d0..7f17a96dd 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -7,6 +7,7 @@ package org.opensearch.flint.spark
 
 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
@@ -15,8 +16,9 @@ import org.apache.spark.sql.Row
 class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
   /** Test table and index name */
-  private val testTable = "default.test_ci"
-  private val testIndex = "ci"
+  private val testTable = "default.ci_test"
+  private val testIndex = "name_and_age"
+  private val testFlintIndex = getFlintIndexName(testIndex, testTable)
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -28,7 +30,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
     super.afterEach()
 
     // Delete all test indices
-    flint.deleteIndex(testIndex)
+    flint.deleteIndex(testFlintIndex)
   }
 
   test("create covering index with metadata successfully") {
@@ -39,11 +41,11 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .addIndexColumns("name", "age")
       .create()
 
-    val index = flint.describeIndex(testIndex)
+    val index = flint.describeIndex(testFlintIndex)
     index shouldBe defined
     index.get.metadata().getContent should matchJson(s"""{
         |   "_meta": {
-        |     "name": "ci",
+        |     "name": "flint_default_ci_test_name_and_age_index",
         |     "kind": "covering",
         |     "indexedColumns": [
         |     {
@@ -54,7 +56,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
         |        "columnName": "age",
         |        "columnType": "int"
         |     }],
-        |     "source": "default.test_ci"
+        |     "source": "default.ci_test"
         |   },
         |   "properties": {
         |     "name": {
@@ -76,9 +78,9 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .addIndexColumns("name", "age")
       .create()
 
-    flint.refreshIndex(testIndex, FULL)
+    flint.refreshIndex(testFlintIndex, FULL)
 
-    val indexData = flint.queryIndex(testIndex)
+    val indexData = flint.queryIndex(testFlintIndex)
     checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
   }
 
@@ -90,7 +92,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .addIndexColumns("name", "age")
       .create()
 
-    val jobId = flint.refreshIndex(testIndex, INCREMENTAL)
+    val jobId = flint.refreshIndex(testFlintIndex, INCREMENTAL)
     jobId shouldBe defined
 
     val job = spark.streams.get(jobId.get)
@@ -98,7 +100,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       job.processAllAvailable()
     }
 
-    val indexData = flint.queryIndex(testIndex)
+    val indexData = flint.queryIndex(testFlintIndex)
     checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
   }
 
@@ -110,11 +112,13 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .addIndexColumns("name", "age")
       .create()
 
+    val newIndex = testIndex + "_address"
     flint
       .coveringIndex()
-      .name(testIndex + "_address")
+      .name(newIndex)
       .onTable(testTable)
       .addIndexColumns("address")
       .create()
+    flint.deleteIndex(getFlintIndexName(newIndex, testTable))
   }
 }
From 671643390ceb86bb210c3ed2e1959a69f30ee048 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 1 Sep 2023 14:07:47 -0700
Subject: [PATCH 09/11] Refactor flint index name prefix and suffix

Signed-off-by: Chen Dai
---
 .../org/opensearch/flint/spark/FlintSparkIndex.scala    |  8 ++++++++
 .../flint/spark/covering/FlintSparkCoveringIndex.scala  | 10 +++++++---
 .../flint/spark/skipping/FlintSparkSkippingIndex.scala  |  7 +++++--
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index bbfa4c4ba..eb8da4e07 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -49,4 +49,12 @@ object FlintSparkIndex {
    * ID column name.
    */
   val ID_COLUMN: String = "__id__"
+
+  /**
+   * Common prefix of Flint index name.
+   *
+   * @param tableName source table name
+   * @return Flint index name
+   */
+  def flintIndexNamePrefix(tableName: String): String = s"flint_${tableName.replace(".", "_")}_"
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 7e70b3923..e3598048c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -11,6 +11,7 @@ import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
+import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
 import org.apache.spark.sql.DataFrame
@@ -83,6 +84,9 @@ object FlintSparkCoveringIndex {
   /** Covering index type name */
   val COVERING_INDEX_TYPE = "covering"
 
+  /** Flint covering index name suffix */
+  val COVERING_INDEX_SUFFIX = "_index"
+
   /**
    * Get Flint index name which follows the convention: "flint_" prefix + source table name +
    * given index name + "_index" suffix.
@@ -100,7 +104,7 @@ object FlintSparkCoveringIndex {
   def getFlintIndexName(indexName: String, tableName: String): String = {
     require(tableName.contains("."), "Full table name database.table is required")
 
-    s"flint_${tableName.replace(".", "_")}_${indexName}_index"
+    flintIndexNamePrefix(tableName) + indexName + COVERING_INDEX_SUFFIX
   }
 
   /**
@@ -115,8 +119,8 @@ object FlintSparkCoveringIndex {
    */
   def parseFlintIndexName(flintIndexName: String, tableName: String): String = {
     flintIndexName.substring(
-      s"flint_${tableName.replace(".", "_")}_".length,
-      flintIndexName.length - "_index".length)
+      flintIndexNamePrefix(tableName).length,
+      flintIndexName.length - COVERING_INDEX_SUFFIX.length)
   }
 
   /** Builder class for covering index build */
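
After this refactoring the covering index name is composed from three parts:
the shared prefix helper, the user-given name, and the new suffix constant. A
quick sketch with illustrative names:

    // flintIndexNamePrefix("default.test")             == "flint_default_test_"
    // prefix + "ci" + COVERING_INDEX_SUFFIX ("_index") == "flint_default_test_ci_index"
    val name = flintIndexNamePrefix("default.test") + "ci" + COVERING_INDEX_SUFFIX
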
SKIPPING_INDEX_SUFFIX = "skipping_index" + /** * Get skipping index name which follows the convention: "flint_" prefix + source table name + * "_skipping_index" suffix. @@ -117,7 +120,7 @@ object FlintSparkSkippingIndex { def getSkippingIndexName(tableName: String): String = { require(tableName.contains("."), "Full table name database.table is required") - s"flint_${tableName.replace(".", "_")}_skipping_index" + flintIndexNamePrefix(tableName) + SKIPPING_INDEX_SUFFIX } /** Builder class for skipping index build */ From 714360b65abf2aec6fab273590d0fc24dfcc45f0 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 1 Sep 2023 14:14:15 -0700 Subject: [PATCH 10/11] Add comment Signed-off-by: Chen Dai --- .../org/opensearch/flint/spark/FlintSparkIndex.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index eb8da4e07..62e6b4668 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -51,10 +51,13 @@ object FlintSparkIndex { val ID_COLUMN: String = "__id__" /** - * Common prefix of Flint index name. + * Common prefix of Flint index name which is "flint_database_table_" * - * @param tableName source table name - * @return Flint index name + * @param fullTableName + * source full table name + * @return + * Flint index name */ - def flintIndexNamePrefix(tableName: String): String = s"flint_${tableName.replace(".", "_")}_" + def flintIndexNamePrefix(fullTableName: String): String = + s"flint_${fullTableName.replace(".", "_")}_" } From a085a13ba23be2705f8ef30e102ed2fcf3e08cc5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 6 Sep 2023 13:25:33 -0700 Subject: [PATCH 11/11] Remove parse flint index name logic Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 4 ++-- .../covering/FlintSparkCoveringIndex.scala | 18 +----------------- .../FlintSparkCoveringIndexSuite.scala | 7 ------- .../spark/FlintSparkCoveringIndexITSuite.scala | 2 +- 4 files changed, 4 insertions(+), 27 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 29d744b8c..8058f9bff 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -15,7 +15,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{parseFlintIndexName, COVERING_INDEX_TYPE} +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer} @@ -233,7 +233,7 @@ class FlintSpark(val spark: SparkSession) { new FlintSparkSkippingIndex(tableName, strategies) case 
From a085a13ba23be2705f8ef30e102ed2fcf3e08cc5 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 6 Sep 2023 13:25:33 -0700
Subject: [PATCH 11/11] Remove parse flint index name logic

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala       |  4 ++--
 .../covering/FlintSparkCoveringIndex.scala        | 18 +-----------------
 .../covering/FlintSparkCoveringIndexSuite.scala   |  7 -------
 .../spark/FlintSparkCoveringIndexITSuite.scala    |  2 +-
 4 files changed, 4 insertions(+), 27 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 29d744b8c..8058f9bff 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -15,7 +15,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
-import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{parseFlintIndexName, COVERING_INDEX_TYPE}
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
@@ -233,7 +233,7 @@ class FlintSpark(val spark: SparkSession) {
         new FlintSparkSkippingIndex(tableName, strategies)
       case COVERING_INDEX_TYPE =>
         new FlintSparkCoveringIndex(
-          parseFlintIndexName(indexName, tableName),
+          indexName,
           tableName,
           indexedColumns.arr.map { obj =>
             ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index e3598048c..f2f5933d6 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -46,7 +46,7 @@ class FlintSparkCoveringIndex(
   override def metadata(): FlintMetadata = {
     new FlintMetadata(s"""{
         |   "_meta": {
-        |     "name": "${name()}",
+        |     "name": "$indexName",
         |     "kind": "$kind",
         |     "indexedColumns": $getMetaInfo,
         |     "source": "$tableName"
@@ -107,22 +107,6 @@ object FlintSparkCoveringIndex {
     flintIndexNamePrefix(tableName) + indexName + COVERING_INDEX_SUFFIX
   }
 
-  /**
-   * Parse original index name out of Flint index name.
-   *
-   * @param flintIndexName
-   *   Flint index name
-   * @param tableName
-   *   source table name
-   * @return
-   *   original index name specified by user
-   */
-  def parseFlintIndexName(flintIndexName: String, tableName: String): String = {
-    flintIndexName.substring(
-      flintIndexNamePrefix(tableName).length,
-      flintIndexName.length - COVERING_INDEX_SUFFIX.length)
-  }
-
   /** Builder class for covering index build */
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
     private var indexName: String = ""
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index 334511b82..a50db1af2 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -5,7 +5,6 @@
 
 package org.opensearch.flint.spark.covering
 
-import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.parseFlintIndexName
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
@@ -17,12 +16,6 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
     index.name() shouldBe "flint_default_test_ci_index"
   }
 
-  test("parse covering index name") {
-    val flintIndexName = "flint_default_ci_test_name_and_age_index"
-    val indexName = parseFlintIndexName(flintIndexName, "default.ci_test")
-    indexName shouldBe "name_and_age"
-  }
-
   test("should fail if get index name without full table name") {
     val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string"))
     assertThrows[IllegalArgumentException] {
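
With parsing gone, the user-given name is stored verbatim in _meta.name and
read back as-is during deserialization. A hypothetical round trip under the
illustrative names used in these tests:

    // The physical Flint index name is still derived via getFlintIndexName,
    // but describeIndex now passes _meta.name straight into the constructor
    // instead of parsing it back out of the Flint index name
    val index = flint.describeIndex(getFlintIndexName("name_and_age", "default.ci_test"))
    index.foreach(i => assert(i.name() == "flint_default_ci_test_name_and_age_index"))
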
- | "name": "flint_default_ci_test_name_and_age_index", + | "name": "name_and_age", | "kind": "covering", | "indexedColumns": [ | {