Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation for index options #66

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ User can provide the following options in `WITH` clause of create statement:
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.

Note that the index option name is case-sensitive.

```sql
WITH (
auto_refresh = [true|false],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames

/**
* Flint Spark index configurable options.
*
Expand All @@ -13,37 +16,57 @@ package org.opensearch.flint.spark
*/
case class FlintSparkIndexOptions(options: Map[String, String]) {

validateOptionNames(options)

/**
* Is Flint index auto refreshed or manual refreshed.
*
* @return
* auto refresh option value
*/
def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean
def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean

/**
* The refresh interval (only valid if auto refresh enabled).
*
* @return
* refresh interval expression
*/
def refreshInterval(): Option[String] = options.get("refresh_interval")
def refreshInterval(): Option[String] = getOptionValue(REFRESH_INTERVAL)

/**
* The checkpoint location which maybe required by Flint index's refresh.
*
* @return
* checkpoint location path
*/
def checkpointLocation(): Option[String] = options.get("checkpoint_location")
def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION)

/**
* The index settings for OpenSearch index created.
*
* @return
* index setting JSON
*/
def indexSettings(): Option[String] = options.get("index_settings")
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)

/**
* @return
* all option values and fill default value if unspecified
*/
def optionsWithDefault: Map[String, String] = {
val map = Map.newBuilder[String, String]
map ++= options

if (!options.contains(AUTO_REFRESH.toString)) {
map += (AUTO_REFRESH.toString -> autoRefresh().toString)
}
map.result()
}

private def getOptionValue(name: OptionName): Option[String] = {
options.get(name.toString)
}
}

object FlintSparkIndexOptions {
Expand All @@ -52,4 +75,28 @@ object FlintSparkIndexOptions {
* Empty options
*/
val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)

/**
* Option name Enum.
*/
object OptionName extends Enumeration {
type OptionName = Value
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
}

/**
* Validate option names and throw exception if any unknown found.
*
* @param options
* options given
*/
def validateOptionNames(options: Map[String, String]): Unit = {
val allOptions = OptionName.values.map(_.toString)
val invalidOptions = options.keys.filterNot(allOptions.contains)

require(invalidOptions.isEmpty, s"option name ${invalidOptions.mkString(",")} is invalid")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ case class FlintSparkCoveringIndex(
}

private def getIndexOptions: String = {
Serialization.write(options.options)
Serialization.write(options.optionsWithDefault)
}

private def getIndexProperties: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class FlintSparkSkippingIndex(
}

private def getIndexOptions: String = {
Serialization.write(options.options)
Serialization.write(options.optionsWithDefault)
}

private def getIndexProperties: String = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite

class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {

test("should return lowercase name as option name") {
AUTO_REFRESH.toString shouldBe "auto_refresh"
REFRESH_INTERVAL.toString shouldBe "refresh_interval"
CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
INDEX_SETTINGS.toString shouldBe "index_settings"
}

test("should return specified option value") {
val options = FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "1 Minute",
"checkpoint_location" -> "s3://test/",
"index_settings" -> """{"number_of_shards": 3}"""))

options.autoRefresh() shouldBe true
options.refreshInterval() shouldBe Some("1 Minute")
options.checkpointLocation() shouldBe Some("s3://test/")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
}

test("should return default option value if unspecified") {
val options = FlintSparkIndexOptions(Map.empty)

options.autoRefresh() shouldBe false
options.refreshInterval() shouldBe empty
options.checkpointLocation() shouldBe empty
options.indexSettings() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
}

test("should return default option value if unspecified with specified value") {
val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute"))

options.optionsWithDefault shouldBe Map(
"auto_refresh" -> "false",
"refresh_interval" -> "1 Minute")
}

test("should report error if any unknown option name") {
the[IllegalArgumentException] thrownBy
FlintSparkIndexOptions(Map("autoRefresh" -> "true"))

the[IllegalArgumentException] thrownBy
FlintSparkIndexOptions(Map("AUTO_REFRESH" -> "true"))

the[IllegalArgumentException] thrownBy {
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test"))
} should have message "requirement failed: option name indexSetting is invalid"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.ci_test",
| "options": {},
| "options": { "auto_refresh": "false" },
| "properties": {}
| },
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row

Expand Down Expand Up @@ -58,7 +58,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index with streaming job options") {
test("create covering index with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE INDEX $testIndex ON $testTable ( name )
Expand All @@ -77,7 +77,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
}
}

test("create skipping index with index settings") {
test("create covering index with index settings") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable ( name )
| WITH (
Expand All @@ -94,6 +94,15 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
}

test("create covering index with invalid option") {
the[IllegalArgumentException] thrownBy
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (autoRefresh = true)
| """.stripMargin)
}

test("create covering index with manual refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.test",
| "options": {},
| "options": { "auto_refresh": "false" },
| "properties": {}
| },
| "properties": {
Expand All @@ -105,7 +105,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| }
|""".stripMargin)

index.get.options shouldBe FlintSparkIndexOptions.empty
index.get.options shouldBe FlintSparkIndexOptions(Map("auto_refresh" -> "false"))
}

test("create skipping index with index options successfully") {
Expand Down Expand Up @@ -522,7 +522,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "struct<subfield1:string,subfield2:int>"
| }],
| "source": "$testTable",
| "options": {},
| "options": { "auto_refresh": "false" },
| "properties": {}
| },
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
Expand Down Expand Up @@ -99,6 +99,15 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create skipping index with invalid option") {
the[IllegalArgumentException] thrownBy
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| ( year PARTITION )
| WITH (autoRefresh = true)
| """.stripMargin)
}

test("create skipping index with manual refresh") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
Expand Down