Skip to content
Snippets Groups Projects
Commit fe63e822 authored by Andrew Or's avatar Andrew Or
Browse files

[SPARK-6132] ContextCleaner race condition across SparkContexts

The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`. This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is started immediately afterwards in the same JVM. In this case, if the cleaner is in the middle of cleaning a broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, which could be one that belongs to a different `SparkContext`.

JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`. We were able to reproduce the failure locally (though it is not deterministic and very hard to reproduce).

Author: Andrew Or <andrew@databricks.com>

Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits:

29168c0 [Andrew Or] Synchronize ContextCleaner stop
parent e750a6bf
No related branches found
No related tags found
No related merge requests found
......@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.start()
}
/** Stop the cleaner. */
/**
* Stop the cleaning thread and wait until the thread has finished running its current task.
*/
def stop() {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
synchronized {
cleaningThread.interrupt()
}
cleaningThread.join()
}
/** Register a RDD for cleanup when it is garbage collected. */
......@@ -140,18 +150,21 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
}
}
}
} catch {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment