Skip to content
Snippets Groups Projects
Commit b82a6dd2 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #332 from JoshRosen/spark-607

Add try-finally to handle MapOutputTracker timeouts
parents 1072f970 cf52d9ca
No related branches found
No related tags found
No related merge requests found
...@@ -148,18 +148,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -148,18 +148,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// We won the race to fetch the output locs; do so // We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor) logInfo("Doing the fetch; tracker actor = " + trackerActor)
val host = System.getProperty("spark.hostname", Utils.localHostName) val host = System.getProperty("spark.hostname", Utils.localHostName)
val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]] // This try-finally prevents hangs due to timeouts:
val fetchedStatuses = deserializeStatuses(fetchedBytes) var fetchedStatuses: Array[MapStatus] = null
try {
logInfo("Got the output locations") val fetchedBytes =
mapStatuses.put(shuffleId, fetchedStatuses) askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
fetching.synchronized { fetchedStatuses = deserializeStatuses(fetchedBytes)
fetching -= shuffleId logInfo("Got the output locations")
fetching.notifyAll() mapStatuses.put(shuffleId, fetchedStatuses)
} if (fetchedStatuses.contains(null)) {
if (fetchedStatuses.contains(null)) { throw new FetchFailedException(null, shuffleId, -1, reduceId,
throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing an output location for shuffle " + shuffleId))
new Exception("Missing an output location for shuffle " + shuffleId)) }
} finally {
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
} }
return fetchedStatuses.map(s => return fetchedStatuses.map(s =>
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment