diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dbf933f28a784a8187789495baa11ea3b075e442..d50882cb1917eb5988bf04d0a5c5a17fcf6dd2a1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { // Exclude rules for 2.3.x lazy val v23excludes = v22excludes ++ Seq( + // [SPARK-20495][SQL] Add StorageLevel to cacheTable API + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable") ) // Exclude rules for 2.2.x diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 7e5da012f84ca0f506af4b316a81b8486cfec4b2..ab81725def3f42ba0a1e8be03bd6cfd2300f94b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} import org.apache.spark.sql.types.StructType - +import org.apache.spark.storage.StorageLevel /** * Catalog interface for Spark. To access this, use `SparkSession.catalog`. @@ -476,6 +476,18 @@ abstract class Catalog { */ def cacheTable(tableName: String): Unit + /** + * Caches the specified table with the given storage level. + * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. + * @param storageLevel storage level to cache table. + * @since 2.3.0 + */ + def cacheTable(tableName: String, storageLevel: StorageLevel): Unit + + /** * Removes the specified table from the in-memory cache. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 0b8e53868c999def79a87cb86d41ec6f02d10ad7..e1049c665a4177687ef26244d31b1616fd6faab0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel + /** @@ -419,6 +421,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName)) } + /** + * Caches the specified table or view with the given storage level. + * + * @group cachemgmt + * @since 2.3.0 + */ + override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = { + sparkSession.sharedState.cacheManager.cacheQuery( + sparkSession.table(tableName), Some(tableName), storageLevel) + } + /** * Removes the specified table or view from the in-memory cache. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 8f9c52cb1e031062e0cd75379879eb1f8a382e60..bc641fd280a158edfea04eca2e776342efcb58d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.StorageLevel /** @@ -535,4 +536,11 @@ class CatalogSuite .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = true) assert(spark.catalog.listTables().collect().map(_.name).toSet == Set()) } + + test("cacheTable with storage level") { + createTempTable("my_temp_table") + spark.catalog.cacheTable("my_temp_table", StorageLevel.DISK_ONLY) + assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY) + } + }