From b06c23db9aedae48c9eba9d702ae82fa5647cfe5 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 7 Nov 2016 10:43:36 -0800
Subject: [PATCH] [SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to
 check whether default starting offset in latest

## What changes were proposed in this pull request?

Added test to check whether default starting offset in latest

## How was this patch tested?
new unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15778 from tdas/SPARK-18283.
---
 .../spark/sql/kafka010/KafkaSourceSuite.scala | 24 +++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index ed4cc75920..89e713f92d 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("starting offset is latest by default") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("0"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    val kafka = reader.load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+    val mapped = kafka.map(_.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(1, 2, 3)  // should not have 0
+    )
+  }
+
   test("bad source options") {
     def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
       val ex = intercept[IllegalArgumentException] {
-- 
GitLab