From 9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian.cs.zju@gmail.com>
Date: Sun, 2 Nov 2014 16:00:24 -0800
Subject: [PATCH] [SPARK-2189][SQL] Adds dropTempTable API

This PR adds an API for unregistering temporary tables. If a temporary table has been cached before, it's unpersisted as well.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #3039 from liancheng/unregister-temp-table and squashes the following commits:

54ae99f [Cheng Lian] Fixes Scala styling issue
1948c14 [Cheng Lian] Removes the unpersist argument
aca41d3 [Cheng Lian] Ensures thread safety
7d4fb2b [Cheng Lian] Adds unregisterTempTable API
---
 .../org/apache/spark/sql/CacheManager.scala   | 13 ++++++++++++
 .../org/apache/spark/sql/SQLContext.scala     | 13 ++++++++++++
 .../apache/spark/sql/CachedTableSuite.scala   | 20 +++++++++++++++++++
 3 files changed, 46 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3ced11a5e6..2e7abac1f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -103,6 +103,19 @@ private[sql] trait CacheManager {
     cachedData.remove(dataIndex)
   }
 
+  /** Tries to remove the data for the given SchemaRDD from the cache if it's cached */
+  private[sql] def tryUncacheQuery(
+      query: SchemaRDD,
+      blocking: Boolean = true): Boolean = writeLock {
+    val planToCache = query.queryExecution.analyzed
+    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
+    val found = dataIndex >= 0
+    if (found) {
+      cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+      cachedData.remove(dataIndex)
+    }
+    found
+  }
 
   /** Optionally returns cached data for the given SchemaRDD */
   private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4cded98c80..3cf6af5f7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -276,6 +276,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
     catalog.registerTable(None, tableName, rdd.queryExecution.logical)
   }
 
+  /**
+   * Drops the temporary table with the given table name in the catalog. If the table has been
+   * cached/persisted before, it's also unpersisted.
+   *
+   * @param tableName the name of the table to be unregistered.
+   *
+   * @group userf
+   */
+  def dropTempTable(tableName: String): Unit = {
+    tryUncacheQuery(table(tableName))
+    catalog.unregisterTable(None, tableName)
+  }
+
   /**
    * Executes a SQL query using Spark, returning the result as a SchemaRDD.  The dialect that is
    * used for SQL parsing can be configured with 'spark.sql.dialect'.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 44a2961b27..765fa82776 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -231,4 +231,24 @@ class CachedTableSuite extends QueryTest {
         assert(cached.statistics.sizeInBytes === actualSizeInBytes)
     }
   }
+
+  test("Drops temporary table") {
+    testData.select('key).registerTempTable("t1")
+    table("t1")
+    dropTempTable("t1")
+    assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
+  }
+
+  test("Drops cached temporary table") {
+    testData.select('key).registerTempTable("t1")
+    testData.select('key).registerTempTable("t2")
+    cacheTable("t1")
+
+    assert(isCached("t1"))
+    assert(isCached("t2"))
+
+    dropTempTable("t1")
+    assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
+    assert(!isCached("t2"))
+  }
 }
-- 
GitLab