Skip to content
Snippets Groups Projects
Commit bf578dea authored by Reynold Xin's avatar Reynold Xin
Browse files

Removed throwable field from FetchFailedException and added MetadataFetchFailedException

FetchFailedException used to have a Throwable field, but in reality we never propagate any of the throwable/exceptions back to the driver because Executor explicitly looks for FetchFailedException and then sends FetchFailed as the TaskEndReason.

This pull request removes the throwable and adds a MetadataFetchFailedException that extends FetchFailedException (so now MapOutputTracker throws MetadataFetchFailedException instead).

Author: Reynold Xin <rxin@apache.org>

Closes #1227 from rxin/metadataFetchException and squashes the following commits:

5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException.
8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker.
parent 981bde9b
No related branches found
No related tags found
No related merge requests found
......@@ -25,9 +25,11 @@ import scala.concurrent.Await
import akka.actor._
import akka.pattern.ask
import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
......@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
......@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
statuses.map {
status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
......
......@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
bmAddress: BlockManagerId,
bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
......
......@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}
......
......@@ -21,6 +21,10 @@ import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer
/**
* Description of a task that gets passed onto executors to be executed, usually created by
* [[TaskSetManager.resourceOffer]].
*/
private[spark] class TaskDescription(
val taskId: Long,
val executorId: String,
......
......@@ -15,31 +15,38 @@
* limitations under the License.
*/
package org.apache.spark
package org.apache.spark.shuffle
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{FetchFailed, TaskEndReason}
/**
* Failed to fetch a shuffle block. The executor catches this exception and propagates it
* back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
*
* Note that bmAddress can be null.
*/
private[spark] class FetchFailedException(
taskEndReason: TaskEndReason,
message: String,
cause: Throwable)
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends Exception {
def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
cause: Throwable) =
this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
cause)
def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
this(FetchFailed(null, shuffleId, -1, reduceId),
"Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
override def getMessage(): String = message
override def getMessage: String =
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
}
override def getCause(): Throwable = cause
def toTaskEndReason: TaskEndReason = taskEndReason
/**
* Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
*/
private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
extends FetchFailedException(null, shuffleId, -1, reduceId) {
override def getMessage: String = message
}
......@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.ShuffleReadMetrics
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark._
private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
......@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
......
......@@ -24,6 +24,7 @@ import akka.testkit.TestActorRef
import org.scalatest.FunSuite
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment