From c34c27fe9244939d8c905cd689536dfb81c74d7d Mon Sep 17 00:00:00 2001
From: "navis.ryu" <navis@apache.org>
Date: Mon, 2 Nov 2015 23:52:36 -0800
Subject: [PATCH] [SPARK-9034][SQL] Reflect field names defined in GenericUDTF

Hive GenericUDTF#initialize() defines field names in a returned schema though,
the current HiveGenericUDTF drops these names.
We might need to reflect these in a logical plan tree.

Author: navis.ryu <navis@apache.org>

Closes #8456 from navis/SPARK-9034.
---
 .../spark/sql/catalyst/analysis/Analyzer.scala       | 11 +++++------
 .../spark/sql/catalyst/expressions/generators.scala  | 12 +++++++-----
 .../main/scala/org/apache/spark/sql/DataFrame.scala  | 10 +++++-----
 .../sql/hive/execution/HiveCompatibilitySuite.scala  |  1 +
 .../scala/org/apache/spark/sql/hive/hiveUDFs.scala   |  2 +-
 ...GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e |  1 +
 ...GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 |  1 +
 ...l_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada |  1 +
 ...l_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 |  0
 ...l_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 |  2 ++
 ...l_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a |  0
 ...l_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 |  2 ++
 ...al_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe |  0
 ...l_view_noalias-6-16d227442dd775615c6ecfceedc6c612 |  0
 ...l_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 |  2 ++
 .../spark/sql/hive/execution/HiveQuerySuite.scala    |  6 ++++++
 16 files changed, 34 insertions(+), 17 deletions(-)
 create mode 100644 sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e
 create mode 100644 sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612
 create mode 100644 sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 912c967b95..899ee67352 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -147,7 +147,7 @@ class Analyzer(
             case u @ UnresolvedAlias(child) => child match {
               case ne: NamedExpression => ne
               case e if !e.resolved => u
-              case g: Generator if g.elementTypes.size > 1 => MultiAlias(g, Nil)
+              case g: Generator => MultiAlias(g, Nil)
               case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)()
               case other => Alias(other, s"_c$i")()
             }
