Skip to content
Snippets Groups Projects
Commit 4beb084f authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #374 from woggling/null-mapout

Generate FetchFailedException even for cached missing map outputs
parents 7adfedb0 4078623b
No related branches found
No related tags found
No related merge requests found
...@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
case e: InterruptedException => case e: InterruptedException =>
} }
} }
return mapStatuses.get(shuffleId).map(status => return MapOutputTracker.convertMapStatuses(shuffleId, reduceId,
(status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId)))) mapStatuses.get(shuffleId))
} else { } else {
fetching += shuffleId fetching += shuffleId
} }
...@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
fetchedStatuses = deserializeStatuses(fetchedBytes) fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations") logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses) mapStatuses.put(shuffleId, fetchedStatuses)
if (fetchedStatuses.contains(null)) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
}
} finally { } finally {
fetching.synchronized { fetching.synchronized {
fetching -= shuffleId fetching -= shuffleId
fetching.notifyAll() fetching.notifyAll()
} }
} }
return fetchedStatuses.map(s => return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} else { } else {
return statuses.map(s => return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} }
} }
...@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
private[spark] object MapOutputTracker { private[spark] object MapOutputTracker {
private val LOG_BASE = 1.1 private val LOG_BASE = 1.1
// Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
// any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException.
// Convert an array of MapStatuses into (location, size) pairs for one reduce
// partition. A null array (no outputs registered for the shuffle) or a null
// element (a mapper whose output was lost) is reported to the scheduler as a
// FetchFailedException rather than returned as partial data.
def convertMapStatuses(
    shuffleId: Int,
    reduceId: Int,
    statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
  // Raise the standard fetch failure for this shuffle/reduce pair.
  def fail(message: String): Nothing =
    throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception(message))

  if (statuses == null) {
    fail("Missing all output locations for shuffle " + shuffleId)
  }
  statuses.map { status =>
    if (status == null) {
      fail("Missing an output location for shuffle " + shuffleId)
    }
    (status.address, decompressSize(status.compressedSizes(reduceId)))
  }
}
/** /**
* Compress a size in bytes to 8 bits for efficient reporting of map output sizes. * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
* We do this by encoding the log base 1.1 of the size as an integer, which can support * We do this by encoding the log base 1.1 of the size as an integer, which can support
......
package spark package spark
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import akka.actor._ import akka.actor._
import spark.scheduler.MapStatus import spark.scheduler.MapStatus
import spark.storage.BlockManagerId import spark.storage.BlockManagerId
import spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite { class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
// Clear the master port after each test so the next test's actor system can
// bind and register its own port without picking up a stale value.
after {
System.clearProperty("spark.master.port")
}
test("compressSize") { test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0) assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1) assert(MapOutputTracker.compressSize(1L) === 1)
...@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite { ...@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
// The remaining reduce task might try to grab the output despite the shuffle failure; // The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the // this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted. // stage already being aborted.
intercept[Exception] { tracker.getServerStatuses(10, 1) } intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
}
// Integration-style test: a "master" and a "slave" MapOutputTracker share one
// actor system; the slave fetches map output locations remotely from the master.
test("remote fetch") {
System.clearProperty("spark.master.host")
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
// Point both trackers at the freshly bound port so the slave can reach the master actor.
System.setProperty("spark.master.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
// No map outputs registered yet: the remote fetch must raise FetchFailedException.
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
// After registration, the slave sees hostA with the round-tripped (decompressed) size.
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
Seq((new BlockManagerId("hostA", 1000), size1000)))
masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
// Unregistering the output makes it missing again, even though statuses are cached.
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.