Skip to content

Commit

Permalink
idomatic scala
Browse files Browse the repository at this point in the history
  • Loading branch information
codope committed Jan 27, 2025
1 parent 2a8a6fc commit d6da2ce
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,46 +84,47 @@ class PartitionStatsIndexSupport(spark: SparkSession,
def prunePartitions(fileIndex: HoodieFileIndex,
queryFilters: Seq[Expression],
queryReferencedColumns: Seq[String]): Option[Set[String]] = {
if (queryFilters.nonEmpty && containsAnySqlFunction(queryFilters)) {
// If the query contains any SQL function, skip the pruning.
// Expression Index will be used in such cases, if available.
Option.empty
}
if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
loadTransposed(queryReferencedColumns, readInMemory, Option.empty, Option.empty) {
transposedPartitionStatsDF => {
try {
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
val allPartitions = transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
if (allPartitions.nonEmpty) {
// PARTITION_STATS index exist for all or some columns in the filters
// NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has covered the case where the
// column in a filter does not have the stats available, by making sure such a
// filter does not prune any partition.
val indexSchema = transposedPartitionStatsDF.schema
val indexedCols: Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
// to be fixed. HUDI-8836.
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = indexedCols)).reduce(And)
if (indexFilter.equals(TrueLiteral)) {
// if there are any non indexed cols or we can't translate source expr, we can prune partitions based on col stats lookup.
Some(allPartitions)
if (containsAnySqlFunction(queryFilters)) {
// If the query contains any SQL function, skip the pruning.
// Expression Index will be used in such cases, if available.
Option.empty
} else {
val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
loadTransposed(queryReferencedColumns, readInMemory, Option.empty, Option.empty) {
transposedPartitionStatsDF => {
try {
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
val allPartitions = transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
if (allPartitions.nonEmpty) {
// PARTITION_STATS index exist for all or some columns in the filters
// NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has covered the case where the
// column in a filter does not have the stats available, by making sure such a
// filter does not prune any partition.
val indexSchema = transposedPartitionStatsDF.schema
val indexedCols: Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
// to be fixed. HUDI-8836.
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = indexedCols)).reduce(And)
if (indexFilter.equals(TrueLiteral)) {
// if there are any non indexed cols or we can't translate source expr, we can prune partitions based on col stats lookup.
Some(allPartitions)
} else {
Some(transposedPartitionStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet)
}
} else {
Some(transposedPartitionStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet)
// PARTITION_STATS index does not exist for any column in the filters, skip the pruning
Option.empty
}
} else {
// PARTITION_STATS index does not exist for any column in the filters, skip the pruning
Option.empty
} finally {
transposedPartitionStatsDF.unpersist()
}
} finally {
transposedPartitionStatsDF.unpersist()
}
}
}
Expand All @@ -142,13 +143,13 @@ class PartitionStatsIndexSupport(spark: SparkSession,
case _: ParseToDate => true
case _: ParseToTimestamp => true
case _: DateAdd => true
case _: DateSub => true
case _: DateSub => true
case _: Substring => true
case _: StringTrim => true
case _: StringTrimLeft => true
case _: StringTrimRight => true
case _: RegExpReplace => true
case _: RegExpExtract => true
case _: RegExpReplace => true
case _: RegExpExtract => true
case _: StringSplit => true
case _ => false
}.isDefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
/**
* Test expression index partition pruning with partition stats.
*/
@Disabled("HUDI-8919")
@Test
def testPartitionPruningWithPartitionStats(): Unit = {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down

0 comments on commit d6da2ce

Please sign in to comment.