diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e99185b5556f1325a161b22ec8b309e460b..3f396a7e6b6985314ff05f67282de94aaccf627d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
   }
 
   /**
-   * Get the record at `offset`.
+   * Get the record for the given offset if available. Otherwise it will either throw an error
+   * (if failOnDataLoss = true), return the record for the next available offset within
+   * [offset, untilOffset), or return null.
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
   private def fetchData(
       offset: Long,
-      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
     if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
       // This is the first fetch, or the last pre-fetched data has been drained.
       // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
     } else {
       val record = fetchedData.next()
       nextOffsetInFetchedData = record.offset + 1
-      // `seek` is always called before "poll". So "record.offset" must be same as "offset".
-      assert(record.offset == offset,
-        s"The fetched data has a different offset: expected $offset but was ${record.offset}")
-      record
+      // In general, Kafka uses the requested offset as a starting point and returns the next
+      // available record, whose offset may be larger. Hence we need to handle an offset mismatch.
+      if (record.offset > offset) {
+        // This may happen when some records have aged out but their offsets were already verified
+        if (failOnDataLoss) {
+          reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+          // This should never happen, as "reportDataLoss" will throw an exception
+          null
+        } else {
+          if (record.offset >= untilOffset) {
+            reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+            null
+          } else {
+            reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+            record
+          }
+        }
+      } else if (record.offset < offset) {
+        // This should not happen. If it does, then we probably misunderstand Kafka's internal
+        // mechanism.
+        throw new IllegalStateException(
+          s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+      } else {
+        record
+      }
     }
   }
 
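The offset-mismatch handling above reflects Kafka's fetch behavior: when the record at the requested offset has been removed (for example by retention), poll can return a record at a later offset instead. Below is a minimal sketch of the same decision logic in isolation; the FetchOutcome type and decide helper are illustrative names, not part of this patch.

```scala
// Illustrative sketch of the offset cases handled in fetchData above (names are hypothetical).
sealed trait FetchOutcome
case object ReturnRecord extends FetchOutcome   // offsets match, or the next offset is still usable
case object ReturnNull   extends FetchOutcome   // missing range reported, nothing usable before untilOffset
case object ReportLoss   extends FetchOutcome   // failOnDataLoss = true, so reportDataLoss throws
case object Inconsistent extends FetchOutcome   // record.offset < offset, should never happen

def decide(
    offset: Long,
    untilOffset: Long,
    recordOffset: Long,
    failOnDataLoss: Boolean): FetchOutcome = {
  if (recordOffset > offset) {
    // Some records in [offset, recordOffset) were lost.
    if (failOnDataLoss) ReportLoss
    else if (recordOffset >= untilOffset) ReturnNull
    else ReturnRecord
  } else if (recordOffset < offset) {
    Inconsistent
  } else {
    ReturnRecord
  }
}
```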
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d9ab4bb4f873d927e7e3aa8c15df28fdb64cb52c..92ee0ed93d940b4b011567c76ebadd5dbfb4b9a6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
 
   private val offsetFetchAttemptIntervalMs =
-    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
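
Both fetch-offset knobs are ordinary source options, so a query can override the new 1000 ms default (and the retry count) when building the reader. A hedged usage sketch; the bootstrap servers and topic name are placeholders, and the values shown are examples only.

```scala
// Example only: set the fetch-offset retry options read by KafkaSource above.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder address
  .option("subscribe", "topic1")                    // placeholder topic
  .option("fetchOffset.numRetries", "5")
  .option("fetchOffset.retryIntervalMs", "2000")
  .load()
```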
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2d6ccb22ddb06366ee3f1836610ec5a126205862..0e40abac65251bd91cc7dd4d2c0fb1585a768b90 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
 
   private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
 
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     testUtils = new KafkaTestUtils {
@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     }
   }
 
-  ignore("stress test for failOnDataLoss=false") {
+  test("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")
@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
       .option("subscribePattern", "failOnDataLoss.*")
       .option("startingOffsets", "earliest")
       .option("failOnDataLoss", "false")
+      .option("fetchOffset.retryIntervalMs", "3000")
     val kafka = reader.load()
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f43917e151c579778031066dcc0a5e237a36c3c9..fd1689acf6727de8bb5a5e730e6e593accf60a6e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
   def deleteTopic(topic: String): Unit = {
     val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
     AdminUtils.deleteTopic(zkUtils, topic)
-    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+    verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
   }
 
   /** Add new paritions to a Kafka topic */
@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
     props
   }
 
+  /** Verify that the topic is deleted in all places, e.g., brokers and ZooKeeper. */
   private def verifyTopicDeletion(
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]): Unit = {
+    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+
+    import ZkUtils._
+    // the admin path for delete topic should be deleted, signaling completion of topic deletion
+    assert(
+      !zkUtils.pathExists(getDeleteTopicPath(topic)),
+      s"${getDeleteTopicPath(topic)} still exists")
+    assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+    // ensure that the topic-partition has been deleted from all brokers' replica managers
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      s"topic $topic still exists in the replica manager")
+    // ensure that logs from all replicas are deleted if delete topic is marked successful
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.getLogManager().getLog(tp).isEmpty)),
+      s"topic $topic still exists in log manager")
+    // ensure that topic is removed from all cleaner offsets
+    assert(servers.forall(server => topicAndPartitions.forall { tp =>
+      val checkpoints = server.getLogManager().logDirs.map { logDir =>
+        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      }
+      checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+    }), s"checkpoint for topic $topic still exists")
+    // ensure the topic is gone
+    assert(
+      !zkUtils.getAllTopics().contains(topic),
+      s"topic $topic still exists in ZooKeeper")
+  }
+
+  /** Verify that the topic is deleted; if not, delete it again and retry the verification. */
+  private def verifyTopicDeletionWithRetries(
       zkUtils: ZkUtils,
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]) {
-    import ZkUtils._
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
-    def isDeleted(): Boolean = {
-      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
-      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
-      // ensure that the topic-partition has been deleted from all brokers' replica managers
-      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
-      // ensure that logs from all replicas are deleted if delete topic is marked successful
-      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.getLogManager().getLog(tp).isEmpty))
-      // ensure that topic is removed from all cleaner offsets
-      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
-        val checkpoints = server.getLogManager().logDirs.map { logDir =>
-          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
-        }
-        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
-      })
-      // ensure the topic is gone
-      val deleted = !zkUtils.getAllTopics().contains(topic)
-      deletePath && topicPath && replicaManager && logManager && cleaner && deleted
-    }
-    eventually(timeout(60.seconds)) {
-      assert(isDeleted, s"$topic not deleted after timeout")
+    eventually(timeout(60.seconds), interval(200.millis)) {
+      try {
+        verifyTopicDeletion(topic, numPartitions, servers)
+      } catch {
+        case e: Throwable =>
+          // Because pushing messages into Kafka updates ZooKeeper asynchronously, there is a
+          // small chance that the topic is recreated after it has been deleted. Hence, delete
+          // the topic again and retry the verification.
+          AdminUtils.deleteTopic(zkUtils, topic)
+          throw e
+      }
     }
   }
 
@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging {
       case _ =>
         false
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(60.seconds)) {
       assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }
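
verifyTopicDeletionWithRetries relies on ScalaTest's eventually: every failed verification re-issues the delete and rethrows, so the check is repeated until the 60-second timeout expires. A condensed sketch of that pattern, with hypothetical verify and deleteAgain callbacks standing in for the calls above:

```scala
// Sketch of the delete-then-verify retry loop (helper names are illustrative).
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

def verifyWithRetries(verify: () => Unit, deleteAgain: () => Unit): Unit = {
  eventually(timeout(60.seconds), interval(200.millis)) {
    try {
      verify()          // throws if the topic still exists anywhere
    } catch {
      case e: Throwable =>
        deleteAgain()   // the topic may have been recreated by an async ZooKeeper update
        throw e         // rethrow so `eventually` keeps retrying until the timeout
    }
  }
}
```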
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index db24ee8b46dd5b5a8945e4b2558638e9b8cdb9ee..2239f10870eda19015da83bf105143d60d581be0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
 
+  protected def createSparkSession: TestSparkSession = {
+    new TestSparkSession(
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+  }
+
   /**
    * Initialize the [[TestSparkSession]].
    */
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(
-        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+      _spark = createSparkSession
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()
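
The new createSparkSession hook lets a suite customize the shared session before beforeAll initializes it; the Kafka stress suite above overrides it to change the master string. A hedged sketch of another possible use, tweaking a SQL conf instead (the suite name and conf value are illustrative, and this sketch omits the DebugFilesystem setting the default hook applies):

```scala
// Hypothetical suite: override the hook to customize the shared TestSparkSession.
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

class MyCustomSessionSuite extends QueryTest with SharedSQLContext {
  override def createSparkSession(): TestSparkSession = {
    // Illustrative conf tweak; keep the default hook's settings if the test needs them.
    new TestSparkSession(sparkConf.set("spark.sql.shuffle.partitions", "5"))
  }
}
```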