Skip to content
Snippets Groups Projects
Commit 03138b67 authored by Jean-Baptiste Onofré's avatar Jean-Baptiste Onofré Committed by Sean Owen
Browse files

[SPARK-11193] Use Java ConcurrentHashMap instead of SynchronizedMap trait in...

[SPARK-11193] Use Java ConcurrentHashMap instead of SynchronizedMap trait in order to avoid ClassCastException due to KryoSerializer in KinesisReceiver

Author: Jean-Baptiste Onofré <jbonofre@apache.org>

Closes #10203 from jbonofre/SPARK-11193.
parent 1e3526c2
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis package org.apache.spark.streaming.kinesis
import java.util.UUID import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
...@@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T]( ...@@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T](
private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange] private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
/** Sequence number ranges of data added to each generated block */ /** Sequence number ranges of data added to each generated block */
private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges] private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
/** /**
* The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval. * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
...@@ -135,8 +135,8 @@ private[kinesis] class KinesisReceiver[T]( ...@@ -135,8 +135,8 @@ private[kinesis] class KinesisReceiver[T](
/** /**
* Latest sequence number ranges that have been stored successfully. * Latest sequence number ranges that have been stored successfully.
* This is used for checkpointing through KCL */ * This is used for checkpointing through KCL */
private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String] private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]
with mutable.SynchronizedMap[String, String]
/** /**
* This is called when the KinesisReceiver starts and must be non-blocking. * This is called when the KinesisReceiver starts and must be non-blocking.
* The KCL creates and manages the receiving/processing thread pool through Worker.run(). * The KCL creates and manages the receiving/processing thread pool through Worker.run().
...@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T]( ...@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T](
/** Get the latest sequence number for the given shard that can be checkpointed through KCL */ /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
shardIdToLatestStoredSeqNum.get(shardId) Option(shardIdToLatestStoredSeqNum.get(shardId))
} }
/** /**
...@@ -257,7 +257,7 @@ private[kinesis] class KinesisReceiver[T]( ...@@ -257,7 +257,7 @@ private[kinesis] class KinesisReceiver[T](
* for next block. Internally, this is synchronized with `rememberAddedRange()`. * for next block. Internally, this is synchronized with `rememberAddedRange()`.
*/ */
private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = { private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray) blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
seqNumRangesInCurrentBlock.clear() seqNumRangesInCurrentBlock.clear()
logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges") logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
} }
...@@ -265,7 +265,7 @@ private[kinesis] class KinesisReceiver[T]( ...@@ -265,7 +265,7 @@ private[kinesis] class KinesisReceiver[T](
/** Store the block along with its associated ranges */ /** Store the block along with its associated ranges */
private def storeBlockWithRanges( private def storeBlockWithRanges(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = { blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId) val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
if (rangesToReportOption.isEmpty) { if (rangesToReportOption.isEmpty) {
stop("Error while storing block into Spark, could not find sequence number ranges " + stop("Error while storing block into Spark, could not find sequence number ranges " +
s"for block $blockId") s"for block $blockId")
...@@ -294,7 +294,7 @@ private[kinesis] class KinesisReceiver[T]( ...@@ -294,7 +294,7 @@ private[kinesis] class KinesisReceiver[T](
// Note that we are doing this sequentially because the array of sequence number ranges // Note that we are doing this sequentially because the array of sequence number ranges
// is assumed to be // is assumed to be
rangesToReport.ranges.foreach { range => rangesToReport.ranges.foreach { range =>
shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment