Support refresh interval and checkpoint location option #39

Merged

Changes from 7 commits

flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 (7 additions & 1 deletion)

@@ -165,7 +165,6 @@
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';

@@ -174,6 +173,13 @@
EQ : '=' | '==';
MINUS: '-';


STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
| '"' ( ~('"'|'\\') | ('\\' .) )* '"'
| 'R\'' (~'\'')* '\''
| 'R"'(~'"')* '"'
;

INTEGER_VALUE
: DIGIT+
;
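
Note on the relocated STRING rule: it is now a real lexer rule rather than a keyword, and it accepts three literal forms, so WITH option values can be written in any of the following ways (values are illustrative):

  '10 seconds'             -- single-quoted; backslash escapes allowed
  "s3://bucket/flint/cp"   -- double-quoted; backslash escapes allowed
  R'C:\flint\cp'           -- raw string (also R"..."); backslashes kept literal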

FlintSpark.scala

@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import scala.collection.JavaConverters._

import org.json4s.{Formats, JArray, NoTypeHints}
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
@@ -30,6 +31,7 @@
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.streaming.OutputMode.Append
import org.apache.spark.sql.streaming.Trigger

/**
* Flint Spark integration API entrypoint.
@@ -128,11 +130,22 @@ class FlintSpark(val spark: SparkSession) {
.writeStream
.queryName(indexName)
.outputMode(Append())
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
}
.start()
Some(job.id.toString)

index.options
.checkpointLocation()
.foreach(location => job.option("checkpointLocation", location))
index.options
.refreshInterval()
.foreach(interval => job.trigger(Trigger.ProcessingTime(interval)))

val jobId =
job
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
}
.start()
.id
Some(jobId.toString)
}
}

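Note: as a minimal sketch of where the two new options surface in plain Structured Streaming (source table and checkpoint path are assumed), checkpoint_location becomes the writer's "checkpointLocation" option and refresh_interval becomes a processing-time trigger, mirroring how they are applied to the job builder above:

  import org.apache.spark.sql.streaming.Trigger

  // Minimal sketch, assuming a streaming-readable table "alb_logs" and a
  // writable checkpoint path; the console sink stands in for the Flint writer.
  val query = spark.readStream
    .table("alb_logs")
    .writeStream
    .queryName("flint_alb_logs_index")
    .option("checkpointLocation", "s3://bucket/flint/checkpoints") // checkpoint_location
    .trigger(Trigger.ProcessingTime("10 seconds"))                 // refresh_interval
    .format("console")
    .start()
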
@@ -175,8 +188,8 @@
*/
def deleteIndex(indexName: String): Boolean = {
if (flintClient.exists(indexName)) {
flintClient.deleteIndex(indexName)
stopRefreshingJob(indexName)
flintClient.deleteIndex(indexName)
true
} else {
false
@@ -223,6 +236,14 @@
val tableName = (meta \ "source").extract[String]
val indexType = (meta \ "kind").extract[String]
val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
val indexOptions = FlintSparkIndexOptions(
(meta \ "options")
.asInstanceOf[JObject]
.obj
.map { case JField(key, value) =>
key -> value.values.toString
}
.toMap)

indexType match {
case SKIPPING_INDEX_TYPE =>
@@ -242,14 +263,15 @@
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
new FlintSparkSkippingIndex(tableName, strategies)
new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
case COVERING_INDEX_TYPE =>
new FlintSparkCoveringIndex(
indexName,
tableName,
indexedColumns.arr.map { obj =>
((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
}.toMap)
}.toMap,
indexOptions)
}
}
}

FlintSparkIndex.scala

@@ -19,6 +19,11 @@ trait FlintSparkIndex {
*/
val kind: String

/**
* Index options
*/
val options: FlintSparkIndexOptions

/**
* @return
* Flint index name

FlintSparkIndexBuilder.scala

@@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.empty

import org.apache.spark.sql.catalog.Column

/**
@@ -18,6 +20,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
/** Source table name */
protected var tableName: String = ""

/** Index options */
protected var indexOptions: FlintSparkIndexOptions = empty

/** All columns of the given source table */
lazy protected val allColumns: Map[String, Column] = {
require(tableName.nonEmpty, "Source table name is not provided")
@@ -29,6 +34,19 @@
.toMap
}

/**
* Add index options.
*
* @param options
* index options
* @return
* builder
*/
def options(options: FlintSparkIndexOptions): this.type = {
this.indexOptions = options
this
}

/**
* Create Flint index.
*/
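
Note: a hedged usage sketch of the new options() hook (the flint instance, table name, and partition columns are assumed; skippingIndex(), onTable(), and addPartitions() are taken to be the existing builder entry points):

  flint
    .skippingIndex()
    .onTable("alb_logs")
    .addPartitions("year", "month")
    .options(FlintSparkIndexOptions(Map(
      "auto_refresh" -> "true",
      "refresh_interval" -> "10 seconds",
      "checkpoint_location" -> "s3://bucket/flint/checkpoints")))
    .create()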

FlintSparkIndexOptions.scala (new file)
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

/**
* Flint Spark index configurable options.
*
* @param options
* index option mappings
*/
case class FlintSparkIndexOptions(options: Map[String, String]) {

/**
* Whether the Flint index is automatically refreshed or manually refreshed.
*
* @return
* auto refresh option value
*/
def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean

/**
* The refresh interval (only valid if auto refresh is enabled).
*
* @return
* refresh interval expression
*/
def refreshInterval(): Option[String] = options.get("refresh_interval")

/**
* The checkpoint location, which may be required by the Flint index's refresh.
*
* @return
* checkpoint location path
*/
def checkpointLocation(): Option[String] = options.get("checkpoint_location")
}

object FlintSparkIndexOptions {

/**
* Empty options
*/
val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)
}
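
Note: for illustration, how the accessors behave for a partially populated option map (values assumed):

  val options = FlintSparkIndexOptions(Map(
    "auto_refresh" -> "true",
    "refresh_interval" -> "10 seconds"))

  options.autoRefresh()        // true
  options.refreshInterval()    // Some("10 seconds")
  options.checkpointLocation() // None (not set)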

FlintSparkCoveringIndex.scala

@@ -10,8 +10,9 @@
import org.json4s.JsonAST.{JArray, JObject, JString}
import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

import org.apache.spark.sql.DataFrame
@@ -31,7 +32,8 @@
import org.apache.spark.sql.types.StructType
case class FlintSparkCoveringIndex(
indexName: String,
tableName: String,
indexedColumns: Map[String, String])
indexedColumns: Map[String, String],
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -49,7 +51,8 @@ case class FlintSparkCoveringIndex(
| "name": "$indexName",
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
@@ -69,6 +72,10 @@
Serialization.write(JArray(objects))
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getSchema: String = {
val catalogDDL =
indexedColumns
@@ -154,6 +161,6 @@ object FlintSparkCoveringIndex {
}

override protected def buildIndex(): FlintSparkIndex =
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns)
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
}
}
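
Note: with options attached, the _meta block rendered by metadata() above gains an options object, for example (field values illustrative):

  "_meta": {
    "name": "elb_and_requestUri",
    "kind": "covering",
    "indexedColumns": [ ... ],
    "source": "alb_logs",
    "options": { "auto_refresh": "true", "refresh_interval": "10 seconds" }
  }

deserialize() in FlintSpark reads this object back into the FlintSparkIndexOptions map shown earlier.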

FlintSparkSkippingIndex.scala

@@ -10,8 +10,9 @@
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
@@ -34,7 +35,8 @@
*/
class FlintSparkSkippingIndex(
tableName: String,
val indexedColumns: Seq[FlintSparkSkippingStrategy])
val indexedColumns: Seq[FlintSparkSkippingStrategy],
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -56,7 +58,8 @@ class FlintSparkSkippingIndex(
| "version": "${FlintVersion.current()}",
| "kind": "$SKIPPING_INDEX_TYPE",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
@@ -82,6 +85,10 @@
Serialization.write(indexedColumns)
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getSchema: String = {
val allFieldTypes =
indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
@@ -192,7 +199,7 @@ object FlintSparkSkippingIndex {
}

override def buildIndex(): FlintSparkIndex =
new FlintSparkSkippingIndex(tableName, indexedColumns)
new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions)

private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
require(

FlintSparkSqlAstBuilder.scala

@@ -5,50 +5,33 @@

package org.opensearch.flint.spark.sql

import org.antlr.v4.runtime.tree.RuleNode
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* Flint Spark AST builder that builds Spark command for Flint index statement.
* This class mix-in all other AST builders and provides util methods.
* Flint Spark AST builder that builds Spark command for Flint index statement. This class mixes
* in all other AST builders and provides util methods.
*/
class FlintSparkSqlAstBuilder
extends FlintSparkSqlExtensionsBaseVisitor[Command]
extends FlintSparkSqlExtensionsBaseVisitor[AnyRef]
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder {
with FlintSparkCoveringIndexAstBuilder
with SparkSqlAstBuilder {

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
override def visit(tree: ParseTree): LogicalPlan = {
tree.accept(this).asInstanceOf[LogicalPlan]
}

override def aggregateResult(aggregate: AnyRef, nextResult: AnyRef): AnyRef =
if (nextResult != null) nextResult else aggregate
}

object FlintSparkSqlAstBuilder {

/**
* Check if auto_refresh is true in property list.
*
* @param ctx
* property list
*/
def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
if (ctx == null) {
false
} else {
ctx
.property()
.forEach(p => {
if (p.key.getText == "auto_refresh") {
return p.value.getText.toBoolean
}
})
false
}
}

/**
* Get full table name if database not specified.
*
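
Note: end to end, these changes are meant to enable DDL along these lines (table, column, and checkpoint path are assumed for illustration; the full CREATE statement rules live in the grammar, not in this diff):

  spark.sql("""
    CREATE SKIPPING INDEX ON alb_logs
    ( elb_status_code VALUE_SET )
    WITH (
      auto_refresh = true,
      refresh_interval = '10 seconds',
      checkpoint_location = 's3://bucket/flint/checkpoints'
    )
    """)

The new STRING token is what lets the interval and path appear as quoted values in the WITH property list (PropertyListContext above), now handled by the mixed-in SparkSqlAstBuilder.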