From a848f0ba84e37fd95d0f47863ec68326e3296b33 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu <shixiong@databricks.com> Date: Fri, 23 Dec 2016 15:38:41 -0800 Subject: [PATCH] [SPARK-18991][CORE] Change ContextCleaner.referenceBuffer to use ConcurrentHashMap to make it faster ## What changes were proposed in this pull request? The time complexity of ConcurrentHashMap's `remove` is O(1). Changing ContextCleaner.referenceBuffer's type from `ConcurrentLinkedQueue` to `ConcurrentHashMap's` will make the removal much faster. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16390 from zsxwing/SPARK-18991. --- .../org/apache/spark/ContextCleaner.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index af913454fc..4d884dec07 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -18,7 +18,8 @@ package org.apache.spark import java.lang.ref.{ReferenceQueue, WeakReference} -import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} +import java.util.Collections +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ @@ -58,7 +59,12 @@ private class CleanupTaskWeakReference( */ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { - private val referenceBuffer = new ConcurrentLinkedQueue[CleanupTaskWeakReference]() + /** + * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they + * have not been handled by the reference queue. + */ + private val referenceBuffer = + Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap) private val referenceQueue = new ReferenceQueue[AnyRef] @@ -176,10 +182,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { .map(_.asInstanceOf[CleanupTaskWeakReference]) // Synchronize here to avoid being interrupted on stop() synchronized { - reference.map(_.task).foreach { task => - logDebug("Got cleaning task " + task) - referenceBuffer.remove(reference.get) - task match { + reference.foreach { ref => + logDebug("Got cleaning task " + ref.task) + referenceBuffer.remove(ref) + ref.task match { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => -- GitLab