From 7e2e268289828ae664622c59b90d82938d957ff3 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Wed, 7 Oct 2015 15:53:37 -0700
Subject: [PATCH] [SPARK-9702] [SQL] Use Exchange to implement logical
 Repartition operator

This patch allows `Repartition` to support UnsafeRows. This is accomplished
by implementing the logical `Repartition` operator in terms of `Exchange` and
a new `RoundRobinPartitioning`.

Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8083 from JoshRosen/SPARK-9702.
---
 .../catalyst/plans/physical/partitioning.scala  | 16 ++++++++++++++++
 .../apache/spark/sql/execution/Exchange.scala   | 16 +++++++++++++---
 .../spark/sql/execution/SparkStrategies.scala   |  6 +++++-
 .../spark/sql/execution/basicOperators.scala    | 18 +++++++++---------
 4 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 5ac3f1f5b0..86b9417477 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -194,6 +194,22 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
   override def guarantees(other: Partitioning): Boolean = false
 }
 
+/**
+ * Represents a partitioning where rows are distributed evenly across output partitions
+ * by starting from a random target partition number and distributing rows in a round-robin
+ * fashion. This partitioning is used when implementing the DataFrame.repartition() operator.
+ */
+case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
+  override def satisfies(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning): Boolean = false
+
+  override def guarantees(other: Partitioning): Boolean = false
+}
+
 case object SinglePartition extends Partitioning {
   val numPartitions = 1
 
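To make the intended semantics concrete, here is a minimal standalone sketch of the scheme described in the new class's scaladoc. It is plain Scala with no Spark dependency; the object name `RoundRobinSketch`, the helper `nonNegativeMod`, and the literal values are illustrative only, not part of this patch. Each task seeds a `Random` with its own partition id, picks a starting offset, and then emits monotonically increasing keys; a hash partitioner's modulo then cycles rows evenly through the output partitions.

```scala
import java.util.Random

object RoundRobinSketch {
  // Mirrors how a hash partitioner maps a key's hashCode to a partition
  // index (an Int's hashCode is the Int itself), keeping the result
  // non-negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val taskPartitionId = 2 // stand-in for TaskContext.get().partitionId()

    // Seeding with the task's partition id keeps the starting offset
    // deterministic within a task but different across tasks, so all tasks
    // don't start by writing to the same reducer.
    var position = new Random(taskPartitionId).nextInt(numPartitions)

    val targets = (1 to 8).map { _ =>
      position += 1
      nonNegativeMod(position, numPartitions)
    }
    // Eight rows cycle through partitions 0..3 twice, starting from the
    // task-specific offset.
    println(targets.mkString(", "))
  }
}
```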
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 029f2264a6..8efa471600 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Random
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
@@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
-import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
+import org.apache.spark._
 
 /**
  * :: DeveloperApi ::
@@ -130,7 +132,6 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
   private val serializer: Serializer = {
-    val rowDataTypes = child.output.map(_.dataType).toArray
     if (tungstenMode) {
       new UnsafeRowSerializer(child.output.size)
     } else {
@@ -141,6 +142,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") {
     val rdd = child.execute()
     val part: Partitioner = newPartitioning match {
+      case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
       case HashPartitioning(expressions, numPartitions) => new HashPartitioner(numPartitions)
       case RangePartitioning(sortingExpressions, numPartitions) =>
         // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
@@ -162,7 +164,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
-    def getPartitionKeyExtractor(): InternalRow => InternalRow = newPartitioning match {
+    def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
+      case RoundRobinPartitioning(numPartitions) =>
+        // Distributes elements evenly across output partitions, starting from a random partition.
+        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
+        (row: InternalRow) => {
+          // The HashPartitioner will handle the `mod` by the number of partitions
+          position += 1
+          position
+        }
       case HashPartitioning(expressions, _) => newMutableProjection(expressions, child.output)()
       case RangePartitioning(_, _) | SinglePartition => identity
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b078c8b6b0..d1bbf2e20f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -336,7 +336,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in the optimizer")
       case logical.Repartition(numPartitions, shuffle, child) =>
-        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
+        if (shuffle) {
+          execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+        } else {
+          execution.Coalesce(numPartitions, planLater(child)) :: Nil
+        }
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
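The strategy change above means the two DataFrame operators that produce `logical.Repartition` now plan to different physical operators: `repartition()` sets `shuffle = true` and becomes an `Exchange` with `RoundRobinPartitioning`, while `coalesce()` sets `shuffle = false` and becomes the narrow `Coalesce` operator. A sketch of how this surfaces in a spark-shell session of this era (assuming the shell's predefined `sqlContext`; the exact `explain()` output will vary by build):

```scala
// A DataFrame with an arbitrary shape; the contents don't matter here.
val df = sqlContext.range(0, 1000)

// shuffle = true: planned as Exchange(RoundRobinPartitioning(10), ...),
// so the rows flow through a shuffle and support UnsafeRows.
df.repartition(10).explain()

// shuffle = false: planned as Coalesce(5, ...), a narrow dependency with
// no shuffle at all.
df.coalesce(5).explain()
```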
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 3e49e0a357..d4bbbeb39e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,21 +17,20 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Random
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD}
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.collection.ExternalSorter
-import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
 import org.apache.spark.util.random.PoissonSampler
-import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.spark.util.MutablePair
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 /**
@@ -279,10 +278,12 @@ case class TakeOrderedAndProject(
 /**
  * :: DeveloperApi ::
  * Return a new RDD that has exactly `numPartitions` partitions.
+ * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
+ * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+ * the 100 new partitions will claim 10 of the current partitions.
  */
 @DeveloperApi
-case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
-  extends UnaryNode {
+case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = {
@@ -291,11 +292,10 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+    child.execute().map(_.copy()).coalesce(numPartitions, shuffle = false)
   }
 }
 
-
 /**
  * :: DeveloperApi ::
  * Returns a table with the elements from left that are not in right using
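The renamed `Coalesce` operator hard-codes `shuffle = false`, so it always produces the narrow dependency described in its new scaladoc. A quick RDD-level sketch of the behavior it delegates to (assuming a `SparkContext` named `sc`, e.g. in spark-shell; the partition counts are illustrative):

```scala
// 1000 input partitions, coalesced down to 100 without a shuffle.
val rdd = sc.parallelize(1 to 100000, 1000)
val coalesced = rdd.coalesce(100, shuffle = false)

// Each of the 100 output partitions claims ~10 parent partitions directly;
// the lineage shows a CoalescedRDD and no ShuffledRDD.
println(coalesced.partitions.length) // 100
println(coalesced.toDebugString)
```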