diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index d3dd541fe437182c7a046ed45123f9c213f3dd3c..50f71413abf3771842e509f70fcf1b53d05b10f2 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -33,8 +33,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
   with Eventually with BeforeAndAfter with BeforeAndAfterAll {
 
-  private val kinesisTestUtils = new KinesisTestUtils()
-
   // This is the name that KCL uses to save metadata to DynamoDB
   private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
 
@@ -42,7 +40,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
   private var sc: SparkContext = _
 
   override def beforeAll(): Unit = {
-    kinesisTestUtils.createStream()
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
@@ -53,15 +50,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
     sc.stop()
     // Delete the Kinesis stream as well as the DynamoDB table generated by
     // Kinesis Client Library when consuming the stream
-    kinesisTestUtils.deleteStream()
-    kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
-  }
-
-  before {
-    // Delete the DynamoDB table generated by Kinesis Client Library when
-    // consuming from the stream, so that each unit test can start from
-    // scratch without prior history of data consumption
-    kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
   }
 
   after {
@@ -96,25 +84,32 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
    * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
    */
   testOrIgnore("basic operation") {
-    ssc = new StreamingContext(sc, Seconds(1))
-    val aWSCredentials = KinesisTestUtils.getAWSCredentials()
-    val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
-      kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
-
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
-    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
-    }
-    ssc.start()
-
-    val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
-      kinesisTestUtils.pushData(testData)
-      assert(collected === testData.toSet, "\nData received does not match data sent")
+    val kinesisTestUtils = new KinesisTestUtils()
+    try {
+      kinesisTestUtils.createStream()
+      ssc = new StreamingContext(sc, Seconds(1))
+      val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+      val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
+        kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
+        Seconds(10), StorageLevel.MEMORY_ONLY,
+        aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+
+      val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+      stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+        collected ++= rdd.collect()
+        logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+      }
+      ssc.start()
+
+      val testData = 1 to 10
+      eventually(timeout(120 seconds), interval(10 second)) {
+        kinesisTestUtils.pushData(testData)
+        assert(collected === testData.toSet, "\nData received does not match data sent")
+      }
+      ssc.stop()
+    } finally {
+      kinesisTestUtils.deleteStream()
+      kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
     }
-    ssc.stop()
   }
 }