diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 1efce124c0a6bdd8c2ffecef76d02500463b4b05..b2e9a97129f08dd4f687fe51dfc21af817d00738 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty.  During serialization, this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
@@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
+    emptyBlocks.trim()
+    emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1bcb3175a3016ba57a4706d593324384a82531a5..d5ba690ed04bee22110e23d3c369f3bba654fe05 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
+import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
@@ -25,9 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.roaringbitmap.RoaringBitmap
@@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466fab4506b60bbff782cc6b100f7dd23afa0..15c8de61b82405cfba8f03795cd2a67eec133f31 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
 
 import scala.util.Random
 
@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
     val buf = ser.newInstance().serialize(status)
     ser.newInstance().deserialize[MapStatus](buf)
   }
+
+  test("RoaringBitmap: runOptimize succeeded") {
+    val r = new RoaringBitmap
+    (1 to 200000).foreach(i =>
+      if (i % 200 != 0) {
+        r.add(i)
+      }
+    )
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 > size2)
+    assert(success)
+  }
+
+  test("RoaringBitmap: runOptimize failed") {
+    val r = new RoaringBitmap
+    (1 to 200000).foreach(i =>
+      if (i % 200 == 0) {
+        r.add(i)
+      }
+    )
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 === size2)
+    assert(!success)
+  }
 }
diff --git a/pom.xml b/pom.xml
index 2a8a445057174e672763b416cc00c5dfd65786ab..940e2d8740bf16d9da6e153cbc74a64f4eb7e4d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,7 +637,7 @@
       <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
-        <version>0.4.5</version>
+        <version>0.5.11</version>
       </dependency>
       <dependency>
         <groupId>commons-net</groupId>