Commit 8e1c00db authored by Hong Shen, committed by Sean Owen

[SPARK-6738] [CORE] Improve estimate the size of a large array

Currently, SizeEstimator.visitArray gives an incorrect estimate in the following case:
```
array size > 200,
the elements share a common object
```

When I add a debug log in SizeTracker.scala:
```
 System.err.println(s"numUpdates:$numUpdates, size:$ts, bytesPerUpdate:$bytesPerUpdate, cost time:$b")
```
I get the following log:
```
 numUpdates:1, size:262448, bytesPerUpdate:0.0, cost time:35
 numUpdates:2, size:420698, bytesPerUpdate:158250.0, cost time:35
 numUpdates:4, size:420754, bytesPerUpdate:28.0, cost time:32
 numUpdates:7, size:420754, bytesPerUpdate:0.0, cost time:27
 numUpdates:12, size:420754, bytesPerUpdate:0.0, cost time:28
 numUpdates:20, size:420754, bytesPerUpdate:0.0, cost time:25
 numUpdates:32, size:420754, bytesPerUpdate:0.0, cost time:21
 numUpdates:52, size:420754, bytesPerUpdate:0.0, cost time:20
 numUpdates:84, size:420754, bytesPerUpdate:0.0, cost time:20
 numUpdates:135, size:420754, bytesPerUpdate:0.0, cost time:20
 numUpdates:216, size:420754, bytesPerUpdate:0.0, cost time:11
 numUpdates:346, size:420754, bytesPerUpdate:0.0, cost time:6
 numUpdates:554, size:488911, bytesPerUpdate:327.67788461538464, cost time:8
 numUpdates:887, size:2312259426, bytesPerUpdate:6942253.798798799, cost time:198
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 3.0 GB to disk (1 time so far)
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```
But in fact the file size is only 162K:
```
$ ll -h /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
-rw-r----- 1 spark users 162K Apr 21 14:27 /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```
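One way to see where such an over-estimate can come from is to model the pre-patch extrapolation shown in this commit's diff: the sizes of 100 sampled elements are summed and then scaled by `length / ARRAY_SAMPLE_SIZE`. The sketch below is only a model under stated assumptions, not Spark code: `OldSamplingModel`, `oldEstimate`, `exactSize`, and the inputs `elemOwnSize` and `sharedSize` are hypothetical, and it assumes the shared object has not been visited before sampling starts.

```scala
object OldSamplingModel {
  // Same value as ARRAY_SAMPLE_SIZE in the diff.
  val ArraySampleSize = 100

  /** Models the pre-patch extrapolation for the array's elements only.
    * elemOwnSize and sharedSize are hypothetical inputs, in bytes. */
  def oldEstimate(length: Int, elemOwnSize: Long, sharedSize: Long): Long = {
    // Assumption: the shared object is unvisited when sampling starts, so the
    // first sampled element is charged for it and the other 99 are not.
    val sampled = sharedSize + ArraySampleSize * elemOwnSize
    // The whole sample, shared object included, is scaled up to the array length.
    ((length / (ArraySampleSize * 1.0)) * sampled).toLong
  }

  /** What a full traversal would count: the shared object once, plus every element. */
  def exactSize(length: Int, elemOwnSize: Long, sharedSize: Long): Long =
    sharedSize + length.toLong * elemOwnSize
}
```

With 10000 elements of 24 bytes that all reference one 200016-byte object, `oldEstimate` gives 20241600 while `exactSize` gives 440016: in this model the shared object is multiplied by the scaling factor instead of being counted once.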

To test this case, I changed visitArray to estimate every element instead of sampling:
```
var size = 0L
for (i <- 0 until length) {
  val obj = JArray.get(array, i)
  size += SizeEstimator.estimate(obj, state.visited).toLong
}
state.size += size
```
I get the following log:
```
...
14895 277016088 566.9046118590662 time:8470
23832 281840544 552.3308270676691 time:8031
38132 289891824 539.8294729775092 time:7897
61012 302803640 563.0265734265735 time:13044
97620 322904416 564.3276223776223 time:13554
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 314.5 MB to disk (1 time so far)
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```
The file size is 85M:
```
$ ll -h /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/
total 85M
-rw-r----- 1 spark users 85M Apr 14 11:46 temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```

With this patch applied, I get the following log:
```
....
numUpdates:32, size:365484, bytesPerUpdate:0.0, cost time:7
numUpdates:52, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:84, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:135, size:372208, bytesPerUpdate:131.84313725490196, cost time:86
numUpdates:216, size:379020, bytesPerUpdate:84.09876543209876, cost time:21
numUpdates:346, size:1865208, bytesPerUpdate:11432.215384615385, cost time:23
numUpdates:554, size:2052380, bytesPerUpdate:899.8653846153846, cost time:16
numUpdates:887, size:2142820, bytesPerUpdate:271.59159159159157, cost time:15
..
numUpdates:14895, size:251675500, bytesPerUpdate:438.5263157894737, cost time:13
numUpdates:23832, size:257010268, bytesPerUpdate:596.9305135951662, cost time:14
numUpdates:38132, size:263922396, bytesPerUpdate:483.3655944055944, cost time:15
numUpdates:61012, size:268962596, bytesPerUpdate:220.28846153846155, cost time:24
numUpdates:97620, size:286980644, bytesPerUpdate:492.1888111888112, cost time:22
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: Thread 53 spilling in-memory map of 328.7 MB to disk (1 time so far)
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/temp_local_9c109510-af16-4468-8f23-48cad04da88f
```
The file size is 88M:
```
$ ll -h /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/
total 88M
-rw-r----- 1 spark users 88M Apr 21 14:45 temp_local_9c109510-af16-4468-8f23-48cad04da88f
```
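The `bytesPerUpdate` column in the logs above is consistent with the change in estimated size divided by the change in `numUpdates` between two consecutive samples. The following is a minimal sketch of that arithmetic only, not SizeTracker's actual code; `Sample` and `bytesPerUpdate` are hypothetical names introduced for illustration.

```scala
object BytesPerUpdateModel {
  // One logged sample: the update count and the estimated size in bytes.
  final case class Sample(numUpdates: Long, size: Long)

  /** Growth in estimated bytes per update between two consecutive samples. */
  def bytesPerUpdate(prev: Sample, latest: Sample): Double =
    (latest.size - prev.size).toDouble / (latest.numUpdates - prev.numUpdates)
}
```

For the patched run, the samples at 135 and 216 updates give (379020 - 372208) / (216 - 135) = 6812 / 81, which is approximately 84.0988 and agrees with the logged value.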

Author: Hong Shen <hongshen@tencent.com>

Closes #5608 from shenh062326/my_change5 and squashes the following commits:

5506bae [Hong Shen] Fix compile error
c275dd3 [Hong Shen] Alter code style
fe202a2 [Hong Shen] Change the code style and add documentation.
a9fca84 [Hong Shen] Add test case for SizeEstimator
4877eee [Hong Shen] Improve estimate the size of a large array
a2ea7ac [Hong Shen] Alter code style
4c28e36 [Hong Shen] Improve estimate the size of a large array
parent b9de9e04
@@ -179,7 +179,7 @@ private[spark] object SizeEstimator extends Logging {
   }
   // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
-  private val ARRAY_SIZE_FOR_SAMPLING = 200
+  private val ARRAY_SIZE_FOR_SAMPLING = 400
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
   private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) {
@@ -204,25 +204,40 @@ private[spark] object SizeEstimator extends Logging {
         }
       } else {
         // Estimate the size of a large array by sampling elements without replacement.
-        var size = 0.0
+        // To exclude the shared objects that the array elements may link, sample twice
+        // and use the min one to caculate array size.
         val rand = new Random(42)
-        val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
-        var numElementsDrawn = 0
-        while (numElementsDrawn < ARRAY_SAMPLE_SIZE) {
-          var index = 0
-          do {
-            index = rand.nextInt(length)
-          } while (drawn.contains(index))
-          drawn.add(index)
-          val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
-          size += SizeEstimator.estimate(elem, state.visited)
-          numElementsDrawn += 1
-        }
-        state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
+        val drawn = new OpenHashSet[Int](2 * ARRAY_SAMPLE_SIZE)
+        val s1 = sampleArray(array, state, rand, drawn, length)
+        val s2 = sampleArray(array, state, rand, drawn, length)
+        val size = math.min(s1, s2)
+        state.size += math.max(s1, s2) +
+          (size * ((length - ARRAY_SAMPLE_SIZE) / (ARRAY_SAMPLE_SIZE))).toLong
       }
     }
   }
+  private def sampleArray(
+      array: AnyRef,
+      state: SearchState,
+      rand: Random,
+      drawn: OpenHashSet[Int],
+      length: Int): Long = {
+    var size = 0L
+    for (i <- 0 until ARRAY_SAMPLE_SIZE) {
+      var index = 0
+      do {
+        index = rand.nextInt(length)
+      } while (drawn.contains(index))
+      drawn.add(index)
+      val obj = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
+      if (obj != null) {
+        size += SizeEstimator.estimate(obj, state.visited).toLong
+      }
+    }
+    size
+  }
   private def primitiveSize(cls: Class[_]): Long = {
     if (cls == classOf[Byte]) {
       BYTE_SIZE
......
@@ -17,6 +17,8 @@
 package org.apache.spark.util
+import scala.collection.mutable.ArrayBuffer
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, PrivateMethodTester}
 class DummyClass1 {}
@@ -96,6 +98,22 @@ class SizeEstimatorSuite
     // Past size 100, our samples 100 elements, but we should still get the right size.
     assertResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
+    val arr = new Array[Char](100000)
+    assertResult(200016)(SizeEstimator.estimate(arr))
+    assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr))))
+    val buf = new ArrayBuffer[DummyString]()
+    for (i <- 0 until 5000) {
+      buf.append(new DummyString(new Array[Char](10)))
+    }
+    assertResult(340016)(SizeEstimator.estimate(buf.toArray))
+    for (i <- 0 until 5000) {
+      buf.append(new DummyString(arr))
+    }
+    assertResult(683912)(SizeEstimator.estimate(buf.toArray))
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
     // 10 pointers plus 8-byte object
......
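The combination step in the patched visitArray can be sketched in isolation as follows. This is a model of the arithmetic in the diff, not the Spark implementation: `PatchedSamplingModel` and `patchedEstimate` are hypothetical names, and `s1` and `s2` stand for the two values returned by sampleArray.

```scala
object PatchedSamplingModel {
  // Same value as ARRAY_SAMPLE_SIZE in the diff.
  val ArraySampleSize = 100

  /** Combines two sample sums the way the patched code does: the larger sum is
    * counted once, and the smaller sum is extrapolated over the remaining elements. */
  def patchedEstimate(length: Int, s1: Long, s2: Long): Long = {
    val size = math.min(s1, s2)
    // As in the diff, (length - sample size) / sample size is integer division,
    // so the scaling factor is truncated toward zero.
    math.max(s1, s2) + size * ((length - ArraySampleSize) / ArraySampleSize)
  }
}
```

As an illustration, take 10000 elements of 24 bytes that share one 200016-byte array, and suppose the first sample is charged for the shared array: `s1 = 202416`, `s2 = 2400`, and `patchedEstimate` returns 202416 + 2400 * 99 = 440016. Adding 40016 bytes for the outer array itself (an assumption of a 16-byte header and 4-byte references) gives 480032, which agrees with the value asserted in the test diff above.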