diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a3f5a7b5cade39cc8161486515739cceb1f0d4d..b06759f144fd962aa43cd8a7c81509a143367436 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -283,7 +283,7 @@ class Analyzer(
         val conflictingAttributes = left.outputSet.intersect(right.outputSet)
         logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
 
-        val (oldRelation, newRelation) = right.collect {
+        right.collect {
           // Handle base relations that might appear more than once.
           case oldVersion: MultiInstanceRelation
               if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
@@ -308,25 +308,27 @@ class Analyzer(
               if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
                 .nonEmpty =>
             (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
-        }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
-          sys.error(
-            s"""
-              |Failure when resolving conflicting references in Join:
-              |$plan
-              |
-              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
-              """.stripMargin)
         }
-
-        val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-        val newRight = right transformUp {
-          case r if r == oldRelation => newRelation
-        } transformUp {
-          case other => other transformExpressions {
-            case a: Attribute => attributeRewrites.get(a).getOrElse(a)
-          }
+          // Only handle first case, others will be fixed on the next pass.
+          .headOption match {
+          case None =>
+            /*
+             * No result implies that there is a logical plan node that produces new references
+             * that this rule cannot handle. When that is the case, there must be another rule
+             * that resolves these conflicts. Otherwise, the analysis will fail.
+             */
+            j
+          case Some((oldRelation, newRelation)) =>
+            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+            val newRight = right transformUp {
+              case r if r == oldRelation => newRelation
+            } transformUp {
+              case other => other transformExpressions {
+                case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+              }
+            }
+            j.copy(right = newRight)
         }
-        j.copy(right = newRight)
 
       // When resolve `SortOrder`s in Sort based on child, don't report errors as
       // we still have chance to resolve it based on grandchild
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c5a1437be6d0518c789de25859ca9ff63f30f1bd..a069b4710f38ca8e5b61b5cd50b7a052e50e4517 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -48,6 +48,7 @@ trait CheckAnalysis {
     // We transform up and order the rules so as to catch the first possible failure instead
     // of the result of cascading resolution failures.
     plan.foreachUp {
+
       case operator: LogicalPlan =>
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
@@ -121,6 +122,17 @@ trait CheckAnalysis {
 
           case _ => // Analysis successful!
         }
+
+      // Special handling for cases when self-join introduce duplicate expression ids.
+      case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+        val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+        failAnalysis(
+          s"""
+             |Failure when resolving conflicting references in Join:
+             |$plan
+             |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+             |""".stripMargin)
+
     }
     extendedCheckRules.foreach(_(plan))
   }