From df4ea6614d709ee66f1ceb966df6216b125b8ea1 Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Tue, 14 Jun 2016 11:44:37 -0700
Subject: [PATCH] [SPARK-15864][SQL] Fix Inconsistent Behaviors when Uncaching Non-cached Tables

#### What changes were proposed in this pull request?

There are three different ways to uncache a table:

- _SQL interface_: `UNCACHE TABLE`
- _Dataset API_: `sparkSession.catalog.uncacheTable`
- _Dataset API_: `sparkSession.table(tableName).unpersist()`

When the table is not cached, they behave inconsistently:

- _SQL interface_: `UNCACHE TABLE non-cachedTable` -> **no error message**
- _Dataset API_: `sparkSession.catalog.uncacheTable("non-cachedTable")` -> **reports a confusing error message:** ```requirement failed: Table [a: int] is not cached```
- _Dataset API_: `sparkSession.table("non-cachedTable").unpersist()` -> **no error message**

This PR makes them consistent: uncaching a table that is not cached is a no-op in all three cases.

In addition, this PR removes the old `uncacheQuery`, renames `tryUncacheQuery` to `uncacheQuery`, and documents that the renamed method is a no-op if the table is already uncached.

#### How was this patch tested?

Improved the existing test case to verify the behavior when the table has not been cached, and added test cases to verify the behavior when the table does not exist.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13593 from gatorsmile/uncacheNonCachedTable.
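For illustration, here is a minimal sketch of the behavior this patch establishes. It is not part of the patch itself; the session setup, master URL, and view name are hypothetical:

```scala
// Hypothetical setup: a temp view that has never been cached.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("uncache-demo").getOrCreate()
spark.range(10).toDF("a").createOrReplaceTempView("nonCachedTable")

// After this patch, all three uncache paths silently no-op instead of throwing:
spark.sql("UNCACHE TABLE nonCachedTable")      // SQL interface: no error
spark.catalog.uncacheTable("nonCachedTable")   // Catalog API: no error (previously threw)
spark.table("nonCachedTable").unpersist()      // Dataset API: no error
```

Note that uncaching a table that does not exist at all still fails, with a `NoSuchTableException` raised during table resolution, as covered by the new test cases below.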
---
 .../scala/org/apache/spark/sql/Dataset.scala       |  2 +-
 .../spark/sql/execution/CacheManager.scala         | 17 +++----------
 .../spark/sql/execution/command/cache.scala        |  2 +-
 .../spark/sql/execution/command/ddl.scala          |  2 +-
 .../spark/sql/execution/command/tables.scala       |  2 +-
 .../spark/sql/internal/CatalogImpl.scala           |  6 ++---
 .../apache/spark/sql/CachedTableSuite.scala        |  6 -----
 .../spark/sql/hive/CachedTableSuite.scala          | 24 +++++++++++++++----
 8 files changed, 30 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5a67fc79ef..53779df3d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2316,7 +2316,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
     this
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4e95754e9b..de2503a87a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -104,22 +104,11 @@ private[sql] class CacheManager extends Logging {
     }
   }
 
-  /** Removes the data for the given [[Dataset]] from the cache */
-  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    val planToCache = query.queryExecution.analyzed
-    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
-    require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
-    cachedData.remove(dataIndex)
-  }
-
   /**
-   * Tries to remove the data for the given [[Dataset]] from the cache
-   * if it's cached
+   * Tries to remove the data for the given [[Dataset]] from the cache.
+   * No operation, if it's already uncached.
    */
-  private[sql] def tryUncacheQuery(
-      query: Dataset[_],
-      blocking: Boolean = true): Boolean = writeLock {
+  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     val found = dataIndex >= 0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 3e5eed2efa..5332366d24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -53,7 +53,7 @@ case class CacheTableCommand(
 
 case class UncacheTableCommand(tableName: String) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.table(tableName).unpersist(blocking = false)
+    sparkSession.catalog.uncacheTable(tableName)
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5fd0b83cf0..fc00912bf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -201,7 +201,7 @@ case class DropTableCommand(
       case _ =>
     })
     try {
-      sparkSession.sharedState.cacheManager.tryUncacheQuery(
+      sparkSession.sharedState.cacheManager.uncacheQuery(
         sparkSession.table(tableName.quotedString))
     } catch {
       case NonFatal(e) => log.warn(e.toString, e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 90db785332..58bb5cdca9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -374,7 +374,7 @@ case class TruncateTableCommand(
     spark.sessionState.invalidateTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
     } catch {
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table '$tableName'", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f42fd174b2..601334b97a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -292,7 +292,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def dropTempView(viewName: String): Unit = {
-    sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
+    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
     sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
   }
 
@@ -323,7 +323,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    sparkSession.sharedState.cacheManager.uncacheQuery(query = sparkSession.table(tableName))
   }
 
   /**
@@ -367,7 +367,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // TODO: Use uncacheTable once it supports database name.
     val df = Dataset.ofRows(sparkSession, logicalPlan)
     // Uncache the logicalPlan.
-    sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = true)
+    sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true)
     // Cache it again.
     sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3306ac42a3..d7df18ae1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -186,12 +186,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assertCached(spark.table("testData"), 0)
   }
 
-  test("correct error on uncache of non-cached table") {
-    intercept[IllegalArgumentException] {
-      spark.catalog.uncacheTable("testData")
-    }
-  }
-
   test("SELECT star from cached table") {
     sql("SELECT * FROM testData").createOrReplaceTempView("selectStar")
     spark.catalog.cacheTable("selectStar")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 5121440f06..e35a71917f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.Utils
 
-class CachedTableSuite extends QueryTest with TestHiveSingleton {
+class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext._
 
   def rddIdOf(tableName: String): Int = {
@@ -95,9 +97,23 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     sql("DROP TABLE IF EXISTS nonexistantTable")
   }
 
-  test("correct error on uncache of non-cached table") {
-    intercept[IllegalArgumentException] {
-      spark.catalog.uncacheTable("src")
+  test("correct error on uncache of nonexistant tables") {
+    intercept[NoSuchTableException] {
+      spark.catalog.uncacheTable("nonexistantTable")
+    }
+    intercept[NoSuchTableException] {
+      sql("UNCACHE TABLE nonexistantTable")
+    }
+  }
+
+  test("no error on uncache of non-cached table") {
+    val tableName = "newTable"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(a INT)")
+      // no error will be reported in the following three ways to uncache a table.
+      spark.catalog.uncacheTable(tableName)
+      sql("UNCACHE TABLE newTable")
+      sparkSession.table(tableName).unpersist()
     }
   }
-- 
GitLab