diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 05080835fc4ad5c87479a7482b9d411610b00a35..80edda59e17192537a4c982bbbf18b119e04d544 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.streaming.kinesis
 
 import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T](
   private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
 
   /** Sequence number ranges of data added to each generated block */
-  private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
-    with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
+  private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
 
   /**
    * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
@@ -135,8 +135,8 @@ private[kinesis] class KinesisReceiver[T](
   /**
    * Latest sequence number ranges that have been stored successfully.
    * This is used for checkpointing through KCL */
-  private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String]
-    with mutable.SynchronizedMap[String, String]
+  private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]
+
   /**
    * This is called when the KinesisReceiver starts and must be non-blocking.
    * The KCL creates and manages the receiving/processing thread pool through Worker.run().
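
Both hunks above swap a mutable.HashMap with the mutable.SynchronizedMap mixin for java.util.concurrent.ConcurrentHashMap; SynchronizedMap has been deprecated since Scala 2.11, and its deprecation message points to ConcurrentHashMap as the alternative. A REPL-sized sketch of how the replacement maps are used, with plain String placeholders standing in for StreamBlockId, SequenceNumberRanges and the shard ids:

    import java.util.concurrent.ConcurrentHashMap

    // Placeholder String keys/values; the receiver's real maps are keyed by
    // StreamBlockId and shard id. Individual put/get/remove calls are thread-safe
    // without any external locking.
    val blockIdToSeqNumRanges = new ConcurrentHashMap[String, String]
    val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]

    blockIdToSeqNumRanges.put("input-0-1", "range-A")
    shardIdToLatestStoredSeqNum.put("shard-00000", "seq-1")
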
@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T](
 
   /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
   private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
-    shardIdToLatestStoredSeqNum.get(shardId)
+    Option(shardIdToLatestStoredSeqNum.get(shardId))
   }
 
   /**
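
Unlike scala.collection.mutable.Map.get, which returns an Option, ConcurrentHashMap.get returns the stored value or null when the key is absent, so the Option(...) wrapper is what keeps getLatestSeqNumToCheckpoint returning Option[String]. A small illustration with made-up shard ids and sequence numbers:

    import java.util.concurrent.ConcurrentHashMap

    val seqNums = new ConcurrentHashMap[String, String]
    seqNums.put("shard-00000", "seq-42")

    // Option(x) is Some(x) for a non-null value and None for null, restoring the
    // Option-based contract that mutable.HashMap.get provided.
    assert(Option(seqNums.get("shard-00000")) == Some("seq-42"))
    assert(Option(seqNums.get("shard-99999")).isEmpty)
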
@@ -257,7 +257,7 @@ private[kinesis] class KinesisReceiver[T](
    * for next block. Internally, this is synchronized with `rememberAddedRange()`.
    */
   private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
-    blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
+    blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
     seqNumRangesInCurrentBlock.clear()
     logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
   }
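
ConcurrentHashMap is a Java map with no update method, so the Scala assignment sugar blockIdToSeqNumRanges(blockId) = ... used before no longer compiles; put(key, value) is the direct equivalent, inserting or overwriting the entry and returning the previous value (ignored here). A short sketch with placeholder values:

    import java.util.concurrent.ConcurrentHashMap

    val ranges = new ConcurrentHashMap[String, String]
    // ranges("input-0-1") = "range-A"   // would not compile: java.util.Map has no update()
    ranges.put("input-0-1", "range-A")   // insert
    ranges.put("input-0-1", "range-B")   // overwrite; returns the previous value
    assert(ranges.get("input-0-1") == "range-B")
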
@@ -265,7 +265,7 @@ private[kinesis] class KinesisReceiver[T](
   /** Store the block along with its associated ranges */
   private def storeBlockWithRanges(
       blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
-    val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId)
+    val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
     if (rangesToReportOption.isEmpty) {
       stop("Error while storing block into Spark, could not find sequence number ranges " +
         s"for block $blockId")
@@ -294,7 +294,7 @@ private[kinesis] class KinesisReceiver[T](
     // Note that we are doing this sequentially because the array of sequence number ranges
     // is assumed to be
     rangesToReport.ranges.foreach { range =>
-      shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber
+      shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
     }
   }