diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
index 3f672a3e0fd91dc29b34a2350f72f04078a943b9..5dc9d0e56608717c63f140f21e8a981cebe28d0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -25,8 +25,8 @@ package org.apache.spark.sql.catalyst
 package object analysis {
 
   /**
-   * Responsible for resolving which identifiers refer to the same entity.  For example, by using
-   * case insensitive equality.
+   * A Resolver should return true if the first string refers to the same entity as the second
+   * string, for example by using case-insensitive equality.
    */
   type Resolver = (String, String) => Boolean
 
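As a quick illustration of the contract above (a sketch, not part of the patch), here are two
resolvers that satisfy the type alias; the value names are illustrative:

```scala
import org.apache.spark.sql.catalyst.analysis.Resolver

// Strings must match exactly.
val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b

// Strings refer to the same entity regardless of case.
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

assert(caseInsensitiveResolution("Col1", "col1"))
assert(!caseSensitiveResolution("Col1", "col1"))
```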
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 7f122e9d557341db6f1b2c9b19b415719095c60e..f77c56311cc8cbb82b4ea9d93aa04d5d9717b686 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -40,6 +40,17 @@ abstract class NamedExpression extends Expression {
 
   def name: String
   def exprId: ExprId
+
+  /**
+   * All possible qualifiers for the expression.
+   *
+   * For now, since we do not allow using the original table name to qualify a column name once
+   * the table is aliased, this can only be:
+   *
+   * 1. Empty Seq: when an attribute doesn't have a qualifier,
+   *    e.g. top-level attributes aliased in the SELECT clause, or a column from a LocalRelation.
+   * 2. Single element: either the table name or the alias name of the table.
+   */
   def qualifiers: Seq[String]
 
   def toAttribute: Attribute
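A sketch of the two cases listed above (assuming the usual AttributeReference constructor and
the withQualifiers helper on Attribute; exact signatures may differ slightly by version):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// Case 1: an attribute with no qualifier, e.g. a column of a LocalRelation.
val unqualified = AttributeReference("id", IntegerType)()
assert(unqualified.qualifiers.isEmpty)

// Case 2: the same attribute seen through a table (or subquery) alias "t"
// carries exactly one qualifier.
val qualified = unqualified.withQualifiers("t" :: Nil)
assert(qualified.qualifiers == Seq("t"))
```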
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 619f42859cbb841049d23808f541c38dee592c8d..17a88e07de15f14f2094acc4e8d47e470fb0fd8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -152,6 +152,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   /** Prints out the schema in the tree format */
   def printSchema(): Unit = println(schemaString)
 
+  /**
+   * A prefix string used when printing the plan.
+   *
+   * We use "!" to indicate an invalid plan, and "'" to indicate an unresolved plan.
+   */
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
 
   override def simpleString = statePrefix + super.simpleString
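A toy, self-contained sketch of the prefix convention described above (illustrative only; the
real logic lives in the plan classes):

```scala
// "!" marks an invalid plan (it references attributes its children do not produce),
// "'" marks a plan that is not yet resolved.
def planPrefix(hasMissingInput: Boolean, resolved: Boolean): String =
  if (hasMissingInput) "!"
  else if (!resolved) "'"
  else ""

assert(planPrefix(hasMissingInput = true, resolved = true) == "!")
assert(planPrefix(hasMissingInput = false, resolved = false) == "'")
assert(planPrefix(hasMissingInput = false, resolved = true) == "")
```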
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b23f8d03df574d355de7243b7ffcc305b64bd886..8c4f09b58a4f261379e92e9a0a6f1fdbca229030 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -18,41 +18,29 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
-/**
- * Estimates of various statistics.  The default estimation logic simply lazily multiplies the
- * corresponding statistic produced by the children.  To override this behavior, override
- * `statistics` and assign it an overridden version of `Statistics`.
- *
- * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
- * performance of the implementations.  The reason is that estimations might get triggered in
- * performance-critical processes, such as query plan planning.
- *
- * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
- * cardinality estimation (e.g. cartesian joins).
- *
- * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
- *                    defaults to the product of children's `sizeInBytes`.
- */
-private[sql] case class Statistics(sizeInBytes: BigInt)
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   self: Product =>
 
+  /**
+   * Computes [[Statistics]] for this plan. The default implementation assumes the output
+   * cardinality is the product of all child plans' cardinalities, i.e. it applies in the case
+   * of cartesian joins.
+   *
+   * [[LeafNode]]s must override this.
+   */
   def statistics: Statistics = {
     if (children.size == 0) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }
-
-    Statistics(
-      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+    Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }
 
   /**
@@ -128,26 +116,41 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def resolve(name: String, resolver: Resolver): Option[NamedExpression] =
     resolve(name, output, resolver)
 
-  def resolveAsTableColumn(
+  /**
+   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
+   *
+   * This assumes `name` has multiple parts, where the first part is a qualifier
+   * (i.e. table name, alias, or subquery alias).
+   * See the comment above `candidates` in resolve() for the semantics of the returned data.
+   */
+  private def resolveAsTableColumn(
       nameParts: Array[String],
       resolver: Resolver,
-      attribute: Attribute): List[(Attribute, List[String])] = {
-    if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty && nameParts.size > 1) {
-      val remainingParts = nameParts.drop(1)
+      attribute: Attribute): Option[(Attribute, List[String])] = {
+    assert(nameParts.length > 1)
+    if (attribute.qualifiers.exists(resolver(_, nameParts.head))) {
+      // At least one qualifier matches. See if the remaining parts match.
+      val remainingParts = nameParts.tail
       resolveAsColumn(remainingParts, resolver, attribute)
     } else {
-      Nil
+      None
     }
   }
 
-  def resolveAsColumn(
+  /**
+   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
+   *
+   * Unlike resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
+   * See the comment above `candidates` in resolve() for the semantics of the returned data.
+   */
+  private def resolveAsColumn(
       nameParts: Array[String],
       resolver: Resolver,
-      attribute: Attribute): List[(Attribute, List[String])] = {
+      attribute: Attribute): Option[(Attribute, List[String])] = {
     if (resolver(attribute.name, nameParts.head)) {
-      (attribute.withName(nameParts.head), nameParts.tail.toList) :: Nil
+      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
     } else {
-      Nil
+      None
     }
   }
 
@@ -159,25 +162,44 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
     val parts = name.split("\\.")
 
-    // We will try to resolve this name as `table.column` pattern first.
-    var options = input.flatMap { option =>
-      resolveAsTableColumn(parts, resolver, option)
+    // A sequence of possible candidate matches.
+    // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
+    // of parts that are to be resolved.
+    // For example, consider the name "a.b.c", where "a" is the table name, "b" is the column
+    // name, and "c" is the struct field name. In this case, the first element will be the
+    // attribute resolved for "a.b", and the second element will be List("c").
+    var candidates: Seq[(Attribute, List[String])] = {
+      // If the name has 2 or more parts, try to resolve it as `table.column` first.
+      if (parts.length > 1) {
+        input.flatMap { option =>
+          resolveAsTableColumn(parts, resolver, option)
+        }
+      } else {
+        Seq.empty
+      }
     }
 
     // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    if(options.isEmpty) {
-      options = input.flatMap { option =>
-        resolveAsColumn(parts, resolver, option)
+    if (candidates.isEmpty) {
+      candidates = input.flatMap { candidate =>
+        resolveAsColumn(parts, resolver, candidate)
       }
     }
 
-    options.distinct match {
+    candidates.distinct match {
       // One match, no nested fields, use it.
       case Seq((a, Nil)) => Some(a)
 
       // One match, but we also need to extract the requested nested field.
       case Seq((a, nestedFields)) =>
-        Some(Alias(nestedFields.foldLeft(a: Expression)(UnresolvedGetField), nestedFields.last)())
+        // The foldLeft adds an UnresolvedGetField for every remaining part of the name,
+        // and the result is aliased with the last part of the name.
+        // For example, consider the name "a.b.c", where "a" is resolved to an existing attribute.
+        // Then this will add UnresolvedGetField("b") and UnresolvedGetField("c"), and alias
+        // the final expression as "c".
+        val fieldExprs = nestedFields.foldLeft(a: Expression)(UnresolvedGetField)
+        val aliasName = nestedFields.last
+        Some(Alias(fieldExprs, aliasName)())
 
       // No matches.
       case Seq() =>
@@ -186,8 +208,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
       // More than one match.
       case ambiguousReferences =>
-        throw new TreeNodeException(
-          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+        throw new AnalysisException(
+          s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
     }
   }
 }
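To make the nested-field branch above concrete, here is a simplified sketch (not the actual
implementation) of how the remaining name parts are folded into UnresolvedGetField nodes and
aliased; the helper name resolveNested is hypothetical:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedGetField
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

def resolveNested(resolved: Expression, nestedFields: List[String]): NamedExpression = {
  require(nestedFields.nonEmpty)
  // For "a.b.c" with "a" resolved, nestedFields is List("b", "c") and the result is
  // Alias(UnresolvedGetField(UnresolvedGetField(a, "b"), "c"), "c")().
  val fieldExprs = nestedFields.foldLeft(resolved)(UnresolvedGetField)
  Alias(fieldExprs, nestedFields.last)()
}
```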
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9ac4c3a2a56c8c2d55b63704f50296c0e02fe371
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+/**
+ * Estimates of various statistics.  The default estimation logic simply lazily multiplies the
+ * corresponding statistic produced by the children.  To override this behavior, override
+ * `statistics` and assign it an overridden version of `Statistics`.
+ *
+ * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
+ * performance of the implementations.  The reason is that estimations might get triggered in
+ * performance-critical processes, such as query plan planning.
+ *
+ * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
+ * cardinality estimation (e.g. cartesian joins).
+ *
+ * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
+ *                    defaults to the product of children's `sizeInBytes`.
+ */
+private[sql] case class Statistics(sizeInBytes: BigInt)
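A small illustration of why sizeInBytes is a BigInt (a sketch; note that Statistics is
private[sql], so this only compiles inside the org.apache.spark.sql package):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Statistics

// Two children whose sizes, when multiplied, overflow a 64-bit integer.
val children = Seq(
  Statistics(sizeInBytes = BigInt(Long.MaxValue)),
  Statistics(sizeInBytes = BigInt(1024)))

// The default LogicalPlan.statistics estimate: the product of the children's sizes.
val combined = Statistics(sizeInBytes = children.map(_.sizeInBytes).product)
assert(combined.sizeInBytes > BigInt(Long.MaxValue))
```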