Skip to content
Snippets Groups Projects
Commit 31d069d4 authored by Wilson Wu's avatar Wilson Wu Committed by Sean Owen
Browse files

[SPARK-13746][TESTS] stop using deprecated SynchronizedSet

trait SynchronizedSet in package mutable is deprecated

Author: Wilson Wu <wilson888888888@gmail.com>

Closes #11580 from wilson888888888/spark-synchronizedset.
parent acdf2197
No related branches found
No related tags found
No related merge requests found
...@@ -19,7 +19,7 @@ package org.apache.spark ...@@ -19,7 +19,7 @@ package org.apache.spark
import java.lang.ref.WeakReference import java.lang.ref.WeakReference
import scala.collection.mutable.{HashSet, SynchronizedSet} import scala.collection.mutable.HashSet
import scala.language.existentials import scala.language.existentials
import scala.util.Random import scala.util.Random
...@@ -442,25 +442,25 @@ class CleanerTester( ...@@ -442,25 +442,25 @@ class CleanerTester(
checkpointIds: Seq[Long] = Seq.empty) checkpointIds: Seq[Long] = Seq.empty)
extends Logging { extends Logging {
val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal val isDistributed = !sc.isLocal
val cleanerListener = new CleanerListener { val cleanerListener = new CleanerListener {
def rddCleaned(rddId: Int): Unit = { def rddCleaned(rddId: Int): Unit = {
toBeCleanedRDDIds -= rddId toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
logInfo("RDD " + rddId + " cleaned") logInfo("RDD " + rddId + " cleaned")
} }
def shuffleCleaned(shuffleId: Int): Unit = { def shuffleCleaned(shuffleId: Int): Unit = {
toBeCleanedShuffleIds -= shuffleId toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
logInfo("Shuffle " + shuffleId + " cleaned") logInfo("Shuffle " + shuffleId + " cleaned")
} }
def broadcastCleaned(broadcastId: Long): Unit = { def broadcastCleaned(broadcastId: Long): Unit = {
toBeCleanedBroadcstIds -= broadcastId toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
logInfo("Broadcast " + broadcastId + " cleaned") logInfo("Broadcast " + broadcastId + " cleaned")
} }
...@@ -469,7 +469,7 @@ class CleanerTester( ...@@ -469,7 +469,7 @@ class CleanerTester(
} }
def checkpointCleaned(rddId: Long): Unit = { def checkpointCleaned(rddId: Long): Unit = {
toBeCheckpointIds -= rddId toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
logInfo("checkpoint " + rddId + " cleaned") logInfo("checkpoint " + rddId + " cleaned")
} }
} }
...@@ -578,18 +578,27 @@ class CleanerTester( ...@@ -578,18 +578,27 @@ class CleanerTester(
} }
private def uncleanedResourcesToString = { private def uncleanedResourcesToString = {
val s1 = toBeCleanedRDDIds.synchronized {
toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")
}
val s2 = toBeCleanedShuffleIds.synchronized {
toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")
}
val s3 = toBeCleanedBroadcstIds.synchronized {
toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")
}
s""" s"""
|\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")} |\tRDDs = $s1
|\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")} |\tShuffles = $s2
|\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")} |\tBroadcasts = $s3
""".stripMargin """.stripMargin
} }
private def isAllCleanedUp = private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty && toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
toBeCleanedShuffleIds.isEmpty && toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
toBeCleanedBroadcstIds.isEmpty && toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
toBeCheckpointIds.isEmpty toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }
private def getRDDBlocks(rddId: Int): Seq[BlockId] = { private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match { blockManager.master.getMatchingBlockIds( _ match {
......
...@@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun ...@@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
Seconds(10), StorageLevel.MEMORY_ONLY, Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect() collected.synchronized {
logInfo("Collected = " + collected.mkString(", ")) collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
}
} }
ssc.start() ssc.start()
val testData = 1 to 10 val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) { eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData) testUtils.pushData(testData, aggregateTestData)
assert(collected === testData.toSet, "\nData received does not match data sent") assert(collected.synchronized { collected === testData.toSet },
"\nData received does not match data sent")
} }
ssc.stop(stopSparkContext = false) ssc.stop(stopSparkContext = false)
} }
...@@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun ...@@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
stream shouldBe a [ReceiverInputDStream[_]] stream shouldBe a [ReceiverInputDStream[_]]
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] val collected = new mutable.HashSet[Int]
stream.foreachRDD { rdd => stream.foreachRDD { rdd =>
collected ++= rdd.collect() collected.synchronized {
logInfo("Collected = " + collected.mkString(", ")) collected ++= rdd.collect()
logInfo("Collected = " + collected.mkString(", "))
}
} }
ssc.start() ssc.start()
...@@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun ...@@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
eventually(timeout(120 seconds), interval(10 second)) { eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData) testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5) val modData = testData.map(_ + 5)
assert(collected === modData.toSet, "\nData received does not match data sent") assert(collected.synchronized { collected === modData.toSet },
"\nData received does not match data sent")
} }
ssc.stop(stopSparkContext = false) ssc.stop(stopSparkContext = false)
} }
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment