Skip to content

Commit

Permalink
Add more index options for streaming job (#76)
Browse files Browse the repository at this point in the history
* Add watermark delay option

Signed-off-by: Chen Dai <daichen@amazon.com>

* Change UT and IT with watermark delay option

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update MV SQL IT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add output mode option

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add extra sink options

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add extra source options

Signed-off-by: Chen Dai <daichen@amazon.com>

* Refactor MV UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Copy extra options to streaming relation

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix logical plan assert

Signed-off-by: Chen Dai <daichen@amazon.com>

* Qualify table name in relation when fetching extra option

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update user manual with new options

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen authored Oct 21, 2023
1 parent f8ec62e commit af210ca
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 93 deletions.
15 changes: 11 additions & 4 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,22 @@ User can provide the following options in `WITH` clause of create statement:
+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive.
Note that the index option name is case-sensitive. Here is an example:

```sql
WITH (
auto_refresh = [true|false],
refresh_interval = 'time interval expression',
checkpoint_location = 'checkpoint directory path'
auto_refresh = true,
refresh_interval = '10 Seconds',
checkpoint_location = 's3://test/',
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,18 @@ class FlintSpark(val spark: SparkSession) {
.queryName(indexName)
.format(FLINT_DATASOURCE)
.options(flintSparkConf.properties)
.addIndexOptions(options)
.addSinkOptions(options)
.start(indexName)
Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case INCREMENTAL =>
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(tableName)
.writeStream
.queryName(indexName)
.addIndexOptions(options)
.addSinkOptions(options)
.foreachBatch { (batchDF: DataFrame, _: Long) =>
batchRefresh(Some(batchDF))
}
Expand Down Expand Up @@ -237,26 +238,26 @@ class FlintSpark(val spark: SparkSession) {
// Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API
private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) {

def addIndexOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = {
def addSinkOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = {
dataStream
.addCheckpointLocation(options.checkpointLocation())
.addRefreshInterval(options.refreshInterval())
.addOutputMode(options.outputMode())
.options(options.extraSinkOptions())
}

def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
if (checkpointLocation.isDefined) {
dataStream.option("checkpointLocation", checkpointLocation.get)
} else {
dataStream
}
checkpointLocation.map(dataStream.option("checkpointLocation", _)).getOrElse(dataStream)
}

def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
if (refreshInterval.isDefined) {
dataStream.trigger(Trigger.ProcessingTime(refreshInterval.get))
} else {
dataStream
}
refreshInterval
.map(interval => dataStream.trigger(Trigger.ProcessingTime(interval)))
.getOrElse(dataStream)
}

def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = {
outputMode.map(dataStream.outputMode).getOrElse(dataStream)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames

/**
Expand All @@ -16,6 +19,8 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
*/
case class FlintSparkIndexOptions(options: Map[String, String]) {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

validateOptionNames(options)

/**
Expand All @@ -42,6 +47,21 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION)

/**
* How late the data can come and still be processed.
*
* @return
* watermark delay time expression
*/
def watermarkDelay(): Option[String] = getOptionValue(WATERMARK_DELAY)

/**
* The output mode that describes how data will be written to streaming sink.
* @return
* output mode
*/
def outputMode(): Option[String] = getOptionValue(OUTPUT_MODE)

/**
* The index settings for OpenSearch index created.
*
Expand All @@ -50,6 +70,28 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)

/**
* Extra streaming source options that can be simply passed to DataStreamReader or
* Relation.options
* @param source
* source name (full table name)
* @return
* extra source option map or empty map if not exist
*/
def extraSourceOptions(source: String): Map[String, String] = {
parseExtraOptions(source)
}

/**
* Extra streaming sink options that can be simply passed to DataStreamWriter.options()
*
* @return
* extra sink option map or empty map if not exist
*/
def extraSinkOptions(): Map[String, String] = {
parseExtraOptions("sink")
}

/**
* @return
* all option values and fill default value if unspecified
Expand All @@ -67,6 +109,12 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
private def getOptionValue(name: OptionName): Option[String] = {
options.get(name.toString)
}

private def parseExtraOptions(key: String): Map[String, String] = {
getOptionValue(EXTRA_OPTIONS)
.map(opt => (parse(opt) \ key).extract[Map[String, String]])
.getOrElse(Map.empty)
}
}

object FlintSparkIndexOptions {
Expand All @@ -84,7 +132,10 @@ object FlintSparkIndexOptions {
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.mv
import java.util.Locale

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
Expand All @@ -23,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Flint materialized view in Spark.
Expand All @@ -44,9 +46,6 @@ case class FlintSparkMaterializedView(
extends FlintSparkIndex
with StreamingRefresh {

/** TODO: add it to index option */
private val watermarkDelay = "0 Minute"

override val kind: String = MV_INDEX_TYPE

override def name(): String = getFlintIndexName(mvName)
Expand Down Expand Up @@ -81,19 +80,33 @@ case class FlintSparkMaterializedView(
* 2.Set isStreaming flag to true in Relation operator
*/
val streamingPlan = batchPlan transform {
case WindowingAggregate(agg, timeCol) =>
agg.copy(child = watermark(timeCol, watermarkDelay, agg.child))
case WindowingAggregate(aggregate, timeCol) =>
aggregate.copy(child = watermark(timeCol, aggregate.child))

case relation: UnresolvedRelation if !relation.isStreaming =>
relation.copy(isStreaming = true)
relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation))
}
logicalPlanToDataFrame(spark, streamingPlan)
}

private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = {
private def watermark(timeCol: Attribute, child: LogicalPlan) = {
require(
options.watermarkDelay().isDefined,
"watermark delay is required for incremental refresh with aggregation")

val delay = options.watermarkDelay().get
EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
}

private def optionsWithExtra(
spark: SparkSession,
relation: UnresolvedRelation): CaseInsensitiveStringMap = {
val originalOptions = relation.options.asCaseSensitiveMap
val tableName = qualifyTableName(spark, relation.tableName)
val extraOptions = options.extraSourceOptions(tableName).asJava
new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava)
}

/**
* Extractor that extract event time column out of Aggregate operator.
*/
Expand All @@ -107,7 +120,7 @@ case class FlintSparkMaterializedView(

if (winFuncs.size != 1) {
throw new IllegalStateException(
"A windowing function is required for streaming aggregation")
"A windowing function is required for incremental refresh with aggregation")
}

// Assume first aggregate item must be time column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
Expand All @@ -16,7 +16,10 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
AUTO_REFRESH.toString shouldBe "auto_refresh"
REFRESH_INTERVAL.toString shouldBe "refresh_interval"
CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
WATERMARK_DELAY.toString shouldBe "watermark_delay"
OUTPUT_MODE.toString shouldBe "output_mode"
INDEX_SETTINGS.toString shouldBe "index_settings"
EXTRA_OPTIONS.toString shouldBe "extra_options"
}

test("should return specified option value") {
Expand All @@ -25,12 +28,42 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
"auto_refresh" -> "true",
"refresh_interval" -> "1 Minute",
"checkpoint_location" -> "s3://test/",
"index_settings" -> """{"number_of_shards": 3}"""))
"watermark_delay" -> "30 Seconds",
"output_mode" -> "complete",
"index_settings" -> """{"number_of_shards": 3}""",
"extra_options" ->
""" {
| "alb_logs": {
| "opt1": "val1"
| },
| "sink": {
| "opt2": "val2",
| "opt3": "val3"
| }
| }""".stripMargin))

options.autoRefresh() shouldBe true
options.refreshInterval() shouldBe Some("1 Minute")
options.checkpointLocation() shouldBe Some("s3://test/")
options.watermarkDelay() shouldBe Some("30 Seconds")
options.outputMode() shouldBe Some("complete")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3")
}

test("should return extra source option value and empty sink option values") {
val options = FlintSparkIndexOptions(
Map("extra_options" ->
""" {
| "alb_logs": {
| "opt1": "val1"
| }
| }""".stripMargin))

options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
options.extraSourceOptions("alb_logs_metrics") shouldBe empty
options.extraSinkOptions() shouldBe empty
}

test("should return default option value if unspecified") {
Expand All @@ -39,11 +72,15 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.autoRefresh() shouldBe false
options.refreshInterval() shouldBe empty
options.checkpointLocation() shouldBe empty
options.watermarkDelay() shouldBe empty
options.outputMode() shouldBe empty
options.indexSettings() shouldBe empty
options.extraSourceOptions("alb_logs") shouldBe empty
options.extraSinkOptions() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
}

test("should return default option value if unspecified with specified value") {
test("should return include unspecified option if it has default value") {
val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute"))

options.optionsWithDefault shouldBe Map(
Expand Down
Loading

0 comments on commit af210ca

Please sign in to comment.