Skip to content
Snippets Groups Projects
Commit a1ecec8d authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge branch 'master' of github.com:mesos/spark

parents 286f8f87 f6eb1f08
No related branches found
No related tags found
No related merge requests found
......@@ -673,6 +673,16 @@ object SparkContext {
def zero(initialValue: Int) = 0
}
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0l
}
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
......
......@@ -308,10 +308,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
} else {
// TODO: We might want to run this less often, when we are sure that something has become
// runnable that wasn't before.
logDebug("Checking for newly runnable parent stages")
logDebug("running: " + running)
logDebug("waiting: " + waiting)
logDebug("failed: " + failed)
logTrace("Checking for newly runnable parent stages")
logTrace("running: " + running)
logTrace("waiting: " + waiting)
logTrace("failed: " + failed)
val waiting2 = waiting.toArray
waiting.clear()
for (stage <- waiting2.sortBy(_.priority)) {
......@@ -393,6 +393,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
......@@ -407,6 +410,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)
def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
case _ => "Unkown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
running -= stage
}
event.reason match {
case Success =>
logInfo("Completed " + task)
......@@ -421,13 +433,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
job.listener.taskSucceeded(rt.outputId, event.result)
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
activeJobs -= job
resultStageToJob -= stage
running -= stage
markStageAsFinished(stage)
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
......@@ -444,8 +456,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, status)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
running -= stage
markStageAsFinished(stage)
logInfo("looking for newly runnable stages")
logInfo("running: " + running)
logInfo("waiting: " + waiting)
logInfo("failed: " + failed)
......
......@@ -32,6 +32,9 @@ private[spark] class Stage(
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0
/** When first task was submitted to scheduler. */
var submissionTime: Option[Long] = None
private var nextAttemptId = 0
def isAvailable: Boolean = {
......
......@@ -243,7 +243,7 @@ class BlockManager(
val startTimeMs = System.currentTimeMillis
var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
......@@ -253,7 +253,7 @@ class BlockManager(
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
......@@ -645,7 +645,7 @@ class BlockManager(
var size = 0L
myInfo.synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
if (level.useMemory) {
......@@ -677,8 +677,10 @@ class BlockManager(
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
// Replicate block if required
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
......@@ -688,12 +690,10 @@ class BlockManager(
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
}
BlockManager.dispose(bytesAfterPut)
logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs))
return size
}
......@@ -978,7 +978,7 @@ object BlockManager extends Logging {
*/
def dispose(buffer: ByteBuffer) {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logDebug("Unmapping " + buffer)
logTrace("Unmapping " + buffer)
if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
}
......
......@@ -115,7 +115,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
def expireDeadHosts() {
logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new HashSet[BlockManagerId]
......
......@@ -17,6 +17,12 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
val d = sc.parallelize(1 to 20)
d.foreach{x => acc += x}
acc.value should be (210)
val longAcc = sc.accumulator(0l)
val maxInt = Integer.MAX_VALUE.toLong
d.foreach{x => longAcc += maxInt + x}
longAcc.value should be (210l + maxInt * 20)
}
test ("value not assignable from tasks") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment