Commit b8ebf63c authored by cody koeninger, committed by Tathagata Das

[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from development on 0-10.

Main changes are:
- eliminating waits on a concurrent queue in favor of an assert on received results,
- atomics instead of volatile (although this probably doesn't matter),
- increasing uniqueness of topic names

Each of these patterns is sketched below.

## How was this patch tested?

Unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.
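To make the first two bullets concrete, here is a minimal, self-contained sketch of the resulting test shape — not code from the patch. A final total held in an `AtomicLong` is checked with one retried assertion, instead of polling a shared queue after every send; the background thread is a hypothetical stand-in for the streaming job.

```scala
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually._

object TestPatternSketch {
  // AtomicLong instead of `@volatile var total = -1L`; for plain get/set the
  // guarantees are equivalent, which is why the commit message hedges.
  val total = new AtomicLong(-1L)

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the streaming job updating the counter.
    new Thread(() => total.set((1 to 10).sum)).start()

    // One retried assertion on the end result, instead of waiting on a
    // concurrent queue after each send.
    eventually(timeout(10.seconds), interval(50.milliseconds)) {
      assert(total.get == (1 to 10).sum)
    }
    println(s"total = ${total.get}")
  }
}
```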
Parent commit: 8e3e4ed6
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
     )
     // Send data to Kafka and wait for it to be received
-    def sendDataAndWaitForReceive(data: Seq[Int]) {
+    def sendData(data: Seq[Int]) {
       val strings = data.map { _.toString}
       kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
-      }
     }

     // Setup the streaming context
@@ -264,21 +264,21 @@ class DirectKafkaStreamSuite
     }
     ssc.checkpoint(testDir.getAbsolutePath)
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-    }
     // This is ensure all the data is eventually receiving only once
     stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+      rdd.collect().headOption.foreach { x =>
+        DirectKafkaStreamSuite.total.set(x._2)
+      }
     }
     ssc.start()
-    // Send some data and wait for them to be received
+    // Send some data
     for (i <- (1 to 10).grouped(4)) {
-      sendDataAndWaitForReceive(i)
+      sendData(i)
     }
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
+    }
     ssc.stop()
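Why writing to a suite-level `AtomicLong` from `foreachRDD` is safe at all: the `foreachRDD` closure runs on the driver, and `collect()` brings each batch's data back to the driver JVM, where the assertion later reads the counter. A runnable sketch of that shape, assuming a local Spark build; the queue-backed stream is a stand-in for the Kafka DStream, and names and timings are illustrative, not from the patch.

```scala
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverSidePublishSketch {
  val total = new AtomicLong(-1L)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Stand-in for the Kafka-backed DStream in the test.
    val stream = ssc.queueStream(mutable.Queue(ssc.sparkContext.makeRDD(1 to 10)))

    // foreachRDD runs this closure on the driver; collect() returns the
    // batch's data to the driver, so updating the shared AtomicLong happens
    // in the same JVM that later inspects it.
    stream.foreachRDD { rdd =>
      val batch = rdd.collect()
      if (batch.nonEmpty) total.set(batch.sum)
    }

    ssc.start()
    Thread.sleep(3000) // crude, test-only: let at least one batch complete
    ssc.stop()
    println(s"total = ${total.get}") // expected: 55
  }
}
```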
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
     val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]

     // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
-        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+        earlierOffsetRanges.contains((or._1, or._2))
       },
       "Recovered ranges are not the same as the ones generated\n" +
         s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+        s"earlierOffsetRanges: $earlierOffsetRanges"
     )

     // Restart context, give more data and verify the total at the end
     // If the total is right that means each records has been received only once
     ssc.start()
-    sendDataAndWaitForReceive(11 to 20)
+    for (i <- (11 to 20).grouped(4)) {
+      sendData(i)
+    }
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+      assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()
   }
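The `.toSet` conversions in this hunk do two jobs: Scala Arrays are JVM arrays and compare by reference, so two structurally equal offset-range arrays would never match, and sets also make the comparison order-insensitive. A two-line illustration with plain integers:

```scala
// JVM arrays use reference equality, so structurally equal arrays differ:
assert(Array(1, 2) != Array(1, 2))

// Converting to Set gives value equality and ignores ordering, which is
// exactly what comparing recovered vs. generated offset ranges needs:
assert(Array(1, 2).toSet == Array(2, 1).toSet)
```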
@@ -488,8 +488,7 @@ class DirectKafkaStreamSuite
 }

 object DirectKafkaStreamSuite {
-  val collectedData = new ConcurrentLinkedQueue[String]()
-  @volatile var total = -1L
+  val total = new AtomicLong(-1L)

   class InputInfoCollector extends StreamingListener {
     val numRecordsSubmitted = new AtomicLong(0L)
@@ -53,13 +53,13 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   }

   test("basic usage") {
-    val topic = s"topicbasic-${Random.nextInt}"
+    val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
     kafkaTestUtils.createTopic(topic)
     val messages = Array("the", "quick", "brown", "fox")
     kafkaTestUtils.sendMessages(topic, messages)

     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
+      "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")

     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
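The renaming pattern in this hunk combines a random int with the current wall-clock time so that topic and group ids stay unique across rapid re-runs and parallel test JVMs: either component alone can collide, but the pair rarely does. A small sketch of the scheme; the `uniqueName` helper is hypothetical, not part of the patch:

```scala
import scala.util.Random

// Hypothetical helper mirroring the naming pattern from the diff.
def uniqueName(prefix: String): String =
  s"$prefix-${Random.nextInt}-${System.currentTimeMillis}"

val topic   = uniqueName("topicbasic")    // e.g. topicbasic-123456789-1467936000000
val groupId = uniqueName("test-consumer")
```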
@@ -92,12 +92,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-    val topic = s"topicboundary-${Random.nextInt}"
+    val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     kafkaTestUtils.createTopic(topic)

     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
+      "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")

     val kc = new KafkaCluster(kafkaParams)
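Context for the boundary test above: in Spark's Kafka integration an `OffsetRange` is half-open, with `fromOffset` inclusive and `untilOffset` exclusive, which is precisely where off-by-one mistakes hide. A brief sketch; the topic name and offsets are made up:

```scala
import org.apache.spark.streaming.kafka.OffsetRange

// Four messages at offsets 0..3: from = 0 (inclusive), until = 4 (exclusive).
val range = OffsetRange("topicboundary-example", 0, 0, 4)
assert(range.count() == 4) // untilOffset - fromOffset

// An empty range is legal and should yield an empty RDD partition.
val empty = OffsetRange("topicboundary-example", 0, 4, 4)
assert(empty.count() == 0)
```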