From 008a5377d57ce6692eca4a41539fb27978b58e01 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Thu, 26 May 2016 19:01:41 -0700
Subject: [PATCH] [SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2

## What changes were proposed in this pull request?

Two more changes:
(1) Fix truncate table for data source tables (only for cases without `PARTITION`)
(2) Disallow truncating external tables or views

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #13315 from andrewor14/truncate-table.
---
 .../spark/sql/execution/command/tables.scala  | 78 ++++++++++++-------
 .../sql/execution/command/DDLSuite.scala      | 34 ++++++++
 2 files changed, 86 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bef4c9222c..e34beec33d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -285,41 +285,67 @@ case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
       throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
-    } else if (catalog.isTemporaryTable(tableName)) {
+    }
+    if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
-    } else {
-      val locations = if (partitionSpec.isDefined) {
-        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (table.tableType == CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+    }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+    }
+    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
+    if (isDatasourceTable && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables created using the data sources API: '$tableName'")
+    }
+    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables that are not partitioned: '$tableName'")
+    }
+    val locations =
+      if (isDatasourceTable || table.partitionColumnNames.isEmpty) {
+        Seq(table.storage.locationUri)
       } else {
-        val table = catalog.getTableMetadata(tableName)
-        if (table.partitionColumnNames.nonEmpty) {
-          catalog.listPartitions(tableName).map(_.storage.locationUri)
-        } else {
-          Seq(table.storage.locationUri)
-        }
+        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
       }
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        if (location.isDefined) {
-          val path = new Path(location.get)
-          try {
-            val fs = path.getFileSystem(hadoopConf)
-            fs.delete(path, true)
-            fs.mkdirs(path)
-          } catch {
-            case NonFatal(e) =>
-              throw new AnalysisException(
-                s"Failed to truncate table '$tableName' when removing data of the path: $path " +
-                  s"because of ${e.toString}")
-          }
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    locations.foreach { location =>
+      if (location.isDefined) {
+        val path = new Path(location.get)
+        try {
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
+          fs.mkdirs(path)
+        } catch {
+          case NonFatal(e) =>
+            throw new AnalysisException(
+              s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+                s"because of ${e.toString}")
         }
       }
     }
+    // After deleting the data, invalidate the table to make sure we don't keep around a stale
+    // file relation in the metastore cache.
+    spark.sessionState.invalidateTable(tableName.unquotedString)
+    // Also try to drop the contents of the table from the columnar cache
+    try {
+      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+    }
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index bddd3f2119..6c038c7735 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
@@ -1109,4 +1110,37 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    data.write.saveAsTable("rectangles")
+    spark.catalog.cacheTable("rectangles")
+    assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with")
+    assume(spark.catalog.isCached("rectangles"), "bad test; table was not cached to begin with")
+    sql("TRUNCATE TABLE rectangles")
+    assert(spark.table("rectangles").collect().isEmpty)
+    assert(!spark.catalog.isCached("rectangles"))
+    // truncating partitioned data source tables is not supported
+    data.write.partitionBy("length").saveAsTable("rectangles2")
+    assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+  }
+
+  test("truncate table - external table, temporary table, view (not allowed)") {
+    import testImplicits._
+    val path = Utils.createTempDir().getAbsolutePath
+    (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
+    sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
+    sql(s"CREATE VIEW my_view AS SELECT 1")
+    assertUnsupported("TRUNCATE TABLE my_temp_tab")
+    assertUnsupported("TRUNCATE TABLE my_ext_tab")
+    assertUnsupported("TRUNCATE TABLE my_view")
+  }
+
+  test("truncate table - non-partitioned table (not allowed)") {
+    sql("CREATE TABLE my_tab (age INT, name STRING)")
+    assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
+  }
+
 }
-- 
GitLab