From 70bfb5c7282df84e76eba01f59bf1b8551583c33 Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Fri, 20 Feb 2015 16:20:02 +0800
Subject: [PATCH] [SPARK-5909][SQL] Add a clearCache command to Spark SQL's
 cache manager

JIRA: https://issues.apache.org/jira/browse/SPARK-5909

Author: Yin Huai <yhuai@databricks.com>

Closes #4694 from yhuai/clearCache and squashes the following commits:

397ecc4 [Yin Huai] Address comments.
a2702fc [Yin Huai] Update parser.
3a54506 [Yin Huai] add isEmpty to CacheManager.
6d14460 [Yin Huai] Python clearCache.
f7b8dbd [Yin Huai] Add clear cache command.
---
 python/pyspark/sql/context.py                    |  4 ++++
 .../org/apache/spark/sql/CacheManager.scala      |  6 ++++++
 .../scala/org/apache/spark/sql/SQLContext.scala  |  5 +++++
 .../org/apache/spark/sql/SparkSQLParser.scala    | 11 +++++++----
 .../apache/spark/sql/execution/commands.scala    | 15 +++++++++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala  | 16 ++++++++++++++++
 6 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 2e2309f103..3f168f718b 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -687,6 +687,10 @@ class SQLContext(object):
         """Removes the specified table from the in-memory cache."""
         self._ssql_ctx.uncacheTable(tableName)
 
+    def clearCache(self):
+        """Removes all cached tables from the in-memory cache. """
+        self._ssql_ctx.clearCache()
+
 
 class HiveContext(SQLContext):
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index f1949aa5dd..ca4a127120 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -71,11 +71,17 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
     }
   }
 
+  /** Clears all cached tables. */
   private[sql] def clearCache(): Unit = writeLock {
     cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
     cachedData.clear()
   }
 
+  /** Checks if the cache is empty. */
+  private[sql] def isEmpty: Boolean = readLock {
+    cachedData.isEmpty
+  }
+
   /**
    * Caches the data produced by the logical representation of the given schema rdd.  Unlike
    * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a6cf3cd9dd..4bdaa02391 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -217,6 +217,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
 
+  /**
+   * Removes all cached tables from the in-memory cache.
+   */
+  def clearCache(): Unit = cacheManager.clearCache()
+
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 00e19da437..5921eaf5e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -23,7 +23,7 @@ import scala.util.parsing.combinator.RegexParsers
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{ShowTablesCommand, UncacheTableCommand, CacheTableCommand, SetCommand}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.types.StringType
 
 
@@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   protected val AS      = Keyword("AS")
   protected val CACHE   = Keyword("CACHE")
+  protected val CLEAR   = Keyword("CLEAR")
   protected val IN      = Keyword("IN")
   protected val LAZY    = Keyword("LAZY")
   protected val SET     = Keyword("SET")
@@ -74,9 +75,11 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
     }
 
   private lazy val uncache: Parser[LogicalPlan] =
-    UNCACHE ~ TABLE ~> ident ^^ {
-      case tableName => UncacheTableCommand(tableName)
-    }
+    ( UNCACHE ~ TABLE ~> ident ^^ {
+        case tableName => UncacheTableCommand(tableName)
+      }
+    | CLEAR ~ CACHE ^^^ ClearCacheCommand
+    )
 
   private lazy val set: Parser[LogicalPlan] =
     SET ~> restInput ^^ {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 7c92e9fc88..a11232142d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -174,6 +174,21 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
   override def output: Seq[Attribute] = Seq.empty
 }
 
+/**
+ * :: DeveloperApi ::
+ * Clear all cached data from the in-memory cache.
+ */
+@DeveloperApi
+case object ClearCacheCommand extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    sqlContext.clearCache()
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
 /**
  * :: DeveloperApi ::
  */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e70e866fdb..c240f2be95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -280,4 +280,20 @@ class CachedTableSuite extends QueryTest {
     assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
     assert(!isCached("t2"))
   }
+
+  test("Clear all cache") {
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    clearCache()
+    assert(cacheManager.isEmpty)
+
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    sql("Clear CACHE")
+    assert(cacheManager.isEmpty)
+  }
 }
-- 
GitLab