Commit 8f35d3ea authored by Huaxin Gao, committed by Sean Owen

[SPARK-13186][STREAMING] migrate away from SynchronizedMap

trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead.
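For illustration only (not part of this commit): a minimal sketch of the two usual replacements for the deprecated mixin. The first keeps a plain mutable.HashMap and wraps every access in an explicit synchronized block, which is the pattern this patch adopts; the second uses java.util.concurrent.ConcurrentHashMap as the deprecation message suggests. The object and method names below are hypothetical.

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.mutable

    object SynchronizedMapMigration {
      // Before (deprecated): thread safety via the SynchronizedMap trait mixin
      // val counts = new mutable.HashMap[String, Long] with mutable.SynchronizedMap[String, Long]

      // Option 1 (the approach taken in this patch): a plain HashMap,
      // with every read and write wrapped in an explicit synchronized block.
      private val counts = new mutable.HashMap[String, Long]()

      def add(key: String, n: Long): Unit = counts.synchronized {
        counts(key) = counts.getOrElse(key, 0L) + n
      }

      def snapshot(): Map[String, Long] = counts.synchronized { counts.toMap }

      // Option 2 (what the deprecation message recommends): a ConcurrentHashMap,
      // whose individual operations are thread-safe without external locking.
      private val concurrentCounts = new ConcurrentHashMap[String, Long]()

      def addConcurrent(key: String, n: Long): Unit = {
        // merge performs the read-modify-write atomically (Scala 2.12+ SAM conversion)
        concurrentCounts.merge(key, n, (a: Long, b: Long) => a + b)
      }
    }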

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #11250 from huaxingao/spark__13186.
parent 39ff1545
@@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
+    val result = new mutable.HashMap[String, Long]()
     stream.map(_._2).countByValue().foreachRDD { r =>
-      val ret = r.collect()
-      ret.toMap.foreach { kv =>
-        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-        result.put(kv._1, count)
+      r.collect().foreach { kv =>
+        result.synchronized {
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
       }
     }
     ssc.start()

     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent === result)
+      assert(result.synchronized { sent === result })
     }
   }
 }
@@ -230,7 +230,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val awsCredentials = KinesisTestUtils.getAWSCredentials()
     val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
     val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
       testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +240,16 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
       val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      collectedData.synchronized {
+        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      }
     })

     ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
     ssc.start()

-    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+    def numBatchesWithData: Int =
+      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }

     def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
@@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
     // and return the same data
-    val times = collectedData.keySet
-    times.foreach { time =>
-      val (arrayOfSeqNumRanges, data) = collectedData(time)
-      val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
-      rdd shouldBe a [KinesisBackedBlockRDD[_]]
+    collectedData.synchronized {
+      val times = collectedData.keySet
+      times.foreach { time =>
+        val (arrayOfSeqNumRanges, data) = collectedData(time)
+        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+        rdd shouldBe a[KinesisBackedBlockRDD[_]]

-      // Verify the recovered sequence ranges
-      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
-      assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
-      arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
-        assert(expected.ranges.toSeq === found.ranges.toSeq)
-      }
+        // Verify the recovered sequence ranges
+        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+          assert(expected.ranges.toSeq === found.ranges.toSeq)
+        }

-      // Verify the recovered data
-      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
+        // Verify the recovered data
+        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
+      }
     }
     ssc.stop()
   }
...
@@ -117,7 +117,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Map of batch-time to selected file info for the remembered batches
   // This is a concurrent map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    new mutable.HashMap[Time, Array[String]]

   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -148,7 +148,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     // Find new files
     val newFiles = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    batchTimeToSelectedFiles += ((validTime, newFiles))
+    batchTimeToSelectedFiles.synchronized {
+      batchTimeToSelectedFiles += ((validTime, newFiles))
+    }
     recentlySelectedFiles ++= newFiles
     val rdds = Some(filesToRDD(newFiles))
     // Copy newFiles to immutable.List to prevent from being modified by the user
@@ -162,14 +164,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
-    super.clearMetadata(time)
-    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
-    batchTimeToSelectedFiles --= oldFiles.keys
-    recentlySelectedFiles --= oldFiles.values.flatten
-    logInfo("Cleared " + oldFiles.size + " old files that were older than " +
-      (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
-    logDebug("Cleared files are:\n" +
-      oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+    batchTimeToSelectedFiles.synchronized {
+      val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+      batchTimeToSelectedFiles --= oldFiles.keys
+      recentlySelectedFiles --= oldFiles.values.flatten
+      logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+        (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+      logDebug("Cleared files are:\n" +
+        oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+    }
     // Delete file mod times that weren't accessed in the last round of getting new files
     fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
@@ -307,8 +310,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
-    batchTimeToSelectedFiles =
-      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
@@ -324,7 +326,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     override def update(time: Time) {
       hadoopFiles.clear()
-      hadoopFiles ++= batchTimeToSelectedFiles
+      batchTimeToSelectedFiles.synchronized { hadoopFiles ++= batchTimeToSelectedFiles }
     }

     override def cleanup(time: Time) { }
@@ -335,7 +337,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       // Restore the metadata in both files and generatedRDDs
       logInfo("Restoring files for time " + t + " - " +
         f.mkString("[", ", ", "]") )
-      batchTimeToSelectedFiles += ((t, f))
+      batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
       recentlySelectedFiles ++= f
       generatedRDDs += ((t, filesToRDD(f)))
     }
...
@@ -613,7 +613,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
   def recordedFiles(ssc: StreamingContext): Seq[Int] = {
     val fileInputDStream =
       ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+    val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
+      { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
     filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
   }
...
@@ -270,7 +270,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       }
     }
     _ssc.stop()
-    failureReasonsCollector.failureReasons.toMap
+    failureReasonsCollector.failureReasons.synchronized
+    {
+      failureReasonsCollector.failureReasons.toMap
+    }
   }

   /** Check if a sequence of numbers is in increasing order */
@@ -354,12 +357,15 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
  */
 class FailureReasonsCollector extends StreamingListener {

-  val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
+  val failureReasons = new HashMap[Int, String]

   override def onOutputOperationCompleted(
       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
     outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
-      failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+      failureReasons.synchronized
+      {
+        failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+      }
     }
   }
 }
...