diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 170f09be215344562a7ed3edbf38de9562806b78..288badd3160f895e6a021368cb7fc2f3c7e4d967 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
      * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
      * Assume the given iterator is in sorted order.
      */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
         }
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
      *
      * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
      */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index cdebefb67510c5d73f1741d1956f852f251fe866..deb780953579d47343f95322461f746b2af2f322 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       ("pomatoes", "eructation")      // 568647356
     )
 
+    collisionPairs.foreach { case (w1, w2) =>
+      // String.hashCode is documented to use a specific algorithm, but check just in case
+      assert(w1.hashCode === w2.hashCode)
+    }
+
     (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(kv._2.equals(expectedValue))
       count += 1
     }
-    assert(count == 100000 + collisionPairs.size * 2)
+    assert(count === 100000 + collisionPairs.size * 2)
+  }
+
+  test("spilling with many hash collisions") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.0001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+
+    // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+    // problems if the map fails to group together the objects with the same code (SPARK-2043).
+    for (i <- 1 to 10) {
+      for (j <- 1 to 10000) {
+        map.insert(FixedHashObject(j, j % 2), 1)
+      }
+    }
+
+    val it = map.iterator
+    var count = 0
+    while (it.hasNext) {
+      val kv = it.next()
+      assert(kv._2 === 10)
+      count += 1
+    }
+    assert(count === 10000)
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 }
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
+  override def hashCode(): Int = h
+}