Skip to content
Snippets Groups Projects
Commit 1714350b authored by Yin Huai, committed by Reynold Xin
Browse files

[SPARK-11792][SQL] SizeEstimator cannot provide a good size estimation of UnsafeHashedRelations

https://issues.apache.org/jira/browse/SPARK-11792

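Right now, SizeEstimator can estimate the size of even a small UnsafeHashedRelation to be several GB. This change lets a class report its own size through a new SizeEstimation trait, which UnsafeHashedRelation implements.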

Author: Yin Huai <yhuai@databricks.com>

Closes #9788 from yhuai/SPARK-11792.
parent 5e2b4447
No related branches found
No related tags found
No related merge requests found
......@@ -215,6 +215,9 @@ public class TaskMemoryManager {
logger.info(
"{} bytes of memory were used by task {} but are not associated with specific consumers",
memoryNotAccountedFor, taskAttemptId);
logger.info(
"{} bytes of memory are used for execution and {} bytes of memory are used for storage",
memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed());
}
}
......
......@@ -31,6 +31,16 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.OpenHashSet
/**
 * A trait that allows a class to give [[SizeEstimator]] a more accurate estimate of its size.
 * When a class extends it, [[SizeEstimator]] queries `estimatedSize` first.
 * If `estimatedSize` does not return [[None]], [[SizeEstimator]] uses the returned size
 * as the size of the object. Otherwise, [[SizeEstimator]] does the estimation work itself.
 */
private[spark] trait SizeEstimation {
/**
 * The self-reported size of this object in bytes, or [[None]] to let [[SizeEstimator]]
 * fall back to its own estimation.
 *
 * When a size is returned, [[SizeEstimator]] adds it to the running total as-is and does
 * not enqueue this object's pointer fields, so the value should already account for
 * whatever the object references.
 */
def estimatedSize: Option[Long]
}
/**
* :: DeveloperApi ::
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
......@@ -199,10 +209,18 @@ object SizeEstimator extends Logging {
// the size estimator since it references the whole REPL. Do nothing in this case. In
// general all ClassLoaders and Classes will be shared between objects anyway.
} else {
val classInfo = getClassInfo(cls)
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
val estimatedSize = obj match {
case s: SizeEstimation => s.estimatedSize
case _ => None
}
if (estimatedSize.isDefined) {
state.size += estimatedSize.get
} else {
val classInfo = getClassInfo(cls)
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
}
}
}
}
......
......@@ -60,6 +60,18 @@ class DummyString(val arr: Array[Char]) {
@transient val hash32: Int = 0
}
/** Test fixture that always reports a fixed size of 2015 through [[SizeEstimation]]. */
class DummyClass8 extends SizeEstimation {
  val x: Int = 0

  override def estimatedSize: Option[Long] = Some(2015L)
}
/** Test fixture that mixes in [[SizeEstimation]] but declines to report a size. */
class DummyClass9 extends SizeEstimation {
  val x: Int = 0

  override def estimatedSize: Option[Long] = Option.empty[Long]
}
class SizeEstimatorSuite
extends SparkFunSuite
with BeforeAndAfterEach
......@@ -214,4 +226,14 @@ class SizeEstimatorSuite
// Class should be 32 bytes on s390x if recognised as 64 bit platform
assertResult(32)(SizeEstimator.estimate(new DummyClass7))
}
test("SizeEstimation can provide the estimated size") {
  // DummyClass8 reports its own size, so the estimate must be exactly that value.
  val selfSized = new DummyClass8
  assertResult(2015)(SizeEstimator.estimate(selfSized))

  // Ten self-sized elements (10 * 2015) plus the remaining 56 bytes for the array itself.
  val selfSizedArray = Array.fill(10)(new DummyClass8)
  assertResult(20206)(SizeEstimator.estimate(selfSizedArray))

  // DummyClass9 returns None, so SizeEstimator has to do the estimation work itself.
  val unsized = new DummyClass9
  assertResult(16)(SizeEstimator.estimate(unsized))

  val unsizedArray = Array.fill(10)(new DummyClass9)
  assertResult(216)(SizeEstimator.estimate(unsizedArray))
}
}
......@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.MemoryLocation
import org.apache.spark.util.Utils
import org.apache.spark.util.{SizeEstimation, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.{SparkConf, SparkEnv}
......@@ -189,7 +189,9 @@ private[execution] object HashedRelation {
*/
private[joins] final class UnsafeHashedRelation(
private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]])
extends HashedRelation with Externalizable {
extends HashedRelation
with SizeEstimation
with Externalizable {
private[joins] def this() = this(null) // Needed for serialization
......@@ -215,6 +217,10 @@ private[joins] final class UnsafeHashedRelation(
}
}
/**
 * Reports the total memory consumption of `binaryMap` as this relation's size.
 * Returns [[None]] when `binaryMap` is null, leaving the estimation to SizeEstimator.
 */
override def estimatedSize: Option[Long] = {
  binaryMap match {
    case null => None
    case map => Some(map.getTotalMemoryConsumption)
  }
}
override def get(key: InternalRow): Seq[InternalRow] = {
val unsafeKey = key.asInstanceOf[UnsafeRow]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment