From 251698fb7335a3bb465f1cd0c29e7e74e0361f4a Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Thu, 2 Apr 2015 16:02:31 -0700
Subject: [PATCH] [SPARK-6655][SQL] We need to read the schema of a data source
 table stored in the spark.sql.sources.schema property

https://issues.apache.org/jira/browse/SPARK-6655

Author: Yin Huai <yhuai@databricks.com>

Closes #5313 from yhuai/SPARK-6655 and squashes the following commits:

1e00c03 [Yin Huai] Unnecessary change.
f131bd9 [Yin Huai] Fix.
f1218c1 [Yin Huai] Failed test.
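
A minimal, self-contained sketch of the lookup order this patch implements (the `props` map
stands in for the Hive table properties; in Spark itself the resulting JSON string is parsed
with `DataType.fromJson(...).asInstanceOf[StructType]`, as in the diff below):

```scala
// Standalone model of HiveMetastoreCatalog's schema lookup: prefer the legacy
// single property, then fall back to the numbered part properties.
def schemaJson(props: Map[String, String]): Option[String] = {
  // Since SPARK-6024, the schema JSON is split across numbered part properties
  // to stay under the metastore's per-property length limit.
  def fromParts: Option[String] =
    props.get("spark.sql.sources.schema.numParts").map { numParts =>
      (0 until numParts.toInt).map { i =>
        props.getOrElse(
          s"spark.sql.sources.schema.part.$i",
          sys.error(s"Could not read schema because part $i is missing."))
      }.mkString
    }

  // Tables created before SPARK-6024 stored the whole JSON in a single
  // property; read it first so those tables keep working.
  props.get("spark.sql.sources.schema").orElse(fromParts)
}
```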
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 18 +++++++++++----
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 23 +++++++++++++++++++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f0076cef13..14cdb42073 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -70,7 +70,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val table = synchronized {
           client.getTable(in.database, in.name)
         }
-        val userSpecifiedSchema =
+
+        def schemaStringFromParts: Option[String] = {
           Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
             val parts = (0 until numParts.toInt).map { index =>
               val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
@@ -82,10 +83,19 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
               part
             }
-            // Stick all parts back to a single schema string in the JSON representation
-            // and convert it back to a StructType.
-            DataType.fromJson(parts.mkString).asInstanceOf[StructType]
+            // Concatenate the parts back into a single JSON schema string.
+            parts.mkString
           }
+        }
+
+        // Originally, we used the spark.sql.sources.schema property to store the schema of a
+        // data source table. After SPARK-6024, the schema is split into parts instead. Although
+        // we no longer write spark.sql.sources.schema, we still need to read it for old tables.
+        val schemaString =
+          Option(table.getProperty("spark.sql.sources.schema")).orElse(schemaStringFromParts)
+
+        val userSpecifiedSchema =
+          schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
 
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e5ad0bf552..e09c702c89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -25,6 +25,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.TableType
+import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.mapred.InvalidInputException
 
 import org.apache.spark.sql._
@@ -682,6 +684,27 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     assert(schema === actualSchema)
   }
 
+  test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") {
+    val tableName = "spark6655"
+    val schema = StructType(StructField("int", IntegerType, true) :: Nil)
+    // Manually create the metadata in metastore.
+    val tbl = new Table("default", tableName)
+    tbl.setProperty("spark.sql.sources.provider", "json")
+    tbl.setProperty("spark.sql.sources.schema", schema.json)
+    tbl.setProperty("EXTERNAL", "FALSE")
+    tbl.setTableType(TableType.MANAGED_TABLE)
+    tbl.setSerdeParam("path", catalog.hiveDefaultTableFilePath(tableName))
+    catalog.synchronized {
+      catalog.client.createTable(tbl)
+    }
+
+    invalidateTable(tableName)
+    val actualSchema = table(tableName).schema
+    assert(schema === actualSchema)
+    sql(s"drop table $tableName")
+  }
+
+
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame =
       createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
-- 
GitLab