Skip to content
Snippets Groups Projects
Commit c0622242 authored by zsxwing's avatar zsxwing Committed by Patrick Wendell
Browse files

[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T]

Added a ClassTag parameter to CompactBuffer. So CompactBuffer[T] can create primitive arrays for primitive types. It will reduce the memory usage for primitive types significantly, at the cost of only a minor performance loss.

Here is my test code:
```Scala
  // Call org.apache.spark.util.SizeEstimator.estimate
  def estimateSize(obj: AnyRef): Long = {
    val c = Class.forName("org.apache.spark.util.SizeEstimator$")
    val f = c.getField("MODULE$")
    val o = f.get(c)
    val m = c.getMethod("estimate", classOf[Object])
    m.setAccessible(true)
    m.invoke(o, obj).asInstanceOf[Long]
  }

  sc.parallelize(1 to 10000).groupBy(_ => 1).foreach {
    case (k, v) =>
      println(v.getClass() + " size: " + estimateSize(v))
  }
```

Using the previous CompactBuffer, this output:
```
class org.apache.spark.util.collection.CompactBuffer size: 313358
```

Using the new CompactBuffer, this output:
```
class org.apache.spark.util.collection.CompactBuffer size: 65712
```

In this case, the new `CompactBuffer` only used 20% memory of the previous one. It's really helpful for `groupByKey` when using a primitive value.

Author: zsxwing <zsxwing@gmail.com>

Closes #3378 from zsxwing/SPARK-4505 and squashes the following commits:

4abdbba [zsxwing] Add a ClassTag parameter to reduce the memory usage of CompactBuffer[T] when T is a primitive type
parent 938dc141
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.spark.util.collection package org.apache.spark.util.collection
import scala.reflect.ClassTag
/** /**
* An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers. * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
* ArrayBuffer always allocates an Object array to store the data, with 16 entries by default, * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
...@@ -25,7 +27,7 @@ package org.apache.spark.util.collection ...@@ -25,7 +27,7 @@ package org.apache.spark.util.collection
* entries than that. This makes it more efficient for operations like groupBy where we expect * entries than that. This makes it more efficient for operations like groupBy where we expect
* some keys to have very few elements. * some keys to have very few elements.
*/ */
private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
// First two elements // First two elements
private var element0: T = _ private var element0: T = _
private var element1: T = _ private var element1: T = _
...@@ -34,7 +36,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { ...@@ -34,7 +36,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
private var curSize = 0 private var curSize = 0
// Array for extra elements // Array for extra elements
private var otherElements: Array[AnyRef] = null private var otherElements: Array[T] = null
def apply(position: Int): T = { def apply(position: Int): T = {
if (position < 0 || position >= curSize) { if (position < 0 || position >= curSize) {
...@@ -45,7 +47,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { ...@@ -45,7 +47,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} else if (position == 1) { } else if (position == 1) {
element1 element1
} else { } else {
otherElements(position - 2).asInstanceOf[T] otherElements(position - 2)
} }
} }
...@@ -58,7 +60,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { ...@@ -58,7 +60,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} else if (position == 1) { } else if (position == 1) {
element1 = value element1 = value
} else { } else {
otherElements(position - 2) = value.asInstanceOf[AnyRef] otherElements(position - 2) = value
} }
} }
...@@ -72,7 +74,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { ...@@ -72,7 +74,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
curSize = 2 curSize = 2
} else { } else {
growToSize(curSize + 1) growToSize(curSize + 1)
otherElements(newIndex - 2) = value.asInstanceOf[AnyRef] otherElements(newIndex - 2) = value
} }
this this
} }
...@@ -139,7 +141,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { ...@@ -139,7 +141,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
newArrayLen = Int.MaxValue - 2 newArrayLen = Int.MaxValue - 2
} }
} }
val newArray = new Array[AnyRef](newArrayLen) val newArray = new Array[T](newArrayLen)
if (otherElements != null) { if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length) System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
} }
...@@ -150,9 +152,9 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { ...@@ -150,9 +152,9 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} }
private[spark] object CompactBuffer { private[spark] object CompactBuffer {
def apply[T](): CompactBuffer[T] = new CompactBuffer[T] def apply[T: ClassTag](): CompactBuffer[T] = new CompactBuffer[T]
def apply[T](value: T): CompactBuffer[T] = { def apply[T: ClassTag](value: T): CompactBuffer[T] = {
val buf = new CompactBuffer[T] val buf = new CompactBuffer[T]
buf += value buf += value
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment