Skip to content
Snippets Groups Projects
Commit 6265cba0 authored by Yin Huai's avatar Yin Huai Committed by Michael Armbrust
Browse files

[SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used

https://issues.apache.org/jira/browse/SPARK-6969

Author: Yin Huai <yhuai@databricks.com>

Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits:

1e5142b [Yin Huai] Add todo.
92b2498 [Yin Huai] Minor updates.
367df92 [Yin Huai] Recache data in the command of REFRESH TABLE.
parent 03fd9216
No related branches found
No related tags found
No related merge requests found
......@@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
  // Always re-read the table's metadata before touching the cache.
  sqlContext.catalog.refreshTable(databaseName, tableName)

  // Look up the (possibly refreshed) relation. We query the cache manager via
  // lookupCachedData directly because REFRESH TABLE carries a databaseName,
  // which the higher-level cache APIs do not support yet.
  val relation = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))

  // If this table is cached as an InMemoryColumnarRelation, drop the stale
  // cached copy and register the refreshed version to be cached lazily.
  sqlContext.cacheManager.lookupCachedData(relation).foreach { _ =>
    // TODO: Use uncacheTable once it supports database name.
    val tableDF = DataFrame(sqlContext, relation)
    // Drop the stale in-memory copy, blocking until its blocks are removed.
    sqlContext.cacheManager.tryUncacheQuery(tableDF, blocking = true)
    // Re-register the query; the data is materialized again on next access.
    sqlContext.cacheManager.cacheQuery(tableDF, Some(tableName))
  }

  Seq.empty[Row]
}
}
......
......@@ -17,11 +17,14 @@
package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
class CachedTableSuite extends QueryTest {
......@@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
assertCached(table("udfTest"))
uncacheTable("udfTest")
}
// Regression test: REFRESH TABLE must not only refresh metadata, it must also
// drop and lazily re-populate any cached copy of the table's data, and it must
// NOT cache a table that was not cached before the refresh.
test("REFRESH TABLE also needs to recache the data (data source tables)") {
// Write the contents of `src` to a temp dir as Parquet, backing an external table.
val tempPath: File = Utils.createTempDir()
tempPath.delete()
table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
sql("DROP TABLE IF EXISTS refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
// Sanity check: the external table initially matches `src`.
checkAnswer(
table("refreshTable"),
table("src").collect())
// Cache the table.
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data to the underlying files, bypassing the cache.
table("src").save(tempPath.toString, "parquet", SaveMode.Append)
// Before REFRESH, the stale cached data is still served.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").collect())
// Refresh the table.
sql("REFRESH TABLE refreshTable")
// After REFRESH, the table stays cached but now reflects the appended data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").unionAll(table("src")).collect())
// Drop the table and create it again.
sql("DROP TABLE refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
// Refresh the table. REFRESH TABLE command should not make an uncached
// table cached.
sql("REFRESH TABLE refreshTable")
checkAnswer(
table("refreshTable"),
table("src").unionAll(table("src")).collect())
// It is still not cached after the refresh.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
// Clean up the table and the temp dir backing it.
sql("DROP TABLE refreshTable")
Utils.deleteRecursively(tempPath)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment