diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3f0d77ad6322aae2e895980827d08c40e11bb577..2d1fa106a2aa99fcd15d39e5110d6d719c724121 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -53,14 +53,11 @@ class Analyzer(catalog: Catalog,
   val extendedRules: Seq[Rule[LogicalPlan]] = Nil
 
   lazy val batches: Seq[Batch] = Seq(
-    Batch("MultiInstanceRelations", Once,
-      NewRelationInstances),
     Batch("Resolution", fixedPoint,
-      ResolveReferences ::
       ResolveRelations ::
+      ResolveReferences ::
       ResolveGroupingAnalytics ::
       ResolveSortReferences ::
-      NewRelationInstances ::
       ImplicitGenerate ::
       ResolveFunctions ::
       GlobalAggregates ::
@@ -285,6 +282,27 @@ class Analyzer(catalog: Catalog,
           }
         )
 
+      // Special handling for cases when a self-join introduces duplicate expression ids.
+      case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+        val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+
+        val (oldRelation, newRelation, attributeRewrites) = right.collect {
+          case oldVersion: MultiInstanceRelation
+              if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+            val newVersion = oldVersion.newInstance()
+            val newAttributes = AttributeMap(oldVersion.output.zip(newVersion.output))
+            (oldVersion, newVersion, newAttributes)
+        }.head // Only handle the first case found; others will be fixed on the next pass.
+
+        val newRight = right transformUp {
+          case r if r == oldRelation => newRelation
+          case other => other transformExpressions {
+            case a: Attribute => attributeRewrites.getOrElse(a, a)
+          }
+        }
+
+        j.copy(right = newRight)
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString}")
         q transformExpressionsUp  {
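To see the conflict the new Resolution case untangles, here is a minimal sketch using the Catalyst test DSL (LocalRelation already mixes in MultiInstanceRelation; the DSL's join helper and the val names are assumptions for illustration):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    // Joining a relation to itself exposes the same expression ids on both
    // sides -- exactly the overlap the new case's guard tests for.
    val rel = LocalRelation('key.int, 'value.string)
    val selfJoin = rel.join(rel)
    assert(selfJoin.left.outputSet.intersect(selfJoin.right.outputSet).nonEmpty)

    // newInstance() mints fresh expression ids, so a re-instanced copy no
    // longer overlaps -- this is what the rule substitutes on the right side.
    val fresh = rel.newInstance()
    assert(rel.outputSet.intersect(fresh.outputSet).isEmpty)

The rule then rewrites every attribute reference above the re-instanced relation through attributeRewrites, so the rest of the right subtree points at the fresh ids.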
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 4c5fb3f45bf49a3c914c092ba9f0d8219bbb3f6c..894c3500cf5333b5aaefbddf8c97f80acad14072 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -26,28 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * produced by distinct operators in a query tree as this breaks the guarantee that expression
  * ids, which are used to differentiate attributes, are unique.
  *
- * Before analysis, all operators that include this trait will be asked to produce a new version
+ * During analysis, an operator that includes this trait may be asked to produce a new version
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
   def newInstance(): this.type
 }
-
-/**
- * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
- * that each instance has unique expression ids for the attributes produced.
- */
-object NewRelationInstances extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    val localRelations = plan collect { case l: MultiInstanceRelation => l}
-    val multiAppearance = localRelations
-      .groupBy(identity[MultiInstanceRelation])
-      .filter { case (_, ls) => ls.size > 1 }
-      .map(_._1)
-      .toSet
-
-    plan transform {
-      case l: MultiInstanceRelation if multiAppearance.contains(l) => l.newInstance()
-    }
-  }
-}
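For reference, a hedged sketch of how a leaf node typically satisfies the trait, modeled on LocalRelation (SketchRelation is a hypothetical name; it relies on Attribute.newInstance(), which re-mints the expression id):

    import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode

    // Hypothetical leaf relation: newInstance() keeps the schema but gives
    // every output attribute a globally unique expression id.
    case class SketchRelation(output: Seq[Attribute])
      extends LeafNode with MultiInstanceRelation {
      override def newInstance(): this.type =
        SketchRelation(output.map(_.newInstance())).asInstanceOf[this.type]
    }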
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index c4a1f899d8a1326d3cdf198d37ae41ba5bae922a..7d609b91389c6b0ec6c09e9a819436cfcd912f5b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -33,11 +33,9 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
-    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
     }
   }
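Rebasing to the plan's minimum id still preserved the relative gaps between ids, and those gaps shift once relations are re-instanced mid-analysis; pinning every id to zero makes the comparison fully id-insensitive. A small sketch of the resulting equality (same constructors PlanTest already uses; the val names are illustrative):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
    import org.apache.spark.sql.types.IntegerType

    // Two references that differ only in their expression id...
    val a = AttributeReference("key", IntegerType, nullable = true)(exprId = ExprId(3))
    val b = AttributeReference("key", IntegerType, nullable = true)(exprId = ExprId(7))
    assert(a != b)

    // ...compare equal once both are rebuilt with ExprId(0), as
    // normalizeExprIds now does for every AttributeReference in the plan.
    val na = AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
    val nb = AttributeReference(b.name, b.dataType, b.nullable)(exprId = ExprId(0))
    assert(na == nb)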
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 05ac1623d78ed0597f5ba5a7484f1bb25ab09a7b..fd121ce05698cb14dcb0d094e1217e48a694ca19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -122,6 +122,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     case _ =>
   }
 
+  @transient
   protected[sql] val cacheManager = new CacheManager(this)
 
   /**
@@ -159,6 +160,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *       DataTypes.StringType);
    * }}}
    */
+  @transient
   val udf: UDFRegistration = new UDFRegistration(this)
 
   /** Returns true if the table is currently cached in-memory. */
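cacheManager and udf both hold a reference back to this SQLContext, so marking them @transient keeps them out of the serialized form whenever the context is captured in a closure. A minimal, Spark-independent sketch of what the annotation buys under plain Java serialization (Holder and its field are hypothetical):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // @transient fields are skipped by writeObject, so a non-serializable
    // member does not poison the enclosing object when it is shipped.
    class Holder extends Serializable {
      @transient val notShipped = new Object // java.lang.Object is not Serializable
    }

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(new Holder) // throws NotSerializableException without @transient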
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index fbed0a782dd3eb50d0b5f70e3a79b3a94b0b392b..28e90b9520b2cbd2fb14c6d2f3b29478a9c6b03b 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -39,6 +39,9 @@ log4j.appender.FA.Threshold = INFO
 log4j.additivity.parquet.hadoop.ParquetRecordReader=false
 log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
 
+log4j.additivity.parquet.hadoop.ParquetOutputCommitter=false
+log4j.logger.parquet.hadoop.ParquetOutputCommitter=OFF
+
 log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
 log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 02623f73c7f766189117b9039fd456866e671056..7be9215a443f0706d170c6383dac3bd0cb9d77b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery
 import org.apache.spark.sql.test.TestSQLContext.implicits._
+import org.apache.spark.sql.test.TestSQLContext.sql
 
 
 class DataFrameSuite extends QueryTest {
@@ -88,6 +89,15 @@ class DataFrameSuite extends QueryTest {
       testData.collect().toSeq)
   }
 
+  test("self join") {
+    val df1 = testData.select(testData("key")).as('df1)
+    val df2 = testData.select(testData("key")).as('df2)
+
+    checkAnswer(
+      df1.join(df2, $"df1.key" === $"df2.key"),
+      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
+  }
+
   test("selectExpr") {
     checkAnswer(
       testData.selectExpr("abs(key)", "value"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 081d94b6fc02001c24cce5c9d2e8f118d9361b36..44ee5ab5975fb60aded34ae856af39f4e092c600 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -35,11 +35,9 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
-    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
     }
   }