diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index ef3cc554b79c07217342899a7d9a0f6dc87a1b82..96a11e352ec509176d66334d5100bd0a12095525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -26,13 +26,6 @@ object AttributeMap {
   def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
     new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
-
-  /** Given a schema, constructs an [[AttributeMap]] from [[Attribute]] to ordinal */
-  def byIndex(schema: Seq[Attribute]): AttributeMap[Int] = apply(schema.zipWithIndex)
-
-  /** Given a schema, constructs a map from ordinal to Attribute. */
-  def toIndex(schema: Seq[Attribute]): Map[Int, Attribute] =
-    schema.zipWithIndex.map { case (a, i) => i -> a }.toMap
 }
 
 class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index a38f1ec09156d0a626ae8544f85fcfd178f9f41c..7d16118c9d59ff943be39507f3df306853e3ffb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -82,16 +82,16 @@ object BindReferences extends Logging {
 
   def bindReference[A <: Expression](
       expression: A,
-      input: Seq[Attribute],
+      input: AttributeSeq,
       allowFailures: Boolean = false): A = {
     expression.transform { case a: AttributeReference =>
       attachTree(a, "Binding attribute") {
-        val ordinal = input.indexWhere(_.exprId == a.exprId)
+        val ordinal = input.indexOf(a.exprId)
         if (ordinal == -1) {
           if (allowFailures) {
             a
           } else {
-            sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+            sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
           }
         } else {
           BoundReference(ordinal, a.dataType, input(ordinal).nullable)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 23baa6f7837fbcc24bd27d04f75a9e69189d8172..81f5bb4a65096ed45d434f27b168ef6599ddc731 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst
 
+import com.google.common.collect.Maps
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -86,11 +88,44 @@ package object expressions  {
   /**
    * Helper functions for working with `Seq[Attribute]`.
    */
-  implicit class AttributeSeq(attrs: Seq[Attribute]) {
+  implicit class AttributeSeq(val attrs: Seq[Attribute]) extends Serializable {
     /** Creates a StructType with a schema matching this `Seq[Attribute]`. */
     def toStructType: StructType = {
       StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable)))
     }
+
+    // It's possible that `attrs` is a linked list, which can lead to bad O(n^2) loops when
+    // accessing attributes by their ordinals. To avoid this performance penalty, convert the input
+    // to an array.
+    @transient private lazy val attrsArray = attrs.toArray
+
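+    // Map from expression id to ordinal, rebuilt lazily (and after deserialization) on first use.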
+    @transient private lazy val exprIdToOrdinal = {
+      val arr = attrsArray
+      val map = Maps.newHashMapWithExpectedSize[ExprId, Int](arr.length)
+      // Iterate over the array in reverse order so that the final map value is the ordinal of the
+      // first attribute with a given expression id.
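+      // For example, given attributes [a#1, b#2, a#1], the map ends up with a#1 -> 0 and b#2 -> 1.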
+      var index = arr.length - 1
+      while (index >= 0) {
+        map.put(arr(index).exprId, index)
+        index -= 1
+      }
+      map
+    }
+
+    /**
+     * Returns the attribute at the given index.
+     */
+    def apply(ordinal: Int): Attribute = attrsArray(ordinal)
+
+    /**
+     * Returns the index of the first attribute with a matching expression id, or -1 if no match
+     * exists.
+     */
+    def indexOf(exprId: ExprId): Int = {
+      Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
+    }
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 3de15a9a3f544d0aabfb3fead6aba80c9185a14c..19a66cff4fae4655c69a1152119b35ea04ba24cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -296,7 +296,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   /**
    * All the attributes that are used for this plan.
    */
-  lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output)
+  lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
 
   private def cleanExpression(e: Expression): Expression = e match {
     case a: Alias =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index f5bc0628b6458abba40109d34dc7eb857f691ef4..f270ca07554f5a831f61c2c7fb469c7a68d27655 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -49,7 +49,7 @@ case class HashAggregateExec(
 
   require(HashAggregateExec.supportsAggregate(aggregateBufferAttributes))
 
-  override lazy val allAttributes: Seq[Attribute] =
+  override lazy val allAttributes: AttributeSeq =
     child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
       aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index bd55e1a8751daa61d8e108c2dab082e5fa5ba633..a1c2f0a8fbcf41b2ead08b0be800bb7647ae757f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -310,7 +310,8 @@ private[sql] case class InMemoryTableScanExec(
     // within the map Partitions closure.
     val schema = relation.partitionStatistics.schema
     val schemaIndex = schema.zipWithIndex
-    val relOutput = relation.output
+    val relOutput: AttributeSeq = relation.output
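+    // Typed as AttributeSeq so the indexOf() calls below use a hash map instead of a linear scan.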
     val buffers = relation.cachedColumnBuffers
 
     buffers.mapPartitionsInternal { cachedBatchIterator =>
@@ -321,7 +321,7 @@ private[sql] case class InMemoryTableScanExec(
       // Find the ordinals and data types of the requested columns.
       val (requestedColumnIndices, requestedColumnDataTypes) =
         attributes.map { a =>
-          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
+          relOutput.indexOf(a.exprId) -> a.dataType
         }.unzip
 
       // Do partition batch pruning if enabled
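
Illustrative usage sketch (not part of the patch): because AttributeSeq is an implicit class over
Seq[Attribute], call sites that pass a plain Seq[Attribute] to BindReferences.bindReference keep
compiling unchanged; the wrapper and its expression-id-to-ordinal map are built on demand. The
attribute names and types below are invented for the example.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types.IntegerType

    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val output: Seq[Attribute] = Seq(a, b)

    // `output` is implicitly wrapped in an AttributeSeq at the call site.
    val bound = BindReferences.bindReference(b: Expression, output)
    // bound == BoundReference(1, IntegerType, true)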