Commit 3e2864e4 authored by Tathagata Das, committed by Patrick Wendell

[SPARK-3139] Made ContextCleaner to not block on shuffles

As a workaround for SPARK-3015, the ContextCleaner was made "blocking", that is, it cleaned items one by one. But shuffles can take a long time to be deleted. Given that the RC for 1.1 is imminent, this PR makes a narrow change in the context cleaner: it no longer waits for shuffle cleanups to complete. It also downgrades the messages logged on failed deletes from errors to milder warnings, since an exception in the delete code path for one item does not stop the actual functioning of the system.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2143 from tdas/cleaner-shuffle-fix and squashes the following commits:

9c84202 [Tathagata Das] Restoring default blocking behavior in ContextCleanerSuite, and added docs to identify that spark.cleaner.referenceTracking.blocking does not control shuffle.
2181329 [Tathagata Das] Mark shuffle cleanup as non-blocking.
e337cc2 [Tathagata Das] Changed semantics based on PR comments.
387b578 [Tathagata Das] Made ContextCleaner to not block on shuffles
parent 9d65f271
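
The user-visible knob this patch adds is spark.cleaner.referenceTracking.blocking.shuffle, which now defaults to false (non-blocking). A minimal, hedged sketch of opting back into the old blocking behavior; the master URL, app name, and job are illustrative, not part of the patch:

import org.apache.spark.{SparkConf, SparkContext}

object BlockingShuffleCleanupExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")  // illustrative master URL
      .setAppName("BlockingShuffleCleanupExample")
      // Default after this patch is "false": the cleaning thread no longer
      // blocks on shuffle deletes. Set "true" to restore blocking cleanup.
      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
    val sc = new SparkContext(conf)
    try {
      // A shuffle-producing job, so there is something for the cleaner to delete.
      sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _).count()
    } finally {
      sc.stop()
    }
  }
}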
core/src/main/scala/org/apache/spark/ContextCleaner.scala

@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
   /**
-   * Whether the cleaning thread will block on cleanup tasks.
+   * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
+   * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
    * workaround for the issue, which is ultimately caused by the way the BlockManager actors
@@ -76,6 +77,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val blockOnCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking", true)
 
+  /**
+   * Whether the cleaning thread will block on shuffle cleanup tasks.
+   *
+   * When context cleaner is configured to block on every delete request, it can throw timeout
+   * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
+   * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
+   * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
+   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * resolved.
+   */
+  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
+    "spark.cleaner.referenceTracking.blocking.shuffle", false)
+
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
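
To see why the flag matters, here is a hedged sketch (illustrative code, not Spark's actual doCleanupShuffle) of the blocking-versus-non-blocking pattern the cleaner uses: the delete runs as a Future either way, and the cleaning thread awaits it only when blocking is requested, so a slow shuffle delete cannot push the thread into an Await timeout:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CleanupSketch {
  // Hypothetical stand-in for the driver-side delete, which can be slow
  // for shuffles with many blocks.
  def deleteShuffleBlocks(shuffleId: Int): Unit = Thread.sleep(5000)

  def cleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    val future = Future(deleteShuffleBlocks(shuffleId))
    if (blocking) {
      // Blocking mode: a slow delete throws a TimeoutException here,
      // the failure mode reported in SPARK-3139.
      Await.result(future, 3.seconds)
    }
    // Non-blocking mode: the cleaning thread moves straight on to the next
    // queued item; the delete completes (or fails) in the background.
  }
}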
@@ -128,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             case CleanRDD(rddId) =>
               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
             case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
             case CleanBroadcast(broadcastId) =>
               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           }
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
     val future = askDriverWithReply[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
-      case e: Throwable =>
-        logError("Failed to remove broadcast " + broadcastId +
-          " with removeFromMaster = " + removeFromMaster, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove broadcast $broadcastId" +
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}")
     }
     if (blocking) {
       Await.result(future, timeout)
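
The three hunks above share one pattern; a condensed, hedged sketch (removeWithWarning is an illustrative name, not Spark's API) of its two deliberate choices: match Exception rather than Throwable so fatal JVM errors still propagate, and log only a warning with the message, because a failed delete of one item does not break the rest of the system:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object RemovalLogging {
  // Fire off the removal and attach a mild failure handler; the caller
  // decides separately whether to block on the resulting Future.
  def removeWithWarning[T](itemDesc: String)(work: => T): Future[T] = {
    val future = Future(work)
    // Future.onFailure was the idiomatic callback in the Scala 2.10/2.11
    // era this code targets (it was later deprecated in favor of onComplete).
    future.onFailure {
      // Exception, not Throwable: errors such as OutOfMemoryError are left alone.
      case e: Exception => println(s"WARN: Failed to remove $itemDesc - ${e.getMessage}")
    }
    future
  }
}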
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
     .set("spark.cleaner.referenceTracking.blocking", "true")
+    .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
     .set("spark.shuffle.manager", shuffleManager.getName)
 
   before {
@@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val conf2 = new SparkConf()
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
@@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
     val conf2 = new SparkConf()
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
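
Both flags are set to "true" in the suites so that cleanup is synchronous and can be asserted right after it is triggered. A condensed, hypothetical sketch of the scenario these suites exercise (the real tests go through a CleanerTester helper; the body below is illustrative only):

// Inside a suite extending ContextCleanerSuiteBase (sketch only).
test("shuffle is cleaned up once unreferenced (sketch)") {
  var rdd = sc.parallelize(1 to 100, 4).map(i => (i % 10, i)).reduceByKey(_ + _)
  rdd.count()  // materialize the shuffle
  val shuffleId = rdd.dependencies.collectFirst {
    case dep: org.apache.spark.ShuffleDependency[_, _, _] => dep.shuffleId
  }.get
  rdd = null    // drop the last strong reference to the RDD
  System.gc()   // let the cleaner's weak references enqueue a CleanShuffle task
  // ...then wait for / assert the removal of the blocks for `shuffleId`,
  // which blocking = "true" makes deterministic enough to test.
}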