Skip to content
Snippets Groups Projects
Commit fc1efb72 authored by Adam Roberts's avatar Adam Roberts Committed by Sean Owen
Browse files

[SPARK-17534][TESTS] Increase timeouts for DirectKafkaStreamSuite tests

**## What changes were proposed in this pull request?**
There are two tests in this suite that are particularly flaky on the following hardware:

2x Intel(R) Xeon(R) CPU E5-2697 v2  2.70GHz and 16 GB of RAM, 1 TB HDD

This simple PR increases the timeout times and batch duration so they can reliably pass

**## How was this patch tested?**
Existing unit tests with the two core box where I was seeing the failures often

Author: Adam Roberts <aroberts@uk.ibm.com>

Closes #15094 from a-roberts/patch-6.
parent b2e27262
No related branches found
No related tags found
No related merge requests found
......@@ -108,7 +108,7 @@ class DirectKafkaStreamSuite
val expectedTotal = (data.values.sum * topics.size) - 2
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
ssc = new StreamingContext(sparkConf, Milliseconds(1000))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc,
......@@ -150,7 +150,7 @@ class DirectKafkaStreamSuite
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
......@@ -172,7 +172,7 @@ class DirectKafkaStreamSuite
val expectedTotal = (data.values.sum * 2) - 3
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
ssc = new StreamingContext(sparkConf, Milliseconds(1000))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc,
......@@ -214,7 +214,7 @@ class DirectKafkaStreamSuite
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment