From 7d734a658349e8691d8b4294454c9cd98d555014 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Thu, 16 Mar 2017 08:18:36 +0800
Subject: [PATCH] [SPARK-19931][SQL] InMemoryTableScanExec should rewrite
 output partitioning and ordering when aliasing output attributes

## What changes were proposed in this pull request?

Now `InMemoryTableScanExec` simply takes the `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `outputOrdering`.

However, `InMemoryTableScanExec` can alias the output attributes. In this case, its `outputPartitioning` and `outputOrdering` are not correct and its parent operators can't correctly determine its data distribution.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
---
 .../columnar/InMemoryTableScanExec.scala      | 21 ++++++++++++---
 .../columnar/InMemoryColumnarQuerySuite.scala | 26 +++++++++++++++++++
 2 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 9028caa446..214e8d309d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
@@ -41,11 +41,26 @@ case class InMemoryTableScanExec(
 
   override def output: Seq[Attribute] = attributes
 
+  private def updateAttribute(expr: Expression): Expression = {
+    val attrMap = AttributeMap(relation.child.output.zip(output))
+    expr.transform {
+      case attr: Attribute => attrMap.getOrElse(attr, attr)
+    }
+  }
+
   // The cached version does not change the outputPartitioning of the original SparkPlan.
-  override def outputPartitioning: Partitioning = relation.child.outputPartitioning
+  // But the cached version could alias output, so we need to replace output.
+  override def outputPartitioning: Partitioning = {
+    relation.child.outputPartitioning match {
+      case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case _ => relation.child.outputPartitioning
+    }
+  }
 
   // The cached version does not change the outputOrdering of the original SparkPlan.
-  override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
+  // But the cached version could alias output, so we need to replace output.
+  override def outputOrdering: Seq[SortOrder] =
+    relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
   private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0250a53fe2..1e6a6a8ba3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,6 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
@@ -388,4 +391,27 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("InMemoryTableScanExec should return correct output ordering and partitioning") {
+    val df1 = Seq((0, 0), (1, 1)).toDF
+      .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
+    val df2 = Seq((0, 0), (1, 1)).toDF
+      .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
+
+    // Because two cached dataframes have the same logical plan, this is a self-join actually.
+    // So we force one of in-memory relation to alias its output. Then we can test if original and
+    // aliased in-memory relations have correct ordering and partitioning.
+    val joined = df1.joinWith(df2, df1("_1") === df2("_1"))
+
+    val inMemoryScans = joined.queryExecution.executedPlan.collect {
+      case m: InMemoryTableScanExec => m
+    }
+    inMemoryScans.foreach { inMemoryScan =>
+      val sortedAttrs = AttributeSet(inMemoryScan.outputOrdering.flatMap(_.references))
+      assert(sortedAttrs.subsetOf(inMemoryScan.outputSet))
+
+      val partitionedAttrs =
+        inMemoryScan.outputPartitioning.asInstanceOf[HashPartitioning].references
+      assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet))
+    }
+  }
 }
-- 
GitLab