diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ee82d9fa7874bde4baad0dd9bebe995f26543ea9..182abacc475aeaf13ecbd13a72b4fd9a3b578395 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -25,9 +25,11 @@ import scala.concurrent.Await import akka.actor._ import akka.pattern.ask + +import org.apache.spark.util._ import org.apache.spark.scheduler.MapStatus +import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util._ private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int) @@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) } } else { - throw new FetchFailedException(null, shuffleId, -1, reduceId, - new Exception("Missing all output locations for shuffle " + shuffleId)) + throw new MetadataFetchFailedException( + shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId) } } else { statuses.synchronized { @@ -371,8 +373,8 @@ private[spark] object MapOutputTracker { statuses.map { status => if (status == null) { - throw new FetchFailedException(null, shuffleId, -1, reduceId, - new Exception("Missing an output location for shuffle " + shuffleId)) + throw new MetadataFetchFailedException( + shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId) } else { (status.location, decompressSize(status.compressedSizes(reduceId))) } diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 5e8bd8c8e533a55d637fdb706657a670c1dff9e9..df42d679b4699cf296b613b396614f2666e85427 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason { */ @DeveloperApi case class FetchFailed( - bmAddress: BlockManagerId, + bmAddress: BlockManagerId, // Note that bmAddress can be null shuffleId: Int, mapId: Int, reduceId: Int) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 557b9a3f46a082512c9ba15434cbaf28d2bd5119..4d3ba11633bf5709fb6ee4c3879c9d2370520eb0 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,8 +26,8 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util.{AkkaUtils, Utils} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 1481d70db42e1dc6017b21276be44a394cd1e27c..4c96b9e5fef6099c1ca59ae07523ae5aec480db1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -21,6 +21,10 @@ import java.nio.ByteBuffer import org.apache.spark.util.SerializableBuffer +/** + * Description of a task that gets passed onto executors to be executed, usually created by + * [[TaskSetManager.resourceOffer]]. + */ private[spark] class TaskDescription( val taskId: Long, val executorId: String, diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala similarity index 50% rename from core/src/main/scala/org/apache/spark/FetchFailedException.scala rename to core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala index 8eaa26bdb1b5b866d507213672f5c0219351b48f..71c08e9d5a8c3cbcba47516e89a7abf91a96d8d1 100644 --- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala @@ -15,31 +15,38 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.spark.shuffle import org.apache.spark.storage.BlockManagerId +import org.apache.spark.{FetchFailed, TaskEndReason} +/** + * Failed to fetch a shuffle block. The executor catches this exception and propagates it + * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage. + * + * Note that bmAddress can be null. + */ private[spark] class FetchFailedException( - taskEndReason: TaskEndReason, - message: String, - cause: Throwable) + bmAddress: BlockManagerId, + shuffleId: Int, + mapId: Int, + reduceId: Int) extends Exception { - def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, - cause: Throwable) = - this(FetchFailed(bmAddress, shuffleId, mapId, reduceId), - "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId), - cause) - - def this (shuffleId: Int, reduceId: Int, cause: Throwable) = - this(FetchFailed(null, shuffleId, -1, reduceId), - "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause) - - override def getMessage(): String = message + override def getMessage: String = + "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId) + def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId) +} - override def getCause(): Throwable = cause - - def toTaskEndReason: TaskEndReason = taskEndReason +/** + * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]]. + */ +private[spark] class MetadataFetchFailedException( + shuffleId: Int, + reduceId: Int, + message: String) + extends FetchFailedException(null, shuffleId, -1, reduceId) { + override def getMessage: String = message } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index b05b6ea345df3231f25bc69f878f7cfb0cda8f4d..a932455776e3467a482d774dd50023238a8de111 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap +import org.apache.spark._ import org.apache.spark.executor.ShuffleReadMetrics import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId} import org.apache.spark.util.CompletionIterator -import org.apache.spark._ private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( @@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { blockId match { case ShuffleBlockId(shufId, mapId, _) => val address = statuses(mapId.toInt)._1 - throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null) + throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId) case _ => throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block") diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 95ba273f16a71e584c69f2b6eaabf29c874565df..9702838085627b7fd555071eff872d8a25867a94 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -24,6 +24,7 @@ import akka.testkit.TestActorRef import org.scalatest.FunSuite import org.apache.spark.scheduler.MapStatus +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AkkaUtils