From 6265cba00f6141575b4be825735d77d4cea500ab Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Tue, 21 Apr 2015 14:48:42 -0700
Subject: [PATCH] [SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE
 is used

https://issues.apache.org/jira/browse/SPARK-6969

Author: Yin Huai <yhuai@databricks.com>

Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits:

1e5142b [Yin Huai] Add todo.
92b2498 [Yin Huai] Minor updates.
367df92 [Yin Huai] Recache data in the command of REFRESH TABLE.
---
 .../org/apache/spark/sql/sources/ddl.scala    | 17 +++++++
 .../spark/sql/hive/CachedTableSuite.scala     | 50 ++++++++++++++++++-
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 2e861b84b7..78d494184e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Refresh the given table's metadata first.
     sqlContext.catalog.refreshTable(databaseName, tableName)
+
+    // If this table is cached as a InMemoryColumnarRelation, drop the original
+    // cached version and make the new version cached lazily.
+    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+    // Use lookupCachedData directly since RefreshTable also takes databaseName.
+    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    if (isCached) {
+      // Create a data frame to represent the table.
+      // TODO: Use uncacheTable once it supports database name.
+      val df = DataFrame(sqlContext, logicalPlan)
+      // Uncache the logicalPlan.
+      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+      // Cache it again.
+      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
+    }
+
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index c188264072..fc6c3c3503 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
 import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.util.Utils
 
 class CachedTableSuite extends QueryTest {
 
@@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
     assertCached(table("udfTest"))
     uncacheTable("udfTest")
   }
+
+  test("REFRESH TABLE also needs to recache the data (data source tables)") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+    sql("DROP TABLE IF EXISTS refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql("REFRESH TABLE refreshTable")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. REFRESH TABLE command should not make a uncached
+    // table cached.
+    sql("REFRESH TABLE refreshTable")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+    sql("DROP TABLE refreshTable")
+    Utils.deleteRecursively(tempPath)
+  }
 }
-- 
GitLab