Skip to content
Snippets Groups Projects
Commit 7d734a65 authored by Liang-Chi Hsieh, committed by Wenchen Fan
Browse files

[SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning...

[SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning and ordering when aliasing output attributes

## What changes were proposed in this pull request?

Currently, `InMemoryTableScanExec` simply takes its `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `child.outputOrdering`.

However, `InMemoryTableScanExec` can alias the output attributes. In that case, the inherited `outputPartitioning` and `outputOrdering` still refer to the child's original attributes rather than the aliased ones, so they are not correct and the parent operators can't correctly determine the scan's data distribution.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
parent 046b8d4a
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
......@@ -41,11 +41,26 @@ case class InMemoryTableScanExec(
override def output: Seq[Attribute] = attributes
/**
 * Rewrites `expr` so that every attribute produced by the cached child plan is replaced by
 * the corresponding (possibly aliased) attribute of the in-memory relation. Attributes that
 * are not part of the child's output are left unchanged.
 *
 * @param expr expression phrased in terms of `relation.child.output`
 * @return the same expression with child attributes substituted
 */
private def updateAttribute(expr: Expression): Expression = {
  // The mapping is positional, so pair the child's output with the relation's full output
  // rather than with this scan's `output`: the scan's attribute list may be a pruned or
  // reordered subset, in which case zipping against it would pair unrelated attributes.
  // NOTE(review): assumes relation.output is positionally aligned with relation.child.output;
  // confirm against InMemoryRelation.
  val attrMap = AttributeMap(relation.child.output.zip(relation.output))
  expr.transform {
    case attr: Attribute => attrMap.getOrElse(attr, attr)
  }
}
// The cached version does not change the outputPartitioning of the original SparkPlan.
// NOTE(review): this looks like the pre-patch definition (the removed side of the diff hunk).
// It returns the child's partitioning unchanged and is redefined by the override that
// follows; confirm it is not part of the resulting file.
override def outputPartitioning: Partitioning = relation.child.outputPartitioning
// The cached relation may expose aliased attributes, so a hash partitioning inherited from
// the child is rewritten through updateAttribute; any other partitioning is returned as is.
override def outputPartitioning: Partitioning =
  relation.child.outputPartitioning match {
    case hashed: HashPartitioning =>
      val rewritten = updateAttribute(hashed)
      rewritten.asInstanceOf[HashPartitioning]
    case _ =>
      relation.child.outputPartitioning
  }
// The cached version does not change the outputOrdering of the original SparkPlan.
// NOTE(review): this looks like the pre-patch definition (the removed side of the diff hunk).
// It returns the child's ordering unchanged and is redefined by the override that follows;
// confirm it is not part of the resulting file.
override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
// The cached relation may expose aliased attributes, so each sort order inherited from the
// child is rewritten through updateAttribute before being reported.
override def outputOrdering: Seq[SortOrder] = {
  val childOrdering = relation.child.outputOrdering
  childOrdering.map { order =>
    updateAttribute(order).asInstanceOf[SortOrder]
  }
}
private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
......
......@@ -21,6 +21,9 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
......@@ -388,4 +391,27 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
}
test("InMemoryTableScanExec should return correct output ordering and partitioning") {
  val df1 = Seq((0, 0), (1, 1)).toDF
    .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
  val df2 = Seq((0, 0), (1, 1)).toDF
    .repartition(col("_1")).sortWithinPartitions(col("_1")).persist

  try {
    // Because two cached dataframes have the same logical plan, this is actually a self-join.
    // So we force one of the in-memory relations to alias its output. Then we can test whether
    // the original and the aliased in-memory relations have correct ordering and partitioning.
    val joined = df1.joinWith(df2, df1("_1") === df2("_1"))

    val inMemoryScans = joined.queryExecution.executedPlan.collect {
      case m: InMemoryTableScanExec => m
    }
    // Without this guard the checks below would pass vacuously if the plan contained no
    // in-memory scan at all.
    assert(inMemoryScans.nonEmpty)

    inMemoryScans.foreach { inMemoryScan =>
      // Every attribute referenced by the reported ordering must be produced by the scan.
      val sortedAttrs = AttributeSet(inMemoryScan.outputOrdering.flatMap(_.references))
      assert(sortedAttrs.subsetOf(inMemoryScan.outputSet))

      // Likewise for the attributes referenced by the reported hash partitioning.
      val partitionedAttrs =
        inMemoryScan.outputPartitioning.asInstanceOf[HashPartitioning].references
      assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet))
    }
  } finally {
    // Release the cached data so it does not leak into other tests using the shared session.
    df1.unpersist()
    df2.unpersist()
  }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment