From 9064f1b04461513a147aeb8179471b05595ddbc4 Mon Sep 17 00:00:00 2001
From: madhu <phatak.dev@gmail.com>
Date: Fri, 5 May 2017 22:44:03 +0800
Subject: [PATCH] [SPARK-20495][SQL][CORE] Add StorageLevel to cacheTable API

## What changes were proposed in this pull request?
Currently the `cacheTable` API caches a table only at the default MEMORY_AND_DISK storage level. This PR adds a `cacheTable` overload that accepts an explicit `StorageLevel`.
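
For example (a minimal usage sketch, not part of the patch itself; `spark` is an active `SparkSession` and `my_table` is a hypothetical temporary view):

```scala
import org.apache.spark.storage.StorageLevel

// Register a small table to cache (hypothetical example data).
spark.range(10).createOrReplaceTempView("my_table")

// New overload: cache at an explicit storage level rather than the
// MEMORY_AND_DISK default used by cacheTable(tableName).
spark.catalog.cacheTable("my_table", StorageLevel.DISK_ONLY)

// Dataset.storageLevel reflects the level the table was cached at.
assert(spark.table("my_table").storageLevel == StorageLevel.DISK_ONLY)
```
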
## How was this patch tested?
Unit tests in `CatalogSuite`.

Author: madhu <phatak.dev@gmail.com>

Closes #17802 from phatak-dev/cacheTableAPI.
---
 project/MimaExcludes.scala                         |  2 ++
 .../org/apache/spark/sql/catalog/Catalog.scala     | 13 ++++++++++++-
 .../apache/spark/sql/internal/CatalogImpl.scala    | 12 ++++++++++++
 .../apache/spark/sql/internal/CatalogSuite.scala   |  8 ++++++++
 4 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dbf933f28a..d50882cb19 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,8 @@ object MimaExcludes {
 
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
+    // [SPARK-20495][SQL] Add StorageLevel to cacheTable API
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable")
   )
 
   // Exclude rules for 2.2.x
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 7e5da012f8..ab81725def 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
-
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
@@ -476,6 +476,17 @@ abstract class Catalog {
    */
   def cacheTable(tableName: String): Unit
 
+  /**
+   * Caches the specified table or view with the given storage level.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table/view.
+   *                  If no database identifier is provided, it refers to a temporary view or
+   *                  a table/view in the current database.
+   * @param storageLevel the storage level at which to cache the table.
+   * @since 2.3.0
+   */
+  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
+
   /**
    * Removes the specified table from the in-memory cache.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 0b8e53868c..e1049c665a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
 
 
 /**
@@ -419,6 +420,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
   }
 
+  /**
+   * Caches the specified table or view with the given storage level.
+   *
+   * @group cachemgmt
+   * @since 2.3.0
+   */
+  override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
+    sparkSession.sharedState.cacheManager.cacheQuery(
+      sparkSession.table(tableName), Some(tableName), storageLevel)
+  }
+
   /**
    * Removes the specified table or view from the in-memory cache.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 8f9c52cb1e..bc641fd280 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
 
 
 /**
@@ -535,4 +536,11 @@ class CatalogSuite
       .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = true)
     assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
   }
+
+  test("cacheTable with storage level") {
+    createTempTable("my_temp_table")
+    spark.catalog.cacheTable("my_temp_table", StorageLevel.DISK_ONLY)
+    assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY)
+  }
+
 }
-- 
GitLab