-
- Downloads
[SPARK-18373][SPARK-18529][SS][KAFKA] Make failOnDataLoss=false work with Spark jobs
## What changes were proposed in this pull request? This PR adds `CachedKafkaConsumer.getAndIgnoreLostData` to handle corner cases of `failOnDataLoss=false`. It also resolves [SPARK-18529](https://issues.apache.org/jira/browse/SPARK-18529 ) after refactoring codes: Timeout will throw a TimeoutException. ## How was this patch tested? Because I cannot find any way to manually control the Kafka server to clean up logs, it's impossible to write unit tests for each corner case. Therefore, I just created `test("stress test for failOnDataLoss=false")` which should cover most of corner cases. I also modified some existing tests to test for both `failOnDataLoss=false` and `failOnDataLoss=true` to make sure it doesn't break existing logic. Author: Shixiong Zhu <shixiong@databricks.com> Closes #15820 from zsxwing/failOnDataLoss. (cherry picked from commit 2fd101b2) Signed-off-by:Tathagata Das <tathagata.das1565@gmail.com>
Showing
- external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala 208 additions, 28 deletions...a/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
- external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala 19 additions, 4 deletions...ain/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
- external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala 26 additions, 16 deletions.../scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
- external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala 255 additions, 42 deletions...cala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
- external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala 15 additions, 5 deletions.../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
Loading
Please register or sign in to comment