Commit 03c27435 authored by Tathagata Das

[TEST][STREAMING] Fix flaky Kafka rate controlling test

## What changes were proposed in this pull request?

The current test is incorrect because:
- The expected number of messages does not account for the topic having 2 partitions, while the rate is set per partition.
- In some cases, the test ran out of data in Kafka while waiting for a batch of the expected size.
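
To make the first point concrete, here is a minimal sketch of the arithmetic. It takes the description above at face value (the rate applies to each partition) and does not reproduce Spark's actual rate-limiting code. `expectedBatchSize` is a hypothetical helper, not part of Spark or of the test suite; only the rounding expression mirrors the one used in the test.

```scala
object ExpectedBatchSize {
  // Hypothetical helper (assumption, not Spark API): messages expected in one batch
  // when `ratePerPartition` (messages per second) applies to each of `partitions`.
  def expectedBatchSize(ratePerPartition: Long, batchIntervalMs: Long, partitions: Int): Long =
    Math.round(ratePerPartition * batchIntervalMs * 0.001) * partitions

  def main(args: Array[String]): Unit = {
    // Old setup: rate 100, 100 ms batches, 2 partitions.
    println(expectedBatchSize(100, 100, 2)) // prints 20
    // New setup: rate 100, 500 ms batches, 1 partition.
    println(expectedBatchSize(100, 500, 1)) // prints 50
  }
}
```

Under that assumption, the old test's expectation of `Math.round(100 * 100 * 0.001)`, that is 10 messages, would not match a batch drawn from both partitions.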

This PR:
- Reduces the number of partitions to 1
- Adds more data to Kafka
- Uses a 0.5-second batch interval so that batches are created more slowly
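
As a rough illustration of why more data helps, the sketch below estimates how many seconds of input the topic holds if it is drained at the configured rate. `secondsOfData` is a hypothetical helper, and the model (a constant per-partition rate, fully used) is an assumption made for illustration; the message counts and the rate of 100 come from the diff.

```scala
object DataBudget {
  // Hypothetical helper (assumption): seconds until `totalMessages` are consumed
  // when each of `partitions` partitions is read at `ratePerPartition` messages/second.
  def secondsOfData(totalMessages: Long, ratePerPartition: Long, partitions: Int): Double =
    totalMessages.toDouble / (ratePerPartition * partitions)

  def main(args: Array[String]): Unit = {
    // Old setup: 200 messages, rate 100, 2 partitions.
    println(secondsOfData(200, 100, 2))  // prints 1.0
    // New setup: 5000 messages, rate 100, 1 partition.
    println(secondsOfData(5000, 100, 1)) // prints 50.0
  }
}
```

Since the test waits up to 5 seconds per assertion, about one second of data leaves little margin, while about 50 seconds leaves plenty.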

## How was this patch tested?
Ran the test many times locally; it will also be run many times in Jenkins.


Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14361 from tdas/kafka-rate-test-fix.
parent 6959061f
@@ -544,15 +544,14 @@ class DirectKafkaStreamSuite
   test("using rate controller") {
     val topic = "backpressure"
-    val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    kafkaTestUtils.createTopic(topic, 2)
+    kafkaTestUtils.createTopic(topic, 1)
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
     val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
     KafkaUtils.fixKafkaParams(executorKafkaParams)
-    val batchIntervalMilliseconds = 100
+    val batchIntervalMilliseconds = 500
     val estimator = new ConstantEstimator(100)
-    val messages = Map("foo" -> 200)
+    val messages = Map("foo" -> 5000)
     kafkaTestUtils.sendMessages(topic, messages)
     val sparkConf = new SparkConf()
@@ -596,7 +595,7 @@ class DirectKafkaStreamSuite
       estimator.updateRate(rate) // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+      eventually(timeout(5.seconds), interval(10 milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
         assert(collectedData.asScala.exists(_.size == expectedSize),
......
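
The second hunk shortens the polling interval of the `eventually` block to 10 milliseconds, presumably so that a matching batch is noticed soon after it arrives rather than only once per batch interval. The sketch below shows the general retry-until-timeout pattern. `retryUntil` is a hypothetical stand-in written for illustration; it is not ScalaTest's `eventually` and makes no claim about how that is implemented.

```scala
object PollingSketch {
  // Hypothetical helper (assumption): re-evaluates `condition` every `intervalMs`
  // until it returns true or `timeoutMs` has elapsed; returns the last result.
  def retryUntil(timeoutMs: Long, intervalMs: Long)(condition: () => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition()
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition()
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    // The condition becomes true roughly 50 ms after start; polling every 10 ms
    // observes it well within the 1-second timeout.
    val result = retryUntil(timeoutMs = 1000, intervalMs = 10) { () =>
      (System.nanoTime() - start) / 1000000L >= 50
    }
    println(result) // prints true
  }
}
```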