diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index ed4cc75920e8e066f5f1763aa14241cbfebb1368..89e713f92df464630a136256f7f13b35b6a4c3b4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("starting offset is latest by default") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("0"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    val kafka = reader.load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+    val mapped = kafka.map(_.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(1, 2, 3)  // should not have 0
+    )
+  }
+
   test("bad source options") {
     def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
       val ex = intercept[IllegalArgumentException] {