Skip to content
Snippets Groups Projects
Commit a848f0ba authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-18991][CORE] Change ContextCleaner.referenceBuffer to use...

[SPARK-18991][CORE] Change ContextCleaner.referenceBuffer to use ConcurrentHashMap to make it faster

## What changes were proposed in this pull request?

The time complexity of ConcurrentHashMap's `remove` is O(1). Changing ContextCleaner.referenceBuffer's type from `ConcurrentLinkedQueue` to `ConcurrentHashMap` will make the removal much faster.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16390 from zsxwing/SPARK-18991.
parent 1311448e
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
package org.apache.spark package org.apache.spark
import java.lang.ref.{ReferenceQueue, WeakReference} import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
...@@ -58,7 +59,12 @@ private class CleanupTaskWeakReference( ...@@ -58,7 +59,12 @@ private class CleanupTaskWeakReference(
*/ */
private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val referenceBuffer = new ConcurrentLinkedQueue[CleanupTaskWeakReference]() /**
* A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
* have not been handled by the reference queue.
*/
private val referenceBuffer =
Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
private val referenceQueue = new ReferenceQueue[AnyRef] private val referenceQueue = new ReferenceQueue[AnyRef]
...@@ -176,10 +182,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { ...@@ -176,10 +182,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
.map(_.asInstanceOf[CleanupTaskWeakReference]) .map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop() // Synchronize here to avoid being interrupted on stop()
synchronized { synchronized {
reference.map(_.task).foreach { task => reference.foreach { ref =>
logDebug("Got cleaning task " + task) logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(reference.get) referenceBuffer.remove(ref)
task match { ref.task match {
case CleanRDD(rddId) => case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks) doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) => case CleanShuffle(shuffleId) =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment