diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 82f0a74b630bf6969b7dfd1b6fc4e5cdf870c006..cc138c749949da8e3f87387b39a85b2b9aa3b265 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -158,7 +158,12 @@ case class HashOuterJoin( left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { + case LeftOuter => left.outputPartitioning + case RightOuter => right.outputPartitioning + case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions) + case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") + } override def requiredChildDistribution = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil @@ -309,7 +314,7 @@ case class HashOuterJoin( leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST)) } - case x => throw new Exception(s"Need to add implementation for $x") + case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 037890682f7b17f5bdcca68e2b22ca85b2fac908..2fc80588182d9d037f1c7d2cc43f2827e7145c38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -197,6 +197,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { (4, "D", 4, "d") :: (5, "E", null, null) :: (6, "F", null, null) :: Nil) + + // Make sure we are choosing left.outputPartitioning as the + // outputPartitioning for the outer join operator. + checkAnswer( + sql( + """ + |SELECT l.N, count(*) + |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a) + |GROUP BY l.N + """.stripMargin), + (1, 1) :: + (2, 1) :: + (3, 1) :: + (4, 1) :: + (5, 1) :: + (6, 1) :: Nil) + + checkAnswer( + sql( + """ + |SELECT r.a, count(*) + |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a) + |GROUP BY r.a + """.stripMargin), + (null, 6) :: Nil) } test("right outer join") { @@ -232,6 +257,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { (4, "d", 4, "D") :: (null, null, 5, "E") :: (null, null, 6, "F") :: Nil) + + // Make sure we are choosing right.outputPartitioning as the + // outputPartitioning for the outer join operator. + checkAnswer( + sql( + """ + |SELECT l.a, count(*) + |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N) + |GROUP BY l.a + """.stripMargin), + (null, 6) :: Nil) + + checkAnswer( + sql( + """ + |SELECT r.N, count(*) + |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N) + |GROUP BY r.N + """.stripMargin), + (1, 1) :: + (2, 1) :: + (3, 1) :: + (4, 1) :: + (5, 1) :: + (6, 1) :: Nil) } test("full outer join") { @@ -269,5 +319,54 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { (4, "D", 4, "D") :: (null, null, 5, "E") :: (null, null, 6, "F") :: Nil) + + // Make sure we are UnknownPartitioning as the outputPartitioning for the outer join operator. + checkAnswer( + sql( + """ + |SELECT l.a, count(*) + |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N) + |GROUP BY l.a + """.stripMargin), + (null, 10) :: Nil) + + checkAnswer( + sql( + """ + |SELECT r.N, count(*) + |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N) + |GROUP BY r.N + """.stripMargin), + (1, 1) :: + (2, 1) :: + (3, 1) :: + (4, 1) :: + (5, 1) :: + (6, 1) :: + (null, 4) :: Nil) + + checkAnswer( + sql( + """ + |SELECT l.N, count(*) + |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a) + |GROUP BY l.N + """.stripMargin), + (1, 1) :: + (2, 1) :: + (3, 1) :: + (4, 1) :: + (5, 1) :: + (6, 1) :: + (null, 4) :: Nil) + + checkAnswer( + sql( + """ + |SELECT r.a, count(*) + |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a) + |GROUP BY r.a + """.stripMargin), + (null, 10) :: Nil) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 213190e812026b2c2a09ad4ae613e61fb2b59b9f..58cee21e8ad4c82d223e4ca909a9cfab1496adae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -118,6 +118,14 @@ object TestData { ) nullInts.registerAsTable("nullInts") + val allNulls = + TestSQLContext.sparkContext.parallelize( + NullInts(null) :: + NullInts(null) :: + NullInts(null) :: + NullInts(null) :: Nil) + allNulls.registerAsTable("allNulls") + case class NullStrings(n: Int, s: String) val nullStrings = TestSQLContext.sparkContext.parallelize(