Skip to content

Commit

Permalink
avoid double normalization
Browse files Browse the repository at this point in the history
  • Loading branch information
anchovYu committed Aug 29, 2024
1 parent ebb1975 commit aab2f91
Showing 1 changed file with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
query: Dataset[_],
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
cacheQueryInternal(query.sparkSession, query.queryExecution.normalized, tableName, storageLevel)
cacheQueryInternal(
query.sparkSession,
query.queryExecution.analyzed,
query.queryExecution.normalized,
tableName,
storageLevel
)
}

/**
Expand All @@ -107,34 +113,37 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
val normalized = QueryExecution.normalize(spark, planToCache)
cacheQueryInternal(spark, normalized, tableName, storageLevel)
cacheQueryInternal(spark, planToCache, normalized, tableName, storageLevel)
}

// The `planToCache` should have been normalized.
// The `normalizedPlan` should have been normalized. It is the cache key.
private def cacheQueryInternal(
spark: SparkSession,
planToCache: LogicalPlan,
unnormalizedPlan: LogicalPlan,
normalizedPlan: LogicalPlan,
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
if (storageLevel == StorageLevel.NONE) {
// Do nothing for StorageLevel.NONE since it will not actually cache any data.
} else if (lookupCachedDataInternal(planToCache).nonEmpty) {
} else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
val inMemoryRelation = sessionWithConfigsOff.withActive {
val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
// it creates query execution from unnormalizedPlan plan to avoid multiple normalization.
val qe = sessionWithConfigsOff.sessionState.executePlan(unnormalizedPlan)
InMemoryRelation(
storageLevel,
qe,
tableName)
}

this.synchronized {
if (lookupCachedDataInternal(planToCache).nonEmpty) {
if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
logWarning("Data has already been cached.")
} else {
val cd = CachedData(planToCache, inMemoryRelation)
// the cache key is the normalized plan
val cd = CachedData(normalizedPlan, inMemoryRelation)
cachedData = cd +: cachedData
CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")
Expand Down

0 comments on commit aab2f91

Please sign in to comment.