@@ -722,7 +722,7 @@ class Analyzer(
 
     /**
      * Construct the output attributes for a [[Generator]], given a list of names.  If the list of
-     * names is empty names are assigned by ordinal (i.e., _c0, _c1, ...) to match Hive's defaults.
+     * names is empty names are assigned from field names in generator.
      */
     private def makeGeneratorOutput(
         generator: Generator,
@@ -731,13 +731,12 @@ class Analyzer(
 
       if (names.length == elementTypes.length) {
         names.zip(elementTypes).map {
-          case (name, (t, nullable)) =>
+          case (name, (t, nullable, _)) =>
             AttributeReference(name, t, nullable)()
         }
       } else if (names.isEmpty) {
-        elementTypes.zipWithIndex.map {
-          // keep the default column names as Hive does _c0, _c1, _cN
-          case ((t, nullable), i) => AttributeReference(s"_c$i", t, nullable)()
+        elementTypes.map {
+          case (t, nullable, name) => AttributeReference(name, t, nullable)()
         }
       } else {
         failAnalysis(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 1a2092c909..894a0730d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -53,7 +53,7 @@ trait Generator extends Expression {
    * The output element data types in structure of Seq[(DataType, Nullable)]
    * TODO we probably need to add more information like metadata etc.
    */
-  def elementTypes: Seq[(DataType, Boolean)]
+  def elementTypes: Seq[(DataType, Boolean, String)]
 
   /** Should be implemented by child classes to perform specific Generators. */
   override def eval(input: InternalRow): TraversableOnce[InternalRow]
@@ -69,7 +69,7 @@ trait Generator extends Expression {
  * A generator that produces its output using the provided lambda function.
  */
 case class UserDefinedGenerator(
-    elementTypes: Seq[(DataType, Boolean)],
+    elementTypes: Seq[(DataType, Boolean, String)],
     function: Row => TraversableOnce[InternalRow],
     children: Seq[Expression])
   extends Generator with CodegenFallback {
@@ -112,9 +112,11 @@ case class Explode(child: Expression) extends UnaryExpression with Generator wit
     }
   }
 
-  override def elementTypes: Seq[(DataType, Boolean)] = child.dataType match {
-    case ArrayType(et, containsNull) => (et, containsNull) :: Nil
-    case MapType(kt, vt, valueContainsNull) => (kt, false) :: (vt, valueContainsNull) :: Nil
+  // hive-compatible default alias for explode function ("col" for array, "key", "value" for map)
+  override def elementTypes: Seq[(DataType, Boolean, String)] = child.dataType match {
+    case ArrayType(et, containsNull) => (et, containsNull, "col") :: Nil
+    case MapType(kt, vt, valueContainsNull) =>
+      (kt, false, "key") :: (vt, valueContainsNull, "value") :: Nil
   }
 
   override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 53ad3c0266..fc0ab632f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1175,7 +1175,8 @@ class DataFrame private[sql](
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
-    val elementTypes = schema.toAttributes.map { attr => (attr.dataType, attr.nullable) }
+    val elementTypes = schema.toAttributes.map {
+      attr => (attr.dataType, attr.nullable, attr.name) }
     val names = schema.toAttributes.map(_.name)
     val convert = CatalystTypeConverters.createToCatalystConverter(schema)
 
@@ -1184,7 +1185,7 @@ class DataFrame private[sql](
     val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
 
     Generate(generator, join = true, outer = false,
-      qualifier = None, names.map(UnresolvedAttribute(_)), logicalPlan)
+      qualifier = None, generatorOutput = Nil, logicalPlan)
   }
 
   /**
@@ -1203,8 +1204,7 @@ class DataFrame private[sql](
     val dataType = ScalaReflection.schemaFor[B].dataType
     val attributes = AttributeReference(outputColumn, dataType)() :: Nil
     // TODO handle the metadata?
-    val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable) }
-    val names = attributes.map(_.name)
+    val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, attr.name) }
 
     def rowFunction(row: Row): TraversableOnce[InternalRow] = {
       val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
@@ -1213,7 +1213,7 @@ class DataFrame private[sql](
     val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
 
     Generate(generator, join = true, outer = false,
-      qualifier = None, names.map(UnresolvedAttribute(_)), logicalPlan)
+      qualifier = None, generatorOutput = Nil, logicalPlan)
   }
 
   /////////////////////////////////////////////////////////////////////////////
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 6ed40b0397..2d0d7b8af3 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -661,6 +661,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "join_star",
     "lateral_view",
     "lateral_view_cp",
+    "lateral_view_noalias",
     "lateral_view_ppd",
     "leftsemijoin",
     "leftsemijoin_mr",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 0b5e863506..a9db70119d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -511,7 +511,7 @@ private[hive] case class HiveGenericUDTF(
   protected lazy val collector = new UDTFCollector
 
   override lazy val elementTypes = outputInspector.getAllStructFieldRefs.asScala.map {
-    field => (inspectorToDataType(field.getFieldObjectInspector), true)
+    field => (inspectorToDataType(field.getFieldObjectInspector), true, field.getFieldName)
   }
 
   @transient
diff --git a/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e
new file mode 100644
index 0000000000..1cf253f92c
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e	
@@ -0,0 +1 @@
+238
diff --git a/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30
new file mode 100644
index 0000000000..60878ffb77
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30	
@@ -0,0 +1 @@
+238	val_238
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada b/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
new file mode 100644
index 0000000000..573541ac97
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
@@ -0,0 +1 @@
+0
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 b/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 b/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
new file mode 100644
index 0000000000..0da0d93886
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
@@ -0,0 +1,2 @@
+key1	100
+key2	200
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a b/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 b/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
new file mode 100644
index 0000000000..0da0d93886
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
@@ -0,0 +1,2 @@
+key1	100
+key2	200
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe b/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612 b/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 b/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3
new file mode 100644
index 0000000000..4ba46bbda5
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3
@@ -0,0 +1,2 @@
+key1	100	key1	100
+key2	200	key2	200
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e597d6865f..fc72e3c7dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -563,6 +563,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   createQueryTest("Specify the udtf output",
     "SELECT d FROM (SELECT explode(array(1,1)) d FROM src LIMIT 1) t")
 
+  createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #1",
+    "SELECT col FROM (SELECT explode(array(key,value)) FROM src LIMIT 1) t")
+
+  createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #2",
+    "SELECT key,value FROM (SELECT explode(map(key,value)) FROM src LIMIT 1) t")
+
   test("sampling") {
     sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s")
     sql("SELECT * FROM src TABLESAMPLE(100 PERCENT) s")
-- 
GitLab