diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d27fe22cb61714c5f45a18057e2bc0cdd40..ab1c5055a253f0bd6c313e8a1ef84f201a70f98c 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
     )
 
     // Send data to Kafka and wait for it to be received
-    def sendDataAndWaitForReceive(data: Seq[Int]) {
+    def sendData(data: Seq[Int]) {
       val strings = data.map { _.toString}
       kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
-      }
     }
 
     // Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
     }
     ssc.checkpoint(testDir.getAbsolutePath)
 
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-    }
-
     // This is ensure all the data is eventually receiving only once
     stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+      rdd.collect().headOption.foreach { x =>
+        DirectKafkaStreamSuite.total.set(x._2)
+      }
     }
     ssc.start()
 
-    // Send some data and wait for them to be received
+    // Send some data
     for (i <- (1 to 10).grouped(4)) {
-      sendDataAndWaitForReceive(i)
+      sendData(i)
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
     ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
     val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
 
     // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
-        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+        earlierOffsetRanges.contains((or._1, or._2))
       },
       "Recovered ranges are not the same as the ones generated\n" +
         s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+        s"earlierOffsetRanges: $earlierOffsetRanges"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only once
     ssc.start()
-    sendDataAndWaitForReceive(11 to 20)
+    for (i <- (11 to 20).grouped(4)) {
+      sendData(i)
+    }
+
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+      assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()
   }
@@ -488,8 +488,7 @@ class DirectKafkaStreamSuite
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new ConcurrentLinkedQueue[String]()
-  @volatile var total = -1L
+  val total = new AtomicLong(-1L)
 
   class InputInfoCollector extends StreamingListener {
     val numRecordsSubmitted = new AtomicLong(0L)
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 5e539c1d790cc91b753771a7b7b418365a0c1973..809699a7399620188f9d023a7c6dc26367245621 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -53,13 +53,13 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("basic usage") {
-    val topic = s"topicbasic-${Random.nextInt}"
+    val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
     kafkaTestUtils.createTopic(topic)
     val messages = Array("the", "quick", "brown", "fox")
     kafkaTestUtils.sendMessages(topic, messages)
 
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
+      "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
 
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
@@ -92,12 +92,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-    val topic = s"topicboundary-${Random.nextInt}"
+    val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     kafkaTestUtils.createTopic(topic)
 
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
+      "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
 
     val kc = new KafkaCluster(kafkaParams)