Skip to content

Commit

Permalink
query rewrite for LogsTable skipping index (#154)
Browse files Browse the repository at this point in the history
* query rewrite for nexus skipping index

Signed-off-by: Sean Kao <seankao@amazon.com>

* Delay index scan collect to query execution time

Signed-off-by: Sean Kao <seankao@amazon.com>

---------

Signed-off-by: Sean Kao <seankao@amazon.com>
  • Loading branch information
seankao-az authored Nov 15, 2023
1 parent 24252d1 commit 0351f40
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 1 deletion.
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
},
assembly / test := (Test / test).value)

// Test assembly package with integration test.
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package org.opensearch.flint.spark.skipping

import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.qualifyTableName

/**
Expand Down Expand Up @@ -57,6 +62,46 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
} else {
filter
}
case filter @ Filter(
condition: Predicate,
relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
if hasNoDisjunction(condition) &&
// Check if query plan already rewritten
table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
val index = flint.describeIndex(getIndexName(catalog, identifier))
if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
/*
* Replace original LogsTable with a new one with file index scan:
* Filter(a=b)
* |- DataSourceV2Relation(A)
* |- LogsTable <== replaced with a new LogsTable with file index scan
*/
if (indexFilter.isDefined) {
val indexScan = flint.queryIndex(skippingIndex.name())
val selectFileIndexScan =
// Non hybrid scan
// TODO: refactor common logic with file-based skipping index
indexScan
.filter(new Column(indexFilter.get))
.select(FILE_PATH_COLUMN)

// Construct LogsTable with file index scan
// It will build scan operator using log file ids collected from file index scan
val logsTable = table.asInstanceOf[LogsTable]
val newTable = new LogsTable(
logsTable.schema(),
logsTable.options(),
selectFileIndexScan,
logsTable.processedFields())
filter.copy(child = relation.copy(table = newTable))
} else {
filter
}
} else {
filter
}
}

private def getIndexName(table: CatalogTable): String = {
Expand All @@ -67,6 +112,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
getSkippingIndexName(qualifiedTableName)
}

private def getIndexName(catalog: CatalogPlugin, identifier: Identifier): String = {
val qualifiedTableName = s"${catalog.name}.${identifier}"
getSkippingIndexName(qualifiedTableName)
}

private def hasNoDisjunction(condition: Expression): Boolean = {
condition.collectFirst { case Or(_, _) =>
true
Expand Down

0 comments on commit 0351f40

Please sign in to comment.