Skip to content
Snippets Groups Projects
Commit 2053d793 authored by Reynold Xin's avatar Reynold Xin
Browse files

Improve MapOutputTracker error logging.

Author: Reynold Xin <rxin@apache.org>

Closes #1258 from rxin/mapOutputTracker and squashes the following commits:

a7c95b6 [Reynold Xin] Improve MapOutputTracker error logging.
parent 3c104c79
No related branches found
No related tags found
No related merge requests found
...@@ -26,10 +26,10 @@ import scala.concurrent.Await ...@@ -26,10 +26,10 @@ import scala.concurrent.Await
import akka.actor._ import akka.actor._
import akka.pattern.ask import akka.pattern.ask
import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._
private[spark] sealed trait MapOutputTrackerMessage private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int) private[spark] case class GetMapOutputStatuses(shuffleId: Int)
...@@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging ...@@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
Await.result(future, timeout) Await.result(future, timeout)
} catch { } catch {
case e: Exception => case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
throw new SparkException("Error communicating with MapOutputTracker", e) throw new SparkException("Error communicating with MapOutputTracker", e)
} }
} }
/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */ /** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
protected def sendTracker(message: Any) { protected def sendTracker(message: Any) {
if (askTracker(message) != true) { val response = askTracker(message)
throw new SparkException("Error reply received from MapOutputTracker") if (response != true) {
throw new SparkException(
"Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
} }
} }
...@@ -366,9 +369,9 @@ private[spark] object MapOutputTracker { ...@@ -366,9 +369,9 @@ private[spark] object MapOutputTracker {
// any of the statuses is null (indicating a missing location due to a failed mapper), // any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException. // throw a FetchFailedException.
private def convertMapStatuses( private def convertMapStatuses(
shuffleId: Int, shuffleId: Int,
reduceId: Int, reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null) assert (statuses != null)
statuses.map { statuses.map {
status => status =>
...@@ -403,7 +406,7 @@ private[spark] object MapOutputTracker { ...@@ -403,7 +406,7 @@ private[spark] object MapOutputTracker {
if (compressedSize == 0) { if (compressedSize == 0) {
0 0
} else { } else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong math.pow(LOG_BASE, compressedSize & 0xFF).toLong
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment