From 5afd927a47aa7ede3039234f2f7262e2247aa2ae Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Mon, 23 May 2016 18:03:45 -0700
Subject: [PATCH] [SPARK-15311][SQL] Disallow DML on Regular Tables when Using
 In-Memory Catalog

#### What changes were proposed in this pull request?
So far, when using the in-memory catalog, we allow DDL operations on tables, but the corresponding DML operations are not supported for tables that are neither temporary nor data source tables. For example,
```SQL
CREATE TABLE tabName(i INT, j STRING)
SELECT * FROM tabName
INSERT OVERWRITE TABLE tabName SELECT 1, 'a'
```
In the above example, before this fix, both the `SELECT` and the `INSERT` fail with a very confusing exception message:
```
org.apache.spark.sql.AnalysisException: unresolved operator 'SimpleCatalogRelation default, CatalogTable(`default`.`tbl`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(i,int,true,None), CatalogColumn(j,string,true,None)),List(),List(),List(),-1,,1463928681802,-1,Map(),None,None,None,List()), None;
```

This PR issues an appropriate exception in each case. For the `SELECT` above, the message becomes
```
org.apache.spark.sql.AnalysisException: Please enable Hive support when selecting the regular tables: `default`.`tbl`;
```
and the `INSERT` case reports `inserting` instead of `selecting`.
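For context, a minimal sketch of reproducing the failure through the Scala API. The session setup and names here are illustrative, assuming a Spark build that defaults to the in-memory catalog (no Hive classes on the classpath):
```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Session backed by the in-memory catalog: note there is no
// .enableHiveSupport() call.
val spark = SparkSession.builder()
  .master("local")
  .appName("SPARK-15311-repro")
  .getOrCreate()

spark.sql("CREATE TABLE tabName(i INT, j STRING)")  // DDL succeeds

// DML on the same table fails analysis; with this patch the
// exception message points at the missing Hive support.
try {
  spark.sql("SELECT * FROM tabName")
} catch {
  case e: AnalysisException => println(e.getMessage)
}
```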
#### How was this patch tested?
Added two test cases in `DDLSuite`, covering managed and external tables.
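The new tests `assume` that `CATALOG_IMPLEMENTATION` is `in-memory`, so they are skipped when the session uses a Hive catalog. Locally they can be run with something like `build/sbt "sql/testOnly *DDLSuite"` (the exact invocation depends on the build setup).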

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13093 from gatorsmile/selectAfterCreate.
---
 .../sql/catalyst/analysis/CheckAnalysis.scala | 15 ++++++
 .../plans/logical/basicLogicalOperators.scala |  3 +-
 .../sql/execution/command/DDLSuite.scala      | 48 +++++++++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 28aa249888..cd242d78a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.UsingJoin
@@ -305,6 +306,20 @@ trait CheckAnalysis extends PredicateHelper {
                  |Conflicting attributes: ${conflictingAttributes.mkString(",")}
                """.stripMargin)
 
+          case s: SimpleCatalogRelation =>
+            failAnalysis(
+              s"""
+                 |Please enable Hive support when selecting the regular tables:
+                 |${s.catalogTable.identifier}
+               """.stripMargin)
+
+          case InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) =>
+            failAnalysis(
+              s"""
+                 |Please enable Hive support when inserting the regular tables:
+                 |${s.catalogTable.identifier}
+               """.stripMargin)
+
           case o if !o.resolved =>
             failAnalysis(
               s"unresolved operator ${operator.simpleString}")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b1b3e00de1..ca0096eeb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -377,7 +377,8 @@ case class InsertIntoTable(
   }
 
   assert(overwrite || !ifNotExists)
-  override lazy val resolved: Boolean = childrenResolved && expectedColumns.forall { expected =>
+  override lazy val resolved: Boolean =
+    childrenResolved && table.resolved && expectedColumns.forall { expected =>
     child.output.size == expected.size && child.output.zip(expected).forall {
       case (childAttr, tableAttr) =>
         DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d72dc092e2..64f5a4ac47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
@@ -1044,6 +1045,53 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     )
   }
 
+  test("select/insert into the managed table") {
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+    val tabName = "tbl"
+    withTable(tabName) {
+      sql(s"CREATE TABLE $tabName (i INT, j STRING)")
+      val catalogTable =
+        spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+      assert(catalogTable.tableType == CatalogTableType.MANAGED)
+
+      var message = intercept[AnalysisException] {
+        sql(s"INSERT OVERWRITE TABLE $tabName SELECT 1, 'a'")
+      }.getMessage
+      assert(message.contains("Please enable Hive support when inserting the regular tables"))
+      message = intercept[AnalysisException] {
+        sql(s"SELECT * FROM $tabName")
+      }.getMessage
+      assert(message.contains("Please enable Hive support when selecting the regular tables"))
+    }
+  }
+
+  test("select/insert into external table") {
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+    withTempDir { tempDir =>
+      val tabName = "tbl"
+      withTable(tabName) {
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |LOCATION '$tempDir'
+           """.stripMargin)
+        val catalogTable =
+          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+        assert(catalogTable.tableType == CatalogTableType.EXTERNAL)
+
+        var message = intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE TABLE $tabName SELECT 1, 'a'")
+        }.getMessage
+        assert(message.contains("Please enable Hive support when inserting the regular tables"))
+        message = intercept[AnalysisException] {
+          sql(s"SELECT * FROM $tabName")
+        }.getMessage
+        assert(message.contains("Please enable Hive support when selecting the regular tables"))
+      }
+    }
+  }
+
   test("drop default database") {
     Seq("true", "false").foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
-- 
GitLab