From b3a82b7df38e4c3e9cd12ca60aa18fa10c8d84c8 Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan <mridul@gmail.com>
Date: Wed, 4 Sep 2013 06:57:38 +0530
Subject: [PATCH] Fix hash bug - caused failure after 35k stages, sigh

---
 .../apache/spark/network/netty/ShuffleSender.scala |  3 ++-
 .../scala/org/apache/spark/storage/DiskStore.scala |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala   | 14 ++++++++++++++
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 537f225469..17d59caf9a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -20,6 +20,7 @@ package org.apache.spark.network.netty
 import java.io.File
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 
 private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -57,7 +58,7 @@ private[spark] object ShuffleSender {
           throw new Exception("Block " + blockId + " is not a shuffle block")
         }
         // Figure out which local directory it hashes to, and which subdirectory in that
-        val hash = math.abs(blockId.hashCode)
+        val hash = Utils.toHash(blockId)
         val dirId = hash % localDirs.length
         val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
         val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index fc25ef0fae..e24f627dd8 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     logDebug("Getting file for block " + blockId)
 
     // Figure out which local directory it hashes to, and which subdirectory in that
-    val hash = math.abs(blockId.hashCode)
+    val hash = Utils.toHash(blockId)
     val dirId = hash % localDirs.length
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 468800b2bd..54768b798f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -778,4 +778,18 @@ private[spark] object Utils extends Logging {
     val rawMod = x % mod
     rawMod + (if (rawMod < 0) mod else 0)
   }
+
+  // Handles idiosyncracies with hash (add more as required)
+  def toHash(obj: AnyRef): Int = {
+
+    // Required ?
+    if (obj eq null) return 0
+
+    val hash = obj.hashCode
+    // math.abs fails for Int.MinValue
+    val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0
+
+    // Nothing else to guard against ?
+    hashAbs
+  }
 }
-- 
GitLab