[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will keep using the cached information even though it is no longer valid. Consider the example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache, so step 5 still reports the stale statistics.

Note that running ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also work around the problem.
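
The fix moves the cache invalidation into `SessionCatalog.alterTableStats` itself, right after the new statistics are written to the external catalog, so every command that updates statistics also drops the cached relation. A simplified sketch of the resulting method (the name-resolution lines above the shown hunk are paraphrased; only the `refreshTable` call is new):

```
// Simplified sketch of SessionCatalog.alterTableStats after this patch.
def alterTableStats(
    identifier: TableIdentifier,
    newStats: Option[CatalogStatistics]): Unit = {
  val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(identifier.table)
  requireDbExists(db)
  requireTableExists(TableIdentifier(table, Some(db)))
  // Persist the new statistics in the external catalog.
  externalCatalog.alterTableStats(db, table, newStats)
  // Invalidate the table relation cache so later lookups re-read the metadata.
  refreshTable(identifier)
}
```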

## How was this patch tested?

Existing unit tests, plus new tests covering invalidation of the table relation cache after inserts, truncation, and partition changes (a condensed sketch follows).
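
The new tests look a table up to populate the relation cache, mutate it, and then assert that the cached entry was invalidated or carries the refreshed size. A condensed sketch of the insert-path check, using the `getTableFromCatalogCache` helper added to `StatisticsCollectionTestBase` in this patch:

```
// Condensed sketch of the new insert-path test in StatisticsCollectionSuite.
val table = "invalidate_catalog_cache_table"
withTable(table) {
  spark.range(100).write.saveAsTable(table)
  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
  spark.table(table) // populates the table relation cache
  val initialSizeInBytes = getTableFromCatalogCache(table).stats.sizeInBytes
  spark.range(100).write.mode(SaveMode.Append).saveAsTable(table)
  spark.table(table) // the append invalidated the cache, so this re-resolves the table
  assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes)
}
```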

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #19252 from aokolnychyi/spark-21969.
aokolnychyi authored and gatorsmile committed Sep 19, 2017
1 parent d5aefa8 commit ee13f3e
Showing 5 changed files with 87 additions and 7 deletions.

@@ -377,6 +377,8 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(tableIdentifier)
externalCatalog.alterTableStats(db, table, newStats)
// Invalidate the table relation cache
refreshTable(identifier)
}

/**

@@ -56,9 +56,6 @@ case class AnalyzeColumnCommand(

sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))

// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)

Seq.empty[Row]
}


@@ -48,8 +48,6 @@ case class AnalyzeTableCommand(
val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
if (newStats.isDefined) {
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
}

Seq.empty[Row]

@@ -261,6 +261,10 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

// table lookup will make the table cached
spark.table(table)
assert(isTableInCatalogCache(table))

// insert into command
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
if (autoUpdate) {
@@ -270,9 +274,78 @@
} else {
checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
}

// check that tableRelationCache inside the catalog was invalidated after insert
assert(!isTableInCatalogCache(table))
}
}
}
}

test("invalidation of tableRelationCache after inserts") {
val table = "invalidate_catalog_cache_table"
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
spark.range(100).write.saveAsTable(table)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
spark.table(table)
val initialSizeInBytes = getTableFromCatalogCache(table).stats.sizeInBytes
spark.range(100).write.mode(SaveMode.Append).saveAsTable(table)
spark.table(table)
assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes)
}
}
}
}

test("invalidation of tableRelationCache after table truncation") {
val table = "invalidate_catalog_cache_table"
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
spark.range(100).write.saveAsTable(table)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
spark.table(table)
sql(s"TRUNCATE TABLE $table")
spark.table(table)
assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
}
}
}
}

test("invalidation of tableRelationCache after alter table add partition") {
val table = "invalidate_catalog_cache_table"
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTempDir { dir =>
withTable(table) {
val path = dir.getCanonicalPath
sql(s"""
|CREATE TABLE $table (col1 int, col2 int)
|USING PARQUET
|PARTITIONED BY (col2)
|LOCATION '${dir.toURI}'""".stripMargin)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
spark.table(table)
assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
spark.catalog.recoverPartitions(table)
val df = Seq((1, 2), (1, 2)).toDF("col2", "col1")
df.write.parquet(s"$path/col2=1")
sql(s"ALTER TABLE $table ADD PARTITION (col2=1) LOCATION '${dir.toURI}'")
spark.table(table)
val cachedTable = getTableFromCatalogCache(table)
val cachedTableSizeInBytes = cachedTable.stats.sizeInBytes
val defaultSizeInBytes = conf.defaultSizeInBytes
if (autoUpdate) {
assert(cachedTableSizeInBytes != defaultSizeInBytes && cachedTableSizeInBytes > 0)
} else {
assert(cachedTableSizeInBytes == defaultSizeInBytes)
}
}
}
}
}
}
}

@@ -23,9 +23,9 @@ import java.sql.{Date, Timestamp}
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.StaticSQLConf
@@ -85,6 +85,16 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
}

def getTableFromCatalogCache(tableName: String): LogicalPlan = {
val catalog = spark.sessionState.catalog
val qualifiedTableName = QualifiedTableName(catalog.getCurrentDatabase, tableName)
catalog.getCachedTable(qualifiedTableName)
}

def isTableInCatalogCache(tableName: String): Boolean = {
getTableFromCatalogCache(tableName) != null
}

def getCatalogStatistics(tableName: String): CatalogStatistics = {
getCatalogTable(tableName).stats.get
}
