Skip to content
Snippets Groups Projects
Commit b3a82b7d authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Fix hash bug - caused failure after 35k stages, sigh

parent c592a3c9
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,7 @@ package org.apache.spark.network.netty ...@@ -20,6 +20,7 @@ package org.apache.spark.network.netty
import java.io.File import java.io.File
import org.apache.spark.Logging import org.apache.spark.Logging
import org.apache.spark.util.Utils
private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
...@@ -57,7 +58,7 @@ private[spark] object ShuffleSender { ...@@ -57,7 +58,7 @@ private[spark] object ShuffleSender {
throw new Exception("Block " + blockId + " is not a shuffle block") throw new Exception("Block " + blockId + " is not a shuffle block")
} }
// Figure out which local directory it hashes to, and which subdirectory in that // Figure out which local directory it hashes to, and which subdirectory in that
val hash = math.abs(blockId.hashCode) val hash = Utils.toHash(blockId)
val dirId = hash % localDirs.length val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
......
...@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) ...@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Getting file for block " + blockId) logDebug("Getting file for block " + blockId)
// Figure out which local directory it hashes to, and which subdirectory in that // Figure out which local directory it hashes to, and which subdirectory in that
val hash = math.abs(blockId.hashCode) val hash = Utils.toHash(blockId)
val dirId = hash % localDirs.length val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
......
...@@ -778,4 +778,18 @@ private[spark] object Utils extends Logging { ...@@ -778,4 +778,18 @@ private[spark] object Utils extends Logging {
val rawMod = x % mod val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0) rawMod + (if (rawMod < 0) mod else 0)
} }
/**
 * Returns a non-negative hash for the given object, suitable for modulo-based
 * bucketing (e.g. choosing a local directory for a block file).
 *
 * Handles the idiosyncrasies of `hashCode`:
 *  - a `null` object hashes to 0
 *  - a `hashCode` of `Int.MinValue` is mapped to 0, because
 *    `math.abs(Int.MinValue)` overflows and stays negative
 */
def toHash(obj: AnyRef): Int =
  if (obj == null) {
    // NOTE(review): null guard kept from the original — callers may pass null.
    0
  } else {
    val h = obj.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment