Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add show and desc materialized view SQL support #81

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name

DROP MATERIALIZED VIEW name
```

Expand All @@ -209,6 +213,10 @@ SELECT
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics

DROP MATERIALIZED VIEW alb_logs_metrics
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
;

Expand All @@ -88,6 +90,14 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

showMaterializedViewStatement
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;

describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
Expand All @@ -172,6 +173,7 @@ REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WITH: 'WITH';


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,14 @@ class FlintSpark(val spark: SparkSession) {
* Flint index list
*/
def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
if (flintClient.exists(indexNamePattern)) {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
} else {
Seq.empty
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
package org.opensearch.flint.spark.sql.mv

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
Expand Down Expand Up @@ -47,6 +50,41 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

override def visitShowMaterializedViewStatement(
ctx: ShowMaterializedViewStatementContext): AnyRef = {
val outputSchema = Seq(
AttributeReference("materialized_view_name", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val catalogDbName = ctx.catalogDb.getText
val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*"
flint
.describeIndexes(indexNamePattern)
.collect { case mv: FlintSparkMaterializedView =>
Row(mv.mvName)
}
}
}

override def visitDescribeMaterializedViewStatement(
ctx: DescribeMaterializedViewStatementContext): AnyRef = {
val outputSchema = Seq(
AttributeReference("output_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint
.describeIndex(flintIndexName)
.map { case mv: FlintSparkMaterializedView =>
mv.outputSchema.map { case (colName, colType) =>
Row(colName, colType)
}.toSeq
}
.getOrElse(Seq.empty)
}
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,42 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

test("show all materialized views in catalog and database") {
// Show in catalog
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
Seq(Row("spark_catalog.default.mv1")))

// Show in catalog.database
flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"),
Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2")))

checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
}

test("should return emtpy when show materialized views in empty database") {
checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
}

test("describe materialized view") {
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.create()

checkAnswer(
sql(s"DESC MATERIALIZED VIEW $testMvName"),
Seq(Row("startTime", "timestamp"), Row("count", "long")))
}

test("should return empty when describe nonexistent materialized view") {
checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq())
}

test("drop materialized view") {
flint
.materializedView()
Expand Down