diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
index b3bd4daa739cabc7b9dd77012141d1b0ff203d32..e5ad8b52dc14efc3d0092722e66a1860c89cd56b 100644
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ b/core/src/main/scala/spark/SizeEstimator.scala
@@ -7,6 +7,10 @@ import java.util.IdentityHashMap
 import java.util.concurrent.ConcurrentHashMap
 import java.util.Random
 
+import javax.management.MBeanServer
+import java.lang.management.ManagementFactory
+import com.sun.management.HotSpotDiagnosticMXBean
+
 import scala.collection.mutable.ArrayBuffer
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet
@@ -18,9 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet
  * Based on the following JavaWorld article:
  * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
  */
-object SizeEstimator {
-  private val OBJECT_SIZE = 8 // Minimum size of a java.lang.Object
-  private val POINTER_SIZE = 4 // Size of an object reference
+object SizeEstimator extends Logging {
 
   // Sizes of primitive types
   private val BYTE_SIZE = 1
@@ -32,9 +34,68 @@ object SizeEstimator {
   private val FLOAT_SIZE = 4
   private val DOUBLE_SIZE = 8
 
+  // Alignment boundary for objects
+  // TODO: Is this arch-dependent?
+  private val ALIGN_SIZE = 8
+
   // A cache of ClassInfo objects for each class
   private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
-  classInfos.put(classOf[Object], new ClassInfo(OBJECT_SIZE, Nil))
+
+  // Object and pointer sizes are arch dependent
+  private var is64bit = false
+
+  // Size of an object reference
+  // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
+  private var isCompressedOops = false
+  private var pointerSize = 4
+
+  // Minimum size of a java.lang.Object
+  private var objectSize = 8
+
+  initialize()
+
+  // Sets object size and pointer size based on architecture and CompressedOops
+  // settings from the JVM.
+  private def initialize() {
+    is64bit = System.getProperty("os.arch").contains("64")
+    isCompressedOops = getIsCompressedOops
+
+    objectSize = if (!is64bit) 8 else {
+      if (!isCompressedOops) {
+        16
+      } else {
+        12
+      }
+    }
+    pointerSize = if (is64bit && !isCompressedOops) 8 else 4
+    classInfos.clear()
+    classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
+  }
+
+  private def getIsCompressedOops: Boolean = {
+    if (System.getProperty("spark.test.useCompressedOops") != null) {
+      return System.getProperty("spark.test.useCompressedOops").toBoolean
+    }
+    try {
+      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
+      val server = ManagementFactory.getPlatformMBeanServer()
+      val bean = ManagementFactory.newPlatformMXBeanProxy(server,
+        hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
+      return bean.getVMOption("UseCompressedOops").getValue.toBoolean
+    } catch {
+      case e: IllegalArgumentException => {
+        logWarning("Exception while trying to check if compressed oops is enabled", e)
+        // Fall back to checking if maxMemory < 32GB
+        return Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
+      }
+
+      case e: SecurityException => {
+        logWarning("No permission to create MBeanServer", e)
+        // Fall back to checking if maxMemory < 32GB
+        return Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
+      }
+    }
+  }
 
   /**
    * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
@@ -101,10 +162,17 @@ object SizeEstimator {
   private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
     val length = JArray.getLength(array)
     val elementClass = cls.getComponentType
+
+    // Arrays have an object header plus a length field, which is an int
+    var arrSize: Long = alignSize(objectSize + INT_SIZE)
+
     if (elementClass.isPrimitive) {
-      state.size += length * primitiveSize(elementClass)
+      arrSize += alignSize(length * primitiveSize(elementClass))
+      state.size += arrSize
     } else {
-      state.size += length * POINTER_SIZE
+      arrSize += alignSize(length * pointerSize)
+      state.size += arrSize
+
       if (length <= ARRAY_SIZE_FOR_SAMPLING) {
         for (i <- 0 until length) {
           state.enqueue(JArray.get(array, i))
@@ -170,15 +238,22 @@ object SizeEstimator {
         shellSize += primitiveSize(fieldClass)
       } else {
         field.setAccessible(true) // Enable future get()'s on this field
-        shellSize += POINTER_SIZE
+        shellSize += pointerSize
         pointerFields = field :: pointerFields
       }
     }
 
+    shellSize = alignSize(shellSize)
+
     // Create and cache a new ClassInfo
     val newInfo = new ClassInfo(shellSize, pointerFields)
    classInfos.put(cls, newInfo)
    return newInfo
  }
+
+  private def alignSize(size: Long): Long = {
+    val rem = size % ALIGN_SIZE
+    return if (rem == 0) size else (size + ALIGN_SIZE - rem)
+  }
 }
diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
index 024ce0b8d19cf2de6b9e2448329f5dd17278afc3..dff2970566fbea3e397330c127806754b02a552e 100644
--- a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
+++ b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala
@@ -1,31 +1,50 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.PrivateMethodTester
 
-class BoundedMemoryCacheSuite extends FunSuite {
+class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester {
   test("constructor test") {
-    val cache = new BoundedMemoryCache(40)
-    expect(40)(cache.getCapacity)
+    val cache = new BoundedMemoryCache(60)
+    expect(60)(cache.getCapacity)
   }
 
   test("caching") {
-    val cache = new BoundedMemoryCache(40) {
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    val oldArch = System.setProperty("os.arch", "amd64")
+    val oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+
+    val cache = new BoundedMemoryCache(60) {
       //TODO sorry about this, but there is no better way to skip 'cacheTracker.dropEntry'
       override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
         logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
       }
     }
     //should be OK
-    expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
+    expect(CachePutSuccess(56))(cache.put("1", 0, "Meh"))
     //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
     //cache because it's from the same dataset
     expect(CachePutFailure())(cache.put("1", 1, "Meh"))
     //should be OK, dataset '1' can be evicted from cache
-    expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
+    expect(CachePutSuccess(56))(cache.put("2", 0, "Meh"))
     //should fail, cache should obey its capacity
     expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
+
+    if (oldArch != null) {
+      System.setProperty("os.arch", oldArch)
+    } else {
+      System.clearProperty("os.arch")
+    }
+
+    if (oldOops != null) {
+      System.setProperty("spark.test.useCompressedOops", oldOops)
+    } else {
+      System.clearProperty("spark.test.useCompressedOops")
+    }
   }
 }
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index 63bc951858d0b308e0a13af2f3eecaa36d4f752e..a2015644eea4b606ddfeac613ef543fafae88fd6 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -1,6 +1,8 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.PrivateMethodTester
 
 class DummyClass1 {}
 
@@ -17,61 +19,114 @@ class DummyClass4(val d: DummyClass3) {
   val x: Int = 0
 }
 
-class SizeEstimatorSuite extends FunSuite {
+class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+  var oldArch: String = _
+  var oldOops: String = _
+
+  override def beforeAll() {
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    oldArch = System.setProperty("os.arch", "amd64")
+    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+  }
+
+  override def afterAll() {
+    resetOrClear("os.arch", oldArch)
+    resetOrClear("spark.test.useCompressedOops", oldOops)
+  }
+
   test("simple classes") {
-    expect(8)(SizeEstimator.estimate(new DummyClass1))
-    expect(12)(SizeEstimator.estimate(new DummyClass2))
-    expect(20)(SizeEstimator.estimate(new DummyClass3))
-    expect(16)(SizeEstimator.estimate(new DummyClass4(null)))
-    expect(36)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
+    expect(16)(SizeEstimator.estimate(new DummyClass1))
+    expect(16)(SizeEstimator.estimate(new DummyClass2))
+    expect(24)(SizeEstimator.estimate(new DummyClass3))
+    expect(24)(SizeEstimator.estimate(new DummyClass4(null)))
+    expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
   }
 
   test("strings") {
-    expect(24)(SizeEstimator.estimate(""))
-    expect(26)(SizeEstimator.estimate("a"))
-    expect(28)(SizeEstimator.estimate("ab"))
-    expect(40)(SizeEstimator.estimate("abcdefgh"))
+    expect(48)(SizeEstimator.estimate(""))
+    expect(56)(SizeEstimator.estimate("a"))
+    expect(56)(SizeEstimator.estimate("ab"))
+    expect(64)(SizeEstimator.estimate("abcdefgh"))
   }
 
   test("primitive arrays") {
-    expect(10)(SizeEstimator.estimate(new Array[Byte](10)))
-    expect(20)(SizeEstimator.estimate(new Array[Char](10)))
-    expect(20)(SizeEstimator.estimate(new Array[Short](10)))
-    expect(40)(SizeEstimator.estimate(new Array[Int](10)))
-    expect(80)(SizeEstimator.estimate(new Array[Long](10)))
-    expect(40)(SizeEstimator.estimate(new Array[Float](10)))
-    expect(80)(SizeEstimator.estimate(new Array[Double](10)))
-    expect(4000)(SizeEstimator.estimate(new Array[Int](1000)))
-    expect(8000)(SizeEstimator.estimate(new Array[Long](1000)))
+    expect(32)(SizeEstimator.estimate(new Array[Byte](10)))
+    expect(40)(SizeEstimator.estimate(new Array[Char](10)))
+    expect(40)(SizeEstimator.estimate(new Array[Short](10)))
+    expect(56)(SizeEstimator.estimate(new Array[Int](10)))
+    expect(96)(SizeEstimator.estimate(new Array[Long](10)))
+    expect(56)(SizeEstimator.estimate(new Array[Float](10)))
+    expect(96)(SizeEstimator.estimate(new Array[Double](10)))
+    expect(4016)(SizeEstimator.estimate(new Array[Int](1000)))
+    expect(8016)(SizeEstimator.estimate(new Array[Long](1000)))
   }
 
   test("object arrays") {
     // Arrays containing nulls should just have one pointer per element
-    expect(40)(SizeEstimator.estimate(new Array[String](10)))
-    expect(40)(SizeEstimator.estimate(new Array[AnyRef](10)))
+    expect(56)(SizeEstimator.estimate(new Array[String](10)))
+    expect(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
 
     // For object arrays with non-null elements, each object should take one pointer plus
     // however many bytes that class takes. (Note that Array.fill calls the code in its
     // second parameter separately for each object, so we get distinct objects.)
-    expect(120)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
-    expect(160)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
-    expect(240)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
-    expect(12 + 16)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
+    expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
+    expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
+    expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
+    expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
 
     // Past size 100, our estimator samples 100 elements, but we should still get the right size.
-    expect(24000)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
+    expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
 
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
-    expect(48)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
-    expect(408)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
+    expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus one 16-byte object
+    expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus one 16-byte object
 
     // Same thing with huge array containing the same element many times. Note that this won't
-    // return exactly 4008 because it can't tell that *all* the elements will equal the first
+    // return exactly 4032 because it can't tell that *all* the elements will equal the first
     // one it samples, but it should be close to that.
+
+    // TODO: If we sample 100 elements, this should always be 4176?
     val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
     assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000")
-    assert(estimatedSize <= 4100, "Estimated size " + estimatedSize + " should be less than 4100")
+    assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4200")
   }
-}
+
+  test("32-bit arch") {
+    val arch = System.setProperty("os.arch", "x86")
+
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+
+    expect(40)(SizeEstimator.estimate(""))
+    expect(48)(SizeEstimator.estimate("a"))
+    expect(48)(SizeEstimator.estimate("ab"))
+    expect(56)(SizeEstimator.estimate("abcdefgh"))
+
+    resetOrClear("os.arch", arch)
+  }
+
+  test("64-bit arch with no compressed oops") {
+    val arch = System.setProperty("os.arch", "amd64")
+    val oops = System.setProperty("spark.test.useCompressedOops", "false")
+
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+
+    expect(64)(SizeEstimator.estimate(""))
+    expect(72)(SizeEstimator.estimate("a"))
+    expect(72)(SizeEstimator.estimate("ab"))
+    expect(80)(SizeEstimator.estimate("abcdefgh"))
+
+    resetOrClear("os.arch", arch)
+    resetOrClear("spark.test.useCompressedOops", oops)
+  }
+
+  def resetOrClear(prop: String, oldValue: String) {
+    if (oldValue != null) {
+      System.setProperty(prop, oldValue)
+    } else {
+      System.clearProperty(prop)
+    }
+  }
+}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 61decd81e6e1ffa2e70f6819d3beb69fff6ccdce..f3f891e47143aa9d6baae0ddcbe5182e26a0f68e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -6,17 +6,27 @@ import akka.actor._
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester
 
 import spark.KryoSerializer
+import spark.SizeEstimator
 import spark.util.ByteBufferInputStream
 
-class BlockManagerSuite extends FunSuite with BeforeAndAfter {
+class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
+  var oldArch: String = _
+  var oldOops: String = _
 
   before {
     actorSystem = ActorSystem("test")
     master = new BlockManagerMaster(actorSystem, true, true)
+
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    oldArch = System.setProperty("os.arch", "amd64")
+    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
   }
 
   after {
@@ -24,6 +34,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
     actorSystem.awaitTermination()
     actorSystem = null
     master = null
+
+    if (oldArch != null) {
+      System.setProperty("os.arch", oldArch)
+    } else {
+      System.clearProperty("os.arch")
+    }
+
+    if (oldOops != null) {
+      System.setProperty("spark.test.useCompressedOops", oldOops)
+    } else {
+      System.clearProperty("spark.test.useCompressedOops")
+    }
   }
 
   test("manager-master interaction") {
@@ -57,7 +79,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("in-memory LRU storage") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -78,7 +100,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("in-memory LRU storage with serialization") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -99,7 +121,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("on-disk storage") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -112,7 +134,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("disk and memory storage") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -126,7 +148,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("disk and memory storage with serialization") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -140,7 +162,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("LRU with mixed storage levels") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -166,7 +188,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("in-memory LRU with streams") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -192,7 +214,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("LRU with mixed storage levels and streams") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
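
Editor's note on the new test expectations: they follow directly from the size model the patch implements in `visitArray` and `alignSize`. Below is a minimal, standalone sketch (not part of the patch; the object name `SizeMath` and the helper `arraySize` are illustrative only) that reproduces the arithmetic, assuming a 64-bit JVM with CompressedOops enabled, i.e. a 12-byte object header and 4-byte references, which is exactly what the tests pin via `os.arch=amd64` and `spark.test.useCompressedOops=true`.

```scala
// Illustrative sketch of the patch's size model; not part of the diff.
// Assumes 64-bit with CompressedOops on: 12-byte headers, 4-byte references.
object SizeMath {
  private val ALIGN_SIZE = 8   // objects are padded to 8-byte boundaries
  private val INT_SIZE = 4     // an array's length field is an int
  private val OBJECT_SIZE = 12 // stand-in for the patch's objectSize (16 without
                               // compressed oops, 8 on 32-bit)

  // Round up to the next 8-byte boundary, mirroring SizeEstimator.alignSize.
  def alignSize(size: Long): Long = {
    val rem = size % ALIGN_SIZE
    if (rem == 0) size else size + ALIGN_SIZE - rem
  }

  // Mirrors visitArray: aligned (header + length field) plus aligned payload.
  def arraySize(elemSize: Long, length: Long): Long =
    alignSize(OBJECT_SIZE + INT_SIZE) + alignSize(elemSize * length)

  def main(args: Array[String]) {
    println(arraySize(1, 10))   // 32   -> expect(32) for new Array[Byte](10)
    println(arraySize(4, 10))   // 56   -> expect(56) for new Array[Int](10)
    println(arraySize(8, 1000)) // 8016 -> expect(8016) for new Array[Long](1000)
  }
}
```

The object-array figures follow the same model: `Array.fill(10)(d1)` costs `arraySize(4, 10) = 56` bytes for the array of references plus one 16-byte `DummyClass1` instance (a 12-byte header aligned up to 16), hence `expect(72)`.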