diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 3848734d6f6391e6c7f92595079be130b5a20da2..ede1e23f4fcc5c8a3658720dde35394c65c12c1c 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
   /**
-   * Whether the cleaning thread will block on cleanup tasks.
+   * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
+   * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
    * workaround for the issue, which is ultimately caused by the way the BlockManager actors
@@ -76,6 +77,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val blockOnCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking", true)
 
+  /**
+   * Whether the cleaning thread will block on shuffle cleanup tasks.
+   *
+   * When context cleaner is configured to block on every delete request, it can throw timeout
+   * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
+   * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
+   * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
+   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * resolved.
+   */
+  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
+    "spark.cleaner.referenceTracking.blocking.shuffle", false)
+
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -128,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             case CleanRDD(rddId) =>
               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
             case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
             case CleanBroadcast(broadcastId) =>
               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 669307765d1fa1f25b35e79bb451f04ebee48424..e67b3dc5ce02ed03e8afabc8c5dcc9ae386d8dec 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
     val future = askDriverWithReply[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
-      case e: Throwable =>
-        logError("Failed to remove broadcast " + broadcastId +
-          " with removeFromMaster = " + removeFromMaster, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove broadcast $broadcastId" +
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
     }
     if (blocking) {
       Await.result(future, timeout)
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 4bc4346c0a288708b0c8a0efaf9e265e29afab18..2744894277ae8c6de16f9ee699e1197e29ff032b 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
     .set("spark.cleaner.referenceTracking.blocking", "true")
+    .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
     .set("spark.shuffle.manager", shuffleManager.getName)
 
   before {
@@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
 
@@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)