Skip to content
Snippets Groups Projects
Commit e33053ee authored by Kent Yao's avatar Kent Yao Committed by Davies Liu
Browse files

[SPARK-11583] [CORE] MapStatus Using RoaringBitmap More Properly

This PR upgrade the version of RoaringBitmap to 0.5.10, to optimize the memory layout, will be much smaller when most of blocks are empty.

This PR is based on #9661 (fix conflicts), see all of the comments at https://github.com/apache/spark/pull/9661 .

Author: Kent Yao <yaooqinn@hotmail.com>
Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>

Closes #9746 from davies/roaring_mapstatus.
parent bf25f9bd
No related branches found
No related tags found
No related merge requests found
......@@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(
/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
* plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
......@@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
......@@ -17,7 +17,7 @@
package org.apache.spark.serializer
import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable
......@@ -25,9 +25,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap
......@@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
......
......@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
import org.roaringbitmap.RoaringBitmap
import scala.util.Random
......@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
val buf = ser.newInstance().serialize(status)
ser.newInstance().deserialize[MapStatus](buf)
}
test("RoaringBitmap: runOptimize succeeded") {
val r = new RoaringBitmap
(1 to 200000).foreach(i =>
if (i % 200 != 0) {
r.add(i)
}
)
val size1 = r.getSizeInBytes
val success = r.runOptimize()
r.trim()
val size2 = r.getSizeInBytes
assert(size1 > size2)
assert(success)
}
test("RoaringBitmap: runOptimize failed") {
val r = new RoaringBitmap
(1 to 200000).foreach(i =>
if (i % 200 == 0) {
r.add(i)
}
)
val size1 = r.getSizeInBytes
val success = r.runOptimize()
r.trim()
val size2 = r.getSizeInBytes
assert(size1 === size2)
assert(!success)
}
}
......@@ -637,7 +637,7 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
<version>0.5.11</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment