diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index d23c1533db75838807f4151977f209696ffe1115..bc732535fed8795469c9e9061affd51f9aad7f21 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -18,12 +18,13 @@ package org.apache.spark import java.lang.ref.{ReferenceQueue, WeakReference} +import java.util.concurrent.{TimeUnit, ScheduledExecutorService} import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Classes that represent cleaning tasks. @@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val cleaningThread = new Thread() { override def run() { keepCleaning() }} + private val periodicGCService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc") + + /** + * How often to trigger a garbage collection in this JVM. + * + * This context cleaner triggers cleanups only when weak references are garbage collected. + * In long-running applications with large driver JVMs, where there is little memory pressure + * on the driver, this may happen very occasionally or not at all. Not cleaning at all may + * lead to executors running out of disk space after a while. + */ + private val periodicGCInterval = + sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min") + /** * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter). @@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.setDaemon(true) cleaningThread.setName("Spark Context Cleaner") cleaningThread.start() + periodicGCService.scheduleAtFixedRate(new Runnable { + override def run(): Unit = System.gc() + }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS) } /** @@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.interrupt() } cleaningThread.join() + periodicGCService.shutdown() } /** Register a RDD for cleanup when it is garbage collected. */