From adcb7d3350032dda69a43de724c8bdff5fef2c67 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Fri, 17 Oct 2014 14:12:07 -0700
Subject: [PATCH] [SPARK-3855][SQL] Preserve the result attribute of python
 UDFs though transformations

In the current implementation it was possible for the reference to change after analysis.

Author: Michael Armbrust <michael@databricks.com>

Closes #2717 from marmbrus/pythonUdfResults and squashes the following commits:

da14879 [Michael Armbrust] Fix test
6343bcb [Michael Armbrust] add test
9533286 [Michael Armbrust] Correctly preserve the result attribute of python UDFs though transformations
---
 python/pyspark/tests.py                              |  6 ++++++
 .../apache/spark/sql/execution/SparkStrategies.scala |  2 +-
 .../org/apache/spark/sql/execution/pythonUdfs.scala  | 12 ++++++++++--
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index ceab57464f..f5ccf31abb 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -683,6 +683,12 @@ class SQLTests(ReusedPySparkTestCase):
         [row] = self.sqlCtx.sql("SELECT twoArgs('test', 1)").collect()
         self.assertEqual(row[0], 5)
 
+    def test_udf2(self):
+        self.sqlCtx.registerFunction("strlen", lambda string: len(string))
+        self.sqlCtx.inferSchema(self.sc.parallelize([Row(a="test")])).registerTempTable("test")
+        [res] = self.sqlCtx.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect()
+        self.assertEqual(u"4", res[0])
+
     def test_broadcast_in_udf(self):
         bar = {"a": "aa", "b": "bb", "c": "abc"}
         foo = self.sc.broadcast(bar)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4f1af7234d..79e4ddb8c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -295,7 +295,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
       case logical.Repartition(expressions, child) =>
         execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
-      case e @ EvaluatePython(udf, child) =>
+      case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
       case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
       case _ => Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 0977da3e85..be729e5d24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -105,13 +105,21 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
   }
 }
 
+object EvaluatePython {
+  def apply(udf: PythonUDF, child: LogicalPlan) =
+    new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)())
+}
+
 /**
  * :: DeveloperApi ::
  * Evaluates a [[PythonUDF]], appending the result to the end of the input tuple.
  */
 @DeveloperApi
-case class EvaluatePython(udf: PythonUDF, child: LogicalPlan) extends logical.UnaryNode {
-  val resultAttribute = AttributeReference("pythonUDF", udf.dataType, nullable=true)()
+case class EvaluatePython(
+    udf: PythonUDF,
+    child: LogicalPlan,
+    resultAttribute: AttributeReference)
+  extends logical.UnaryNode {
 
   def output = child.output :+ resultAttribute
 }
-- 
GitLab