Skip to content
Snippets Groups Projects
Commit 0bc0a60d authored by Tathagata Das's avatar Tathagata Das
Browse files

Modifications to make sure LocalScheduler terminate cleanly without errors...

Modifications to make sure LocalScheduler terminate cleanly without errors when SparkContext is shutdown, to minimize spurious exception during master failure tests.
parent 7c33f762
No related branches found
No related tags found
No related merge requests found
......@@ -488,17 +488,19 @@ class SparkContext(
if (dagScheduler != null) {
dagScheduler.stop()
dagScheduler = null
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
// Clean up locally linked files
clearFiles()
clearJars()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
}
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
// Clean up locally linked files
clearFiles()
clearJars()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
logInfo("Successfully stopped SparkContext")
}
/**
......
......@@ -81,7 +81,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
logInfo("Finished task " + idInJob)
listener.taskEnded(task, Success, resultToReturn, accumUpdates)
// If the threadpool has not already been shutdown, notify DAGScheduler
if (!Thread.currentThread().isInterrupted)
listener.taskEnded(task, Success, resultToReturn, accumUpdates)
} catch {
case t: Throwable => {
logError("Exception in task " + idInJob, t)
......@@ -91,7 +94,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
submitTask(task, idInJob)
} else {
// TODO: Do something nicer here to return all the way to the user
listener.taskEnded(task, new ExceptionFailure(t), null, null)
if (!Thread.currentThread().isInterrupted)
listener.taskEnded(task, new ExceptionFailure(t), null, null)
}
}
}
......
# Set everything to be logged to the console
# Set everything to be logged to the file spark-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
......
......@@ -47,6 +47,8 @@ object TestObject {
val nums = sc.parallelize(Array(1, 2, 3, 4))
val answer = nums.map(_ + x).reduce(_ + _)
sc.stop()
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
return answer
}
}
......
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Set everything to be logged to the file streaming-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment