From 425ff03f5ac4f3ddda1ba06656e620d5426f4209 Mon Sep 17 00:00:00 2001 From: Wenchen Fan <wenchen@databricks.com> Date: Tue, 3 Nov 2015 12:47:39 +0100 Subject: [PATCH] [SPARK-11436] [SQL] rebind right encoder when join 2 datasets When we join 2 datasets, we will combine 2 encoders into a tupled one, and use it as the encoder for the jioned dataset. Assume both of the 2 encoders are flat, their `constructExpression`s both reference to the first element of input row. However, when we combine 2 encoders, the schema of input row changed, now the right encoder should reference to second element of input row. So we should rebind right encoder to let it know the new schema of input row before combine it. Author: Wenchen Fan <wenchen@databricks.com> Closes #9391 from cloud-fan/join and squashes the following commits: 846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e0ab5f593e..ed98a25415 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -390,7 +390,9 @@ class Dataset[T] private( val rightEncoder = if (other.encoder.flat) other.encoder else other.encoder.nested(rightData.toAttribute) implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(leftEncoder, rightEncoder) + ExpressionEncoder.tuple( + leftEncoder, + rightEncoder.rebind(right.output, left.output ++ right.output)) withPlan[(T, U)](other) { (left, right) => Project( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 993e6d269e..95b8d05cf4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -214,4 +214,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { cogrouped, 1 -> "a#", 2 -> "#q", 3 -> "abcfoo#w", 5 -> "hello#er") } + + test("SPARK-11436: we should rebind right encoder when join 2 datasets") { + val ds1 = Seq("1", "2").toDS().as("a") + val ds2 = Seq(2, 3).toDS().as("b") + + val joined = ds1.joinWith(ds2, $"a.value" === $"b.value") + checkAnswer(joined, ("2", 2)) + } } -- GitLab