From 22271f969363fd139e6cfb5a2d95a2607fb4e572 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Thu, 29 Jan 2015 18:23:05 -0800
Subject: [PATCH] [SPARK-5462] [SQL] Use analyzed query plan in
 DataFrame.apply()

This patch changes DataFrame's `apply()` method to use an analyzed query plan when resolving column names.  This fixes a bug where `apply` would throw "invalid call to qualifiers on unresolved object" errors when called on DataFrames constructed via `SQLContext.sql()`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4282 from JoshRosen/SPARK-5462 and squashes the following commits:

b9e6da2 [Josh Rosen] [SPARK-5462] Use analyzed query plan in DataFrame.apply().
---
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala   | 8 +++++---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 4 ++++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 1ff25adcf8..2694e81eac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -111,14 +111,16 @@ class DataFrame protected[sql](
   /** Returns the list of numeric columns, useful for doing aggregation. */
   protected[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
-      logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get
+      queryExecution.analyzed.resolve(n.name, sqlContext.analyzer.resolver).get
     }
   }
 
   /** Resolves a column name into a Catalyst [[NamedExpression]]. */
   protected[sql] def resolve(colName: String): NamedExpression = {
-    logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(throw new RuntimeException(
-      s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})"""))
+    queryExecution.analyzed.resolve(colName, sqlContext.analyzer.resolver).getOrElse {
+      throw new RuntimeException(
+        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+    }
   }
 
   /** Left here for compatibility reasons. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index db83a906d9..df343adc79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -276,5 +276,9 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
+  test("apply on query results (SPARK-5462)") {
+    val df = testData.sqlContext.sql("select key from testData")
+    checkAnswer(df("key"), testData.select('key).collect().toSeq)
+  }
 
 }
-- 
GitLab