From dc1d324fdf83e9f4b1debfb277533b002691d71f Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Wed, 25 Nov 2015 11:11:39 -0800
Subject: [PATCH] [SPARK-11969] [SQL] [PYSPARK] visualization of SQL query for
 pyspark

Currently, we does not have visualization for SQL query from Python, this PR fix that.

cc zsxwing

Author: Davies Liu <davies@databricks.com>

Closes #9949 from davies/pyspark_sql_ui.
---
 python/pyspark/sql/dataframe.py                      |  2 +-
 .../main/scala/org/apache/spark/sql/DataFrame.scala  |  7 +++++++
 .../org/apache/spark/sql/execution/python.scala      | 12 +++++++-----
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0dd75ba7ca..746bb55e14 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -277,7 +277,7 @@ class DataFrame(object):
         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
         """
         with SCCallSiteSync(self._sc) as css:
-            port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
+            port = self._jdf.collectToPython()
         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
 
     @ignore_unicode_prefix
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d8319b9a97..6197f10813 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.python.PythonRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
@@ -1735,6 +1736,12 @@ class DataFrame private[sql](
     EvaluatePython.javaToPython(rdd)
   }
 
+  protected[sql] def collectToPython(): Int = {
+    withNewExecutionId {
+      PythonRDD.collectAndServe(javaToPython.rdd)
+    }
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   ////////////////////////////////////////////////////////////////////////////
   // Deprecated methods
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index d611b0011d..defcec95fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -121,11 +121,13 @@ object EvaluatePython {
 
   def takeAndServe(df: DataFrame, n: Int): Int = {
     registerPicklers()
-    val iter = new SerDeUtil.AutoBatchedPickler(
-      df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
-        EvaluatePython.toJava(row, df.schema)
-      })
-    PythonRDD.serveIterator(iter, s"serve-DataFrame")
+    df.withNewExecutionId {
+      val iter = new SerDeUtil.AutoBatchedPickler(
+        df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+          EvaluatePython.toJava(row, df.schema)
+        })
+      PythonRDD.serveIterator(iter, s"serve-DataFrame")
+    }
   }
 
   /**
-- 
GitLab