Skip to content
Snippets Groups Projects
Commit b45c13e7 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys

The current implementation reads one key with the next hash code as it finishes reading the keys with the current hash code, which may cause it to miss some matches of the next key. This can cause operations like join to give the wrong result when reduce tasks spill to disk and there are hash collisions, as values won't be matched together. This PR fixes it by not reading in that next key, using a peeking iterator instead.

Author: Matei Zaharia <matei@databricks.com>

Closes #986 from mateiz/spark-2043 and squashes the following commits:

0959514 [Matei Zaharia] Added unit test for having many hash collisions
892debb [Matei Zaharia] SPARK-2043: don't read a key with the next hash code in ExternalAppendOnlyMap, instead use a buffered iterator to only read values with the current hash code.
parent 9bad0b73
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
import java.util.Comparator
import scala.collection.BufferedIterator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
......@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val inputStreams = Seq(sortedMap) ++ spilledMaps
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
......@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
while (it.hasNext && kc._1.hashCode() == minHash) {
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
......@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {
def isEmpty = pairs.length == 0
......
......@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
("pomatoes", "eructation") // 568647356
)
collisionPairs.foreach { case (w1, w2) =>
// String.hashCode is documented to use a specific algorithm, but check just in case
assert(w1.hashCode === w2.hashCode)
}
(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
......@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
assert(count == 100000 + collisionPairs.size * 2)
assert(count === 100000 + collisionPairs.size * 2)
}
test("spilling with many hash collisions") {
val conf = new SparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
for (i <- 1 to 10) {
for (j <- 1 to 10000) {
map.insert(FixedHashObject(j, j % 2), 1)
}
}
val it = map.iterator
var count = 0
while (it.hasNext) {
val kv = it.next()
assert(kv._2 === 10)
count += 1
}
assert(count === 10000)
}
test("spilling with hash collisions using the Int.MaxValue key") {
......@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
}
}
/**
* A dummy class that always returns the same hash code, to easily test hash collisions
*/
case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
override def hashCode(): Int = h
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment