Commit ad8aff6c authored by folone

Merge remote-tracking branch 'upstream/master'

parents a5403acd 54c0f9f1
@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
         case e: InterruptedException =>
       }
     }
-    return mapStatuses.get(shuffleId).map(status =>
-      (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
+    return MapOutputTracker.convertMapStatuses(shuffleId, reduceId,
+      mapStatuses.get(shuffleId))
   } else {
     fetching += shuffleId
   }
@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
         fetchedStatuses = deserializeStatuses(fetchedBytes)
         logInfo("Got the output locations")
         mapStatuses.put(shuffleId, fetchedStatuses)
-        if (fetchedStatuses.contains(null)) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
-        }
       } finally {
         fetching.synchronized {
           fetching -= shuffleId
           fetching.notifyAll()
         }
       }
-      return fetchedStatuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
     } else {
-      return statuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
     }
   }
@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
 private[spark] object MapOutputTracker {
   private val LOG_BASE = 1.1
 
+  // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+  // any of the statuses is null (indicating a missing location due to a failed mapper),
+  // throw a FetchFailedException.
+  def convertMapStatuses(
+        shuffleId: Int,
+        reduceId: Int,
+        statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+    if (statuses == null) {
+      throw new FetchFailedException(null, shuffleId, -1, reduceId,
+        new Exception("Missing all output locations for shuffle " + shuffleId))
+    }
+    statuses.map {
+      status =>
+        if (status == null) {
+          throw new FetchFailedException(null, shuffleId, -1, reduceId,
+            new Exception("Missing an output location for shuffle " + shuffleId))
+        } else {
+          (status.address, decompressSize(status.compressedSizes(reduceId)))
+        }
+    }
+  }
+
   /**
    * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
    * We do this by encoding the log base 1.1 of the size as an integer, which can support
...
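The bodies of compressSize and decompressSize are collapsed in this view, so as a rough sketch of the scheme the comment describes (bucketing a byte count into a single byte by storing its log base 1.1), the standalone object below behaves the way the "compressSize" test in MapOutputTrackerSuite further down expects. The names mirror MapOutputTracker's, but this is an illustration, not the committed code.

// Sketch only: logarithmic size bucketing, consistent with the comment above
// and with the test assertions, not the collapsed hunk itself.
object SizeCodecSketch {
  private val LOG_BASE = 1.1

  // 0 encodes as 0; 1 gets its own bucket so it stays distinct from "empty";
  // anything larger stores ceil(log_1.1(size)), capped at 255. Since 1.1^255
  // is roughly 35 GB, a single byte covers realistic map output sizes.
  def compressSize(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

  // Decoding raises 1.1 back to the stored exponent; because the exponent was
  // rounded up, the decoded value is an upper bound on the true size.
  def decompressSize(compressedSize: Byte): Long = {
    if (compressedSize == 0) 0 else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
  }
}

Rounding the exponent up means the decoded size can only over-estimate, which is the safe direction when these sizes are used to plan reduce-side fetches.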
@@ -134,7 +134,7 @@ private object Utils extends Logging {
    */
   def fetchFile(url: String, targetDir: File) {
     val filename = url.split("/").last
-    val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    val tempDir = getLocalDir
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
@@ -204,6 +204,15 @@ private object Utils extends Logging {
     FileUtil.chmod(filename, "a+x")
   }
 
+  /**
+   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
+   * return a single directory, even though the spark.local.dir property might be a list of
+   * multiple paths.
+   */
+  def getLocalDir: String = {
+    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+  }
+
   /**
    * Shuffle the elements of a collection into a random order, returning the
    * result in a new collection. Unlike scala.util.Random.shuffle, this method
...
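Because getLocalDir collapses a comma-separated spark.local.dir down to its first entry, callers like fetchFile above (and HttpBroadcast.createServer in the next hunk) always receive a single usable path. A small illustrative example of the expected behaviour, with made-up paths, assuming it runs from a test inside the spark package (Utils is package-private):

// Illustration only, with invented paths: the helper keeps the first entry.
System.setProperty("spark.local.dir", "/mnt1/spark,/mnt2/spark")
assert(Utils.getLocalDir == "/mnt1/spark")

// With the property unset it falls back to java.io.tmpdir (typically /tmp).
System.clearProperty("spark.local.dir")
assert(Utils.getLocalDir == System.getProperty("java.io.tmpdir"))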
@@ -89,7 +89,7 @@ private object HttpBroadcast extends Logging {
   }
 
   private def createServer() {
-    broadcastDir = Utils.createTempDir()
+    broadcastDir = Utils.createTempDir(Utils.getLocalDir)
     server = new HttpServer(broadcastDir)
     server.start()
     serverUri = server.uri
...
@@ -24,9 +24,6 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
   var master: ActorRef = null
 
   override def preStart() {
...
@@ -15,9 +15,11 @@ private[spark] class ResultTask[T, U](
 
   override def run(attemptId: Long): U = {
     val context = new TaskContext(stageId, partition, attemptId)
-    val result = func(context, rdd.iterator(split, context))
-    context.executeOnCompleteCallbacks()
-    result
+    try {
+      func(context, rdd.iterator(split, context))
+    } finally {
+      context.executeOnCompleteCallbacks()
+    }
   }
 
   override def preferredLocations: Seq[String] = locs
...
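The change above wraps the user function in try/finally so that registered on-complete callbacks run even when the task body throws, which is exactly what the new TaskContextSuite further down asserts. A minimal sketch of that pattern, using a hypothetical SimpleContext rather than Spark's TaskContext:

// Hypothetical stand-in for TaskContext: callbacks registered while the task
// body runs are executed in a finally block, on success and on failure alike.
object CallbackSketch {
  class SimpleContext {
    private var callbacks = List[() => Unit]()
    def addOnCompleteCallback(f: () => Unit): Unit = { callbacks ::= f }
    def executeOnCompleteCallbacks(): Unit = callbacks.foreach(_())
  }

  def runTask[T](body: SimpleContext => T): T = {
    val context = new SimpleContext
    try {
      body(context)                        // may throw
    } finally {
      context.executeOnCompleteCallbacks() // always runs
    }
  }
}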
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import akka.actor._
 import spark.scheduler.MapStatus
 import spark.storage.BlockManagerId
+import spark.util.AkkaUtils
 
-class MapOutputTrackerSuite extends FunSuite {
+class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
+  after {
+    System.clearProperty("spark.master.port")
+  }
+
   test("compressSize") {
     assert(MapOutputTracker.compressSize(0L) === 0)
     assert(MapOutputTracker.compressSize(1L) === 1)
@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
     // The remaining reduce task might try to grab the output despite the shuffle failure;
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
-    intercept[Exception] { tracker.getServerStatuses(10, 1) }
+    intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
   }
test("remote fetch") {
System.clearProperty("spark.master.host")
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
System.setProperty("spark.master.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
Seq((new BlockManagerId("hostA", 1000), size1000)))
masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
} }
} }
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
+  }
+
+  test("Calls executeOnCompleteCallbacks after failure") {
+    var completed = false
+    sc = new SparkContext("local", "test")
+    val rdd = new RDD[String](sc) {
+      override val splits = Array[Split](StubSplit(0))
+      override val dependencies = List()
+      override def compute(split: Split, context: TaskContext) = {
+        context.addOnCompleteCallback(() => completed = true)
+        sys.error("failed")
+      }
+    }
+    val func = (c: TaskContext, i: Iterator[String]) => i.next
+    val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+    intercept[RuntimeException] {
+      task.run(0)
+    }
+    assert(completed === true)
+  }
+
+  case class StubSplit(val index: Int) extends Split
+}
\ No newline at end of file
@@ -6,6 +6,13 @@ FWDIR="$(cd `dirname $0`; pwd)"
 # Export this as SPARK_HOME
 export SPARK_HOME="$FWDIR"
 
+# Exit if the user hasn't compiled Spark
+if [ ! -e "$SPARK_HOME/repl/target" ]; then
+  echo "Failed to find Spark classes in $SPARK_HOME/repl/target" >&2
+  echo "You need to compile Spark before running this program" >&2
+  exit 1
+fi
+
 # Load environment variables from conf/spark-env.sh, if it exists
 if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
...
@@ -65,6 +65,13 @@ EXAMPLES_DIR="$FWDIR/examples"
 BAGEL_DIR="$FWDIR/bagel"
 PYSPARK_DIR="$FWDIR/python"
 
+# Exit if the user hasn't compiled Spark
+if [ ! -e "$REPL_DIR/target" ]; then
+  echo "Failed to find Spark classes in $REPL_DIR/target" >&2
+  echo "You need to compile Spark before running this program" >&2
+  exit 1
+fi
+
 # Build up classpath
 CLASSPATH="$SPARK_CLASSPATH"
 CLASSPATH+=":$FWDIR/conf"
...
 @echo off
 
-set SCALA_VERSION=2.9.1
+set SCALA_VERSION=2.9.2
 
 rem Figure out where the Spark framework is installed
 set FWDIR=%~dp0
...