diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a076f26b04afac351c6a1caacf025d3347..37e3f168ab3748798fc687208c5b99b3cc775fb2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -177,6 +177,10 @@
       <groupId>net.jpountz.lz4</groupId>
       <artifactId>lz4</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1827e13c2223b0b92f60ac8dae23bc1101..1efce124c0a6bdd8c2ffecef76d02500463b4b05 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: BitSet,
+    private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.get(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -160,7 +161,8 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new BitSet
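+    // RoaringBitmap is Externalizable and deserializes itself from the stream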
+    emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+    val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val emptyBlocks = new BitSet(totalNumBlocks)
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         totalSize += size
       } else {
-        emptyBlocks.set(i)
+        emptyBlocks.add(i)
       }
       i += 1
     }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f2820c842f388ad6bc484718b249a0e499..c5195c1143a8ffbae86223c6d7c6bfc93a1667d0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
@@ -38,7 +39,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
 import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
-import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -362,7 +363,14 @@ private[serializer] object KryoSerializer {
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
-    classOf[BitSet],
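+    // A serialized RoaringBitmap reaches these internal container classes, so they must be
+    // registered as well for spark.kryo.registrationRequired to be satisfied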
+    classOf[RoaringBitmap],
+    classOf[RoaringArray],
+    classOf[RoaringArray.Element],
+    classOf[Array[RoaringArray.Element]],
+    classOf[ArrayContainer],
+    classOf[BitmapContainer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 85c5bdbfcebc000dbb7d414c2e0ffc3be0e27e4f..7ab67fc3a2de95e2f6e31e8b5484a090d2889320 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,21 +17,14 @@
 
 package org.apache.spark.util.collection
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import org.apache.spark.util.{Utils => UUtils}
-
-
 /**
  * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
  * safety/bound checking.
  */
-class BitSet(private[this] var numBits: Int) extends Externalizable {
+class BitSet(numBits: Int) extends Serializable {
 
-  private var words = new Array[Long](bit2words(numBits))
-  private def numWords = words.length
-
-  def this() = this(0)
+  private val words = new Array[Long](bit2words(numBits))
+  private val numWords = words.length
 
   /**
    * Compute the capacity (number of bits) that can be represented
@@ -237,19 +230,4 @@ class BitSet(private[this] var numBits: Int) extends Externalizable {
 
   /** Return the number of longs it would take to hold numBits. */
   private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
-
-  override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
-    out.writeInt(numBits)
-    words.foreach(out.writeLong(_))
-  }
-
-  override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
-    numBits = in.readInt()
-    words = new Array[Long](bit2words(numBits))
-    var index = 0
-    while (index < words.length) {
-      words(index) = in.readLong()
-      index += 1
-    }
-  }
 }
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index afe2e80358ca04ec641b6c123e109eef3b2bfb0b..e428414cf6e852bace7f5cd5c673d0ff46a3420c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -322,6 +322,12 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val conf = new SparkConf(false)
     conf.set("spark.kryo.registrationRequired", "true")
 
+    // These cases require knowing a little about the internals of RoaringBitmap. Each container
+    // covers a range of 2^16 values and is stored as a bitmap (dense) when it holds more than
+    // 4096 values, or as an array (sparse) when it holds fewer. So we create two cases, one
+    // sparse and one dense. Since the bitmap tracks the *empty* blocks, mostly-empty block
+    // sizes trigger the dense case.
+
     val ser = new KryoSerializer(conf).newInstance()
     val denseBlockSizes = new Array[Long](5000)
     val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index b0db0988eeaabde4367607579397b85f1abf85ec..69dbfa9cd714163700175bb3ae9bdd9245cc7e23 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
-
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.util.{Utils => UUtils}
 
 class BitSetSuite extends SparkFunSuite {
 
@@ -155,50 +152,4 @@ class BitSetSuite extends SparkFunSuite {
     assert(bitsetDiff.nextSetBit(85) === 85)
     assert(bitsetDiff.nextSetBit(86) === -1)
   }
-
-  test("read and write externally") {
-    val tempDir = UUtils.createTempDir()
-    val outputFile = File.createTempFile("bits", null, tempDir)
-
-    val fos = new FileOutputStream(outputFile)
-    val oos = new ObjectOutputStream(fos)
-
-    // Create BitSet
-    val setBits = Seq(0, 9, 1, 10, 90, 96)
-    val bitset = new BitSet(100)
-
-    for (i <- 0 until 100) {
-      assert(!bitset.get(i))
-    }
-
-    setBits.foreach(i => bitset.set(i))
-
-    for (i <- 0 until 100) {
-      if (setBits.contains(i)) {
-        assert(bitset.get(i))
-      } else {
-        assert(!bitset.get(i))
-      }
-    }
-    assert(bitset.cardinality() === setBits.size)
-
-    bitset.writeExternal(oos)
-    oos.close()
-
-    val fis = new FileInputStream(outputFile)
-    val ois = new ObjectInputStream(fis)
-
-    // Read BitSet from the file
-    val bitset2 = new BitSet(0)
-    bitset2.readExternal(ois)
-
-    for (i <- 0 until 100) {
-      if (setBits.contains(i)) {
-        assert(bitset2.get(i))
-      } else {
-        assert(!bitset2.get(i))
-      }
-    }
-    assert(bitset2.cardinality() === setBits.size)
-  }
 }
diff --git a/pom.xml b/pom.xml
index 01afa80617891d5965c50df7fc26638e40e4b22e..2a8a445057174e672763b416cc00c5dfd65786ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -634,6 +634,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.roaringbitmap</groupId>
+        <artifactId>RoaringBitmap</artifactId>
+        <version>0.4.5</version>
+      </dependency>
       <dependency>
         <groupId>commons-net</groupId>
         <artifactId>commons-net</artifactId>