[SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test
## What changes were proposed in this pull request?

When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than reuse the existing broken one, because the broken one is likely to fail again. This PR also assigns a new group id to the newly created consumer, guarding against a possible race condition: the broken consumer may not yet have finished talking to the Kafka cluster in `close` while the new consumer is already talking to it. It's not clear whether this can actually happen; the new group id is just a safety measure so the Kafka cluster never thinks there are two consumers with the same group id within a short time window. (Note: CachedKafkaConsumer doesn't need this fix since `assign` never uses the group id.)

## How was this patch tested?

In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console, this flaky test was run 120 times and passed every time.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16282 from zsxwing/kafka-fix.

(cherry picked from commit 95efc895)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
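The retry pattern described above is small enough to sketch. The following is a minimal, hypothetical Scala illustration of the idea, not the actual KafkaSource code: `ConsumerHolder`, `createConsumer`, and `withRetries` are invented names, and `baseParams` is assumed to already carry the bootstrap servers and deserializer settings. On failure, the broken consumer is closed and replaced by a fresh one with a freshly generated group id.

```scala
import java.util.UUID
import scala.util.control.NonFatal
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical sketch of the pattern: never reuse a consumer that has
// failed; rebuild one with a unique group id so the brokers never see
// two live consumers sharing a group id, even briefly.
class ConsumerHolder(baseParams: Map[String, Object]) {

  private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = createConsumer()

  private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    val params = new java.util.HashMap[String, Object]()
    baseParams.foreach { case (k, v) => params.put(k, v) }
    // A fresh, unique group id sidesteps the race where the old
    // consumer's `close` has not yet been observed by the brokers.
    params.put("group.id", s"spark-kafka-source-${UUID.randomUUID}")
    new KafkaConsumer[Array[Byte], Array[Byte]](params)
  }

  def withRetries[T](maxRetries: Int)(
      body: KafkaConsumer[Array[Byte], Array[Byte]] => T): T = {
    var attempt = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(body(consumer))
      } catch {
        case NonFatal(_) if attempt < maxRetries =>
          attempt += 1
          // Don't retry with the broken consumer: it may fail again.
          try consumer.close() catch { case NonFatal(_) => }
          consumer = createConsumer()
      }
    }
    result.get
  }
}
```

In the real fix, the equivalent logic lives inside `KafkaSource.scala` (see the changed files below); the sketch only shows the shape of the close-and-recreate retry loop.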
Showing 4 changed files:

- dev/sparktestsupport/modules.py (2 additions, 1 deletion)
- external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala (42 additions, 16 deletions)
- external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala (7 additions, 14 deletions)
- external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala (1 addition, 1 deletion)