Skip to content
Snippets Groups Projects
Commit 93eb2acf authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-9030] [STREAMING] [HOTFIX] Make sure that no attempts to create Kinesis...

[SPARK-9030] [STREAMING] [HOTFIX] Make sure that no attempt to create Kinesis streams is made when tests are not enabled

Problem: Even when the environment variable to enable tests was not set, the `beforeAll` of the KinesisStreamSuite attempted to find AWS credentials to create a Kinesis stream, and failed.

Solution: Made sure that all accesses to KinesisTestUtils that create streams are inside `testOrIgnore`.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7519 from tdas/kinesis-tests and squashes the following commits:

64d6d06 [Tathagata Das] Removed empty lines.
7c18473 [Tathagata Das] Putting all access to KinesisTestUtils inside testOrIgnore
parent 163e3f1d
No related branches found
No related tags found
No related merge requests found
...@@ -33,8 +33,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} ...@@ -33,8 +33,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
with Eventually with BeforeAndAfter with BeforeAndAfterAll { with Eventually with BeforeAndAfter with BeforeAndAfterAll {
private val kinesisTestUtils = new KinesisTestUtils()
// This is the name that KCL uses to save metadata to DynamoDB // This is the name that KCL uses to save metadata to DynamoDB
private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
...@@ -42,7 +40,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper ...@@ -42,7 +40,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
private var sc: SparkContext = _ private var sc: SparkContext = _
override def beforeAll(): Unit = { override def beforeAll(): Unit = {
kinesisTestUtils.createStream()
val conf = new SparkConf() val conf = new SparkConf()
.setMaster("local[4]") .setMaster("local[4]")
.setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
...@@ -53,15 +50,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper ...@@ -53,15 +50,6 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
sc.stop() sc.stop()
// Delete the Kinesis stream as well as the DynamoDB table generated by // Delete the Kinesis stream as well as the DynamoDB table generated by
// Kinesis Client Library when consuming the stream // Kinesis Client Library when consuming the stream
kinesisTestUtils.deleteStream()
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
}
before {
// Delete the DynamoDB table generated by Kinesis Client Library when
// consuming from the stream, so that each unit test can start from
// scratch without prior history of data consumption
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
} }
after { after {
...@@ -96,25 +84,32 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper ...@@ -96,25 +84,32 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
* and you have to set the system environment variable RUN_KINESIS_TESTS=1 . * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
*/ */
testOrIgnore("basic operation") { testOrIgnore("basic operation") {
ssc = new StreamingContext(sc, Seconds(1)) val kinesisTestUtils = new KinesisTestUtils()
val aWSCredentials = KinesisTestUtils.getAWSCredentials() try {
val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, kinesisTestUtils.createStream()
kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, ssc = new StreamingContext(sc, Seconds(1))
Seconds(10), StorageLevel.MEMORY_ONLY, val aWSCredentials = KinesisTestUtils.getAWSCredentials()
aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey) val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] Seconds(10), StorageLevel.MEMORY_ONLY,
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
} stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
ssc.start() collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
val testData = 1 to 10 }
eventually(timeout(120 seconds), interval(10 second)) { ssc.start()
kinesisTestUtils.pushData(testData)
assert(collected === testData.toSet, "\nData received does not match data sent") val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
kinesisTestUtils.pushData(testData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
ssc.stop()
} finally {
kinesisTestUtils.deleteStream()
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
} }
ssc.stop()
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment