[SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used #5583

Closed · wants to merge 3 commits
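For context, a minimal usage sketch of the behavior this change enables, in the style of the test added below. The table name "events", the Parquet directory path, and the existing HiveContext named sqlContext are illustrative assumptions, not part of the patch:

// Minimal sketch, assuming an existing HiveContext named sqlContext and a
// Parquet directory `path` that some external process appends files to.
sqlContext.createExternalTable("events", path, "parquet")
sqlContext.sql("CACHE TABLE events")            // table data is cached in memory
// ... an external writer appends new Parquet files under `path` ...
sqlContext.sql("REFRESH TABLE events")          // with this patch: refreshes metadata and rebuilds the cache lazily
sqlContext.sql("SELECT COUNT(*) FROM events").show()  // reflects the appended files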
17 changes: 17 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
// Refresh the given table's metadata first.
sqlContext.catalog.refreshTable(databaseName, tableName)

// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
Review comment (Contributor):
Leave a TODO here to clean this up when uncacheTable supports databases correctly.

// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
val df = DataFrame(sqlContext, logicalPlan)
// Uncache the logicalPlan.
sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
sqlContext.cacheManager.cacheQuery(df, Some(tableName))
}

Seq.empty[Row]
}
}
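Following up on the review comment above and the TODO in run: a hypothetical sketch of how the body could be simplified once uncacheTable and cacheTable accept database-qualified names. That qualified-name support is an assumption, not the API at the time of this patch, which only takes an unqualified table name:

// Hypothetical cleanup, assuming isCached/uncacheTable/cacheTable resolve
// "database.table" qualified names (they do not when this patch was written).
override def run(sqlContext: SQLContext): Seq[Row] = {
  // Refresh the table's metadata as before.
  sqlContext.catalog.refreshTable(databaseName, tableName)
  val qualifiedName = s"$databaseName.$tableName"
  if (sqlContext.isCached(qualifiedName)) {
    sqlContext.uncacheTable(qualifiedName)  // drop the stale in-memory copy
    sqlContext.cacheTable(qualifiedName)    // re-register; materialized lazily on the next scan
  }
  Seq.empty[Row]
}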
@@ -17,11 +17,14 @@

package org.apache.spark.sql.hive

import java.io.File

import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils

class CachedTableSuite extends QueryTest {

@@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
assertCached(table("udfTest"))
uncacheTable("udfTest")
}

test("REFRESH TABLE also needs to recache the data (data source tables)") {
val tempPath: File = Utils.createTempDir()
tempPath.delete()
table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
sql("DROP TABLE IF EXISTS refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
table("refreshTable"),
table("src").collect())
// Cache the table.
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data.
table("src").save(tempPath.toString, "parquet", SaveMode.Append)
// We are still using the old data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").collect())
// Refresh the table.
sql("REFRESH TABLE refreshTable")
// We are using the new data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").unionAll(table("src")).collect())

// Drop the table and create it again.
sql("DROP TABLE refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
// Refresh the table. The REFRESH TABLE command should not make an uncached
// table cached.
sql("REFRESH TABLE refreshTable")
checkAnswer(
table("refreshTable"),
table("src").unionAll(table("src")).collect())
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")

sql("DROP TABLE refreshTable")
Utils.deleteRecursively(tempPath)
}
}