diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5a67fc79eff0b4f444fa0ac5154fddcf4b12dee6..53779df3d9c0030f98221ea7a377c79d8b9f7d09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2316,7 +2316,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def unpersist(blocking: Boolean): this.type = { - sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking) + sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking) this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 4e95754e9bef7b04b0a417ccdd2f31ed73dafbf6..de2503a87ab7d6e5de54783e29918c59c37b7602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -104,22 +104,11 @@ private[sql] class CacheManager extends Logging { } } - /** Removes the data for the given [[Dataset]] from the cache */ - private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock { - val planToCache = query.queryExecution.analyzed - val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) - require(dataIndex >= 0, s"Table $query is not cached.") - cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) - cachedData.remove(dataIndex) - } - /** - * Tries to remove the data for the given [[Dataset]] from the cache - * if it's cached + * Tries to remove the data for the given [[Dataset]] from the cache. + * No operation, if it's already uncached. */ - private[sql] def tryUncacheQuery( - query: Dataset[_], - blocking: Boolean = true): Boolean = writeLock { + private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) val found = dataIndex >= 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 3e5eed2efa76b0e41932ea6f37d592da5e2153cd..5332366d242a8768f69e330956fefd12d58bafee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -53,7 +53,7 @@ case class CacheTableCommand( case class UncacheTableCommand(tableName: String) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.table(tableName).unpersist(blocking = false) + sparkSession.catalog.uncacheTable(tableName) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 5fd0b83cf0a5cdb013b1384ee7412c6218bcb3df..fc00912bf9f59e1456ad0b010744438772722f62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -201,7 +201,7 @@ case class DropTableCommand( case _ => }) try { - sparkSession.sharedState.cacheManager.tryUncacheQuery( + sparkSession.sharedState.cacheManager.uncacheQuery( sparkSession.table(tableName.quotedString)) } catch { case NonFatal(e) => log.warn(e.toString, e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 90db7853329f0ea8de70eb66ad10847da37998be..58bb5cdca9910bae585a94a4441ddb2361a651b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -374,7 +374,7 @@ case class TruncateTableCommand( spark.sessionState.invalidateTable(tableName.unquotedString) // Also try to drop the contents of the table from the columnar cache try { - spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString)) + spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString)) } catch { case NonFatal(e) => log.warn(s"Exception when attempting to uncache table '$tableName'", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index f42fd174b262b41ae75b757c08dd5ac69500c762..601334b97add39c5ff90b39fa471b1629dd6059a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -292,7 +292,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def dropTempView(viewName: String): Unit = { - sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName)) + sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName)) sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true) } @@ -323,7 +323,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def uncacheTable(tableName: String): Unit = { - sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName)) + sparkSession.sharedState.cacheManager.uncacheQuery(query = sparkSession.table(tableName)) } /** @@ -367,7 +367,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { // TODO: Use uncacheTable once it supports database name. val df = Dataset.ofRows(sparkSession, logicalPlan) // Uncache the logicalPlan. - sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = true) + sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true) // Cache it again. sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 3306ac42a3650a594407b429a1a0a69037c8e9f5..d7df18ae1c42d5d85f8555efba5ba5461e0ae7b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -186,12 +186,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext assertCached(spark.table("testData"), 0) } - test("correct error on uncache of non-cached table") { - intercept[IllegalArgumentException] { - spark.catalog.uncacheTable("testData") - } - } - test("SELECT star from cached table") { sql("SELECT * FROM testData").createOrReplaceTempView("selectStar") spark.catalog.cacheTable("selectStar") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 5121440f067bd68752c110081bed8557cb07fd66..e35a71917fbdaf594475f10c2b808f1694152974 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -20,12 +20,14 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.storage.RDDBlockId import org.apache.spark.util.Utils -class CachedTableSuite extends QueryTest with TestHiveSingleton { +class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import hiveContext._ def rddIdOf(tableName: String): Int = { @@ -95,9 +97,23 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { sql("DROP TABLE IF EXISTS nonexistantTable") } - test("correct error on uncache of non-cached table") { - intercept[IllegalArgumentException] { - spark.catalog.uncacheTable("src") + test("correct error on uncache of nonexistant tables") { + intercept[NoSuchTableException] { + spark.catalog.uncacheTable("nonexistantTable") + } + intercept[NoSuchTableException] { + sql("UNCACHE TABLE nonexistantTable") + } + } + + test("no error on uncache of non-cached table") { + val tableName = "newTable" + withTable(tableName) { + sql(s"CREATE TABLE $tableName(a INT)") + // no error will be reported in the following three ways to uncache a table. + spark.catalog.uncacheTable(tableName) + sql("UNCACHE TABLE newTable") + sparkSession.table(tableName).unpersist() } }