Fix column name with special character when creating materialized view (#98)

* Add IT to verify identifier is unquoted properly

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix dotted column name issue with IT

Signed-off-by: Chen Dai <daichen@amazon.com>

dai-chen authored Oct 25, 2023
1 parent fa8e47a commit 62c11df
Showing 4 changed files with 60 additions and 4 deletions.
@@ -135,9 +135,10 @@ object FlintSparkIndex {
   }
 
   def generateSchemaJSON(allFieldTypes: Map[String, String]): String = {
+    // Backtick column names to escape special characters, otherwise fromDDL() will fail
     val catalogDDL =
       allFieldTypes
-        .map { case (colName, colType) => s"$colName $colType not null" }
+        .map { case (colName, colType) => s"`$colName` $colType not null" }
         .mkString(",")
 
     val structType = StructType.fromDDL(catalogDDL)
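
For context, a minimal sketch (mine, not part of the commit) of the failure mode the new backticks avoid: Spark's DDL parser rejects an unquoted identifier containing a special character such as a dot, so fromDDL() only parses such a column name once it is backtick-quoted.

import org.apache.spark.sql.types.StructType

// Unquoted, the dot breaks identifier parsing:
// StructType.fromDDL("start.time bigint not null")   // throws ParseException

// Backtick-quoted, the same name parses as a single column:
val schema = StructType.fromDDL("`start.time` bigint not null")
// schema: StructType(StructField(start.time,LongType,false))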
@@ -6,7 +6,7 @@
 package org.opensearch.flint.spark
 
 import scala.Option.empty
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
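
A side note (my explanation, not the commit's): the widened import matters because the indexed-column entries inspected below are Java maps, and mapAsScalaMapConverter supplies the .asScala used in the new assertions. A small self-contained sketch, assuming a plain java.util.Map entry:

import scala.collection.JavaConverters.mapAsScalaMapConverter
import java.util.{HashMap => JHashMap}

// asScala wraps the Java map so it can be queried like a Scala map.
val entry = new JHashMap[String, AnyRef]()
entry.put("columnName", "start.time")
entry.asScala("columnName")   // returns "start.time"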
@@ -179,6 +179,21 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
        |""".stripMargin)
   }
 
+  test("create covering index with quoted index, table and column name") {
+    sql(s"""
+         | CREATE INDEX `$testIndex` ON `spark_catalog`.`default`.`covering_sql_test`
+         | (`name`, `age`)
+         | """.stripMargin)
+
+    val index = flint.describeIndex(testFlintIndex)
+    index shouldBe defined
+
+    val metadata = index.get.metadata()
+    metadata.name shouldBe testIndex
+    metadata.source shouldBe testTable
+    metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age")
+  }
+
   test("show all covering index on the source table") {
     flint
       .coveringIndex()
@@ -8,7 +8,7 @@ package org.opensearch.flint.spark
 import java.sql.Timestamp
 
 import scala.Option.empty
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
@@ -157,6 +157,28 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
   }
 
+  test("create materialized view with quoted name and column name") {
+    val testQuotedQuery =
+      """ SELECT
+        | window.start AS `start.time`,
+        | COUNT(*) AS `count`
+        | FROM `spark_catalog`.`default`.`mv_test`
+        | GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim
+
+    sql(s"""
+         | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics`
+         | AS $testQuotedQuery
+         |""".stripMargin)
+
+    val index = flint.describeIndex(testFlintIndex)
+    index shouldBe defined
+
+    val metadata = index.get.metadata()
+    metadata.name shouldBe testMvName
+    metadata.source shouldBe testQuotedQuery
+    metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("start.time", "count")
+  }
+
   test("show all materialized views in catalog and database") {
     // Show in catalog
     flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
@@ -6,7 +6,7 @@
 package org.opensearch.flint.spark
 
 import scala.Option.empty
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
@@ -150,6 +150,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
        | """.stripMargin)
   }
 
+  test("create skipping index with quoted table and column name") {
+    sql(s"""
+         | CREATE SKIPPING INDEX ON `spark_catalog`.`default`.`skipping_sql_test`
+         | (
+         | `year` PARTITION,
+         | `name` VALUE_SET,
+         | `age` MIN_MAX
+         | )
+         | """.stripMargin)
+
+    val index = flint.describeIndex(testIndex)
+    index shouldBe defined
+
+    val metadata = index.get.metadata()
+    metadata.source shouldBe testTable
+    metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("year", "name", "age")
+  }
+
   test("describe skipping index") {
     flint
       .skippingIndex()
