Commit 1b9001f7 authored by Reynold Xin

[SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.

This is a tiny optimization that moves the if check on sortBasedShuffleOn outside the closures, so the closures don't need to pull in the entire Exchange operator object (a minimal sketch of the closure-capture pattern precedes the diff below).

Author: Reynold Xin <rxin@apache.org>

Closes #2282 from rxin/SPARK-3409 and squashes the following commits:

1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
parent 9422c4ee
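Below is a minimal sketch of the pattern the patch applies, using hypothetical names (FakeOperator, buildBefore, buildAfter) rather than anything from Spark itself: a Scala function literal that reads a member of its enclosing object closes over this, so the whole object travels with the closure, while evaluating the flag once, before the closures are built, keeps the enclosing object out of them.

object ClosureCaptureSketch {

  class FakeOperator(useSortShuffle: Boolean) {

    // Stand-in for Exchange.sortBasedShuffleOn.
    protected def sortBasedShuffleOn: Boolean = useSortShuffle

    // Before: the branch runs inside the function literal, so the literal
    // references this.sortBasedShuffleOn and therefore captures the whole
    // FakeOperator (in Spark, that object would be serialized into every task).
    def buildBefore: Iterator[Int] => Iterator[(Int, Int)] = { iter =>
      if (sortBasedShuffleOn) iter.map(i => (i, i))
      else iter.map(i => (i, -i))
    }

    // After: the branch runs once, while choosing which function to build, so
    // each function literal captures only what its own body uses (here, nothing
    // from the operator).
    def buildAfter: Iterator[Int] => Iterator[(Int, Int)] =
      if (sortBasedShuffleOn) {
        iter => iter.map(i => (i, i))
      } else {
        iter => iter.map(i => (i, -i))
      }
  }

  def main(args: Array[String]): Unit = {
    val op = new FakeOperator(useSortShuffle = true)
    println(op.buildBefore(Iterator(1, 2, 3)).toList) // List((1,1), (2,2), (3,3))
    println(op.buildAfter(Iterator(1, 2, 3)).toList)  // List((1,1), (2,2), (3,3))
  }
}

Both paths compute the same result; the difference is only in what each function literal captures, which is exactly the point of the commit.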
@@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
   override def outputPartitioning = newPartitioning
-  def output = child.output
+  override def output = child.output
   /** We must copy rows when sort based shuffle is on */
   protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-  def execute() = attachTree(this , "execute") {
+  override def execute() = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
-        val rdd = child.execute().mapPartitions { iter =>
-          if (sortBasedShuffleOn) {
-            @transient val hashExpressions =
-              newProjection(expressions, child.output)
+        val rdd = if (sortBasedShuffleOn) {
+          child.execute().mapPartitions { iter =>
+            val hashExpressions = newProjection(expressions, child.output)
             iter.map(r => (hashExpressions(r), r.copy()))
-          } else {
-            @transient val hashExpressions =
-              newMutableProjection(expressions, child.output)()
+          }
+        } else {
+          child.execute().mapPartitions { iter =>
+            val hashExpressions = newMutableProjection(expressions, child.output)()
             val mutablePair = new MutablePair[Row, Row]()
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
@@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
         shuffled.map(_._2)
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-        val rdd = child.execute().mapPartitions { iter =>
-          if (sortBasedShuffleOn) {
-            iter.map(row => (row.copy(), null))
-          } else {
+        val rdd = if (sortBasedShuffleOn) {
+          child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
+        } else {
+          child.execute().mapPartitions { iter =>
             val mutablePair = new MutablePair[Row, Null](null, null)
             iter.map(row => mutablePair.update(row, null))
           }
         }
+        // TODO: RangePartitioner should take an Ordering.
+        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
         val part = new RangePartitioner(numPartitions, rdd, ascending = true)
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
         shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
@@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode
         shuffled.map(_._1)
       case SinglePartition =>
-        val rdd = child.execute().mapPartitions { iter =>
-          if (sortBasedShuffleOn) {
-            iter.map(r => (null, r.copy()))
-          } else {
+        val rdd = if (sortBasedShuffleOn) {
+          child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
+        } else {
+          child.execute().mapPartitions { iter =>
             val mutablePair = new MutablePair[Null, Row]()
             iter.map(r => mutablePair.update(null, r))
           }