Skip to content
Snippets Groups Projects
Commit b06c23db authored by Tathagata Das's avatar Tathagata Das Committed by Shixiong Zhu
Browse files

[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default...

[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset in latest

## What changes were proposed in this pull request?

Added test to check whether default starting offset in latest

## How was this patch tested?
new unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15778 from tdas/SPARK-18283.
parent daa975f4
No related branches found
No related tags found
No related merge requests found
...@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest { ...@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
) )
} }
test("starting offset is latest by default") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)
testUtils.sendMessages(topic, Array("0"))
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
val kafka = reader.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val mapped = kafka.map(_.toInt)
testStream(mapped)(
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(1, 2, 3) // should not have 0
)
}
test("bad source options") { test("bad source options") {
def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = { def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
val ex = intercept[IllegalArgumentException] { val ex = intercept[IllegalArgumentException] {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment