From 271c4c621d91d3f610ae89e5d2e5dab1a2009ca6 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Tue, 28 Apr 2015 22:48:04 -0700
Subject: [PATCH] [SPARK-7215] made coalesce and repartition a part of the
 query plan

Coalesce and repartition now show up as part of the query plan, rather than resulting in a new `DataFrame`.

cc rxin

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #5762 from brkyvz/df-repartition and squashes the following commits:

b1e76dd [Burak Yavuz] added documentation on repartitions
5807e35 [Burak Yavuz] renamed coalescepartitions
fa4509f [Burak Yavuz] rename coalesce
2c349b5 [Burak Yavuz] address comments
f2e6af1 [Burak Yavuz] add ticks
686c90b [Burak Yavuz] made coalesce and repartition a part of the query plan
---
 .../catalyst/plans/logical/basicOperators.scala    | 11 +++++++++++
 .../sql/catalyst/plans/logical/partitioning.scala  |  8 +++++++-
 .../scala/org/apache/spark/sql/DataFrame.scala     |  9 ++-------
 .../spark/sql/execution/SparkStrategies.scala      |  5 +++--
 .../spark/sql/execution/basicOperators.scala       | 14 ++++++++++++++
 .../scala/org/apache/spark/sql/hive/HiveQl.scala   |  6 +++---
 6 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index bbc94a7ab3..608e272da7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -310,6 +310,17 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * Return a new RDD that has exactly `numPartitions` partitions. Differs from
+ * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
+ * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
+ * of the output requires some specific ordering or distribution of the data.
+ */
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
 /**
  * A relation with one row. This is used in "SELECT ..." without a from clause.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
index e737418d9c..63df2c1ee7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
 case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
   extends RedistributeData
 
-case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
+/**
+ * This method repartitions data using [[Expression]]s, and receives information about the
+ * number of partitions during execution. Used when a specific ordering or distribution is
+ * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
+ * `coalesce` and `repartition`.
+ */
+case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
   extends RedistributeData
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca6ae482eb..2affba7d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -961,9 +961,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(
-      queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
-      schema, needsConversion = false)
+    Repartition(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -974,10 +972,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def coalesce(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(
-      queryExecution.toRdd.coalesce(numPartitions),
-      schema,
-      needsConversion = false)
+    Repartition(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 030ef118f7..3a0a6c8670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
-
+      case logical.Repartition(numPartitions, shuffle, child) =>
+        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
@@ -317,7 +318,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
-      case logical.Repartition(expressions, child) =>
+      case logical.RepartitionByExpression(expressions, child) =>
         execution.Exchange(
           HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index d286fe81be..1afdb40941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -245,6 +245,20 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Return a new RDD that has exactly `numPartitions` partitions.
+ */
+@DeveloperApi
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def execute(): RDD[Row] = {
+    child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+  }
+}
+
 
 /**
  * :: DeveloperApi ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0ea6d57b81..2dc6463aba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             case (None, Some(perPartitionOrdering), None, None) =>
               Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
             case (None, None, Some(partitionExprs), None) =>
-              Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
+              RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
             case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
               Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
-                Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
+                RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, Some(clusterExprs)) =>
               Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
-                Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
+                RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, None) => withHaving
             case _ => sys.error("Unsupported set of ordering / distribution clauses.")
           }
-- 
GitLab