diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e0ab5f593e933212327adc75b5e1cbfdf52d0f08..ed98a2541598f132b9c2790f809a9816c6b906a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -390,7 +390,9 @@ class Dataset[T] private(
     val rightEncoder =
       if (other.encoder.flat) other.encoder else other.encoder.nested(rightData.toAttribute)
     implicit val tuple2Encoder: Encoder[(T, U)] =
-      ExpressionEncoder.tuple(leftEncoder, rightEncoder)
+      ExpressionEncoder.tuple(
+        leftEncoder,
+        rightEncoder.rebind(right.output, left.output ++ right.output))
 
     withPlan[(T, U)](other) { (left, right) =>
       Project(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 993e6d269ee034eefa1a697f03b7c065710d5f3e..95b8d05cf44141f4751294d482c65cd8458e144c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -214,4 +214,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       cogrouped,
       1 -> "a#", 2 -> "#q", 3 -> "abcfoo#w", 5 -> "hello#er")
   }
+
+  test("SPARK-11436: we should rebind right encoder when join 2 datasets") {
+    val ds1 = Seq("1", "2").toDS().as("a")
+    val ds2 = Seq(2, 3).toDS().as("b")
+
+    val joined = ds1.joinWith(ds2, $"a.value" === $"b.value")
+    checkAnswer(joined, ("2", 2))
+  }
 }