Skip to content
Snippets Groups Projects
Commit bb870e72 authored by jerryshao's avatar jerryshao Committed by Tathagata Das
Browse files

[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to...

[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead

Hostname in TaskMetrics will be created through deserialization, mostly the number of hostname is only the order of number of cluster node, so adding a cache layer to dedup the object could reduce the memory usage and alleviate GC overhead, especially for long-running and fast job generation applications like Spark Streaming.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #5064 from jerryshao/SPARK-5523 and squashes the following commits:

3e2412a [jerryshao] Address the comments
b092a81 [Saisai Shao] Add a pool to cache the hostname
parent f957796c
No related branches found
No related tags found
No related merge requests found
......@@ -17,11 +17,15 @@
package org.apache.spark.executor
import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
......@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
// Get the hostname from cached data, since hostname is the order of number of nodes in
// cluster, so using cached hostname will decrease the object number and alleviate the GC
// overhead.
_hostname = TaskMetrics.getCachedHostName(_hostname)
}
}
private[spark] object TaskMetrics {
private val hostNameCache = new ConcurrentHashMap[String, String]()
def empty: TaskMetrics = new TaskMetrics
def getCachedHostName(host: String): String = {
val canonicalHost = hostNameCache.putIfAbsent(host, host)
if (canonicalHost != null) canonicalHost else host
}
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment