From 3405cc775843a3a80d009d4f9079ba9daa2220e7 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Thu, 21 Apr 2016 21:48:48 -0700
Subject: [PATCH] [SPARK-14835][SQL] Remove MetastoreRelation dependency from
 SQLBuilder

## What changes were proposed in this pull request?
This patch removes SQLBuilder's dependency on the Hive-specific MetastoreRelation: the generic CatalogRelation trait now declares an `output` attribute sequence (and SimpleCatalogRelation implements it from the catalog schema), so SQLBuilder can match on CatalogRelation instead. We should be able to move SQLBuilder into the sql/core package after this change.
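
As a minimal sketch of the pattern this enables (the helper `describeTable` below is hypothetical, for illustration only, not code from this patch): any `CatalogRelation` now exposes both its `CatalogTable` metadata and its output attributes, which is all SQLBuilder needs.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogRelation

// Hypothetical helper mirroring what SQLBuilder now does: read the database,
// table name, and output attributes from the generic CatalogRelation trait
// instead of the Hive-specific MetastoreRelation.
def describeTable(relation: CatalogRelation): String = {
  val m = relation.catalogTable
  val cols = relation.output.map(_.name).mkString(", ")
  s"${m.database}.${m.identifier.table}($cols)"
}
```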

## How was this patch tested?
No new tests; the change is covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12594 from rxin/SPARK-14835.
---
 .../sql/catalyst/catalog/interface.scala      | 20 +++++++++++++++----
 .../apache/spark/sql/hive/SQLBuilder.scala    | 10 ++++++----
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d2294efd9a..75cbe32c16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,8 @@ import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
 
@@ -301,6 +302,7 @@ object ExternalCatalog {
  */
 trait CatalogRelation {
   def catalogTable: CatalogTable
+  def output: Seq[Attribute]
 }
 
 
@@ -315,11 +317,21 @@ case class SimpleCatalogRelation(
     alias: Option[String] = None)
   extends LeafNode with CatalogRelation {
 
-  // TODO: implement this
-  override def output: Seq[Attribute] = Seq.empty
-
   override def catalogTable: CatalogTable = metadata
 
+  override val output: Seq[Attribute] = {
+    val cols = catalogTable.schema
+      .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
+    (cols ++ catalogTable.partitionColumns).map { f =>
+      AttributeReference(
+        f.name,
+        DataTypeParser.parse(f.dataType),
+        // Since data can be dumped into the table with no validation, everything is nullable.
+        nullable = true
+      )(qualifier = Some(alias.getOrElse(metadata.identifier.table)))
+    }
+  }
+
   require(metadata.identifier.database == Some(databaseName),
     "provided database does not match the one specified in the table definition")
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 86115d0e9b..3a0e22c742 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -22,8 +22,9 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{Dataset, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{CollapseProject, CombineUnions}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -41,7 +42,7 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
 class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
   require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
 
-  def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext)
+  def this(df: Dataset[_]) = this(df.queryExecution.analyzed, df.sqlContext)
 
   private val nextSubqueryId = new AtomicLong(0)
   private def newSubqueryName(): String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"
@@ -517,8 +518,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
       case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
         Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
 
-      case m: MetastoreRelation =>
-        Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifier(None))))
+      case relation: CatalogRelation =>
+        val m = relation.catalogTable
+        Some(SQLTable(m.database, m.identifier.table, relation.output.map(_.withQualifier(None))))
 
       case _ => None
     }
-- 
GitLab