diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index cce4b74ff2994f4da35c5f01b1b1ad86674ca899..25c8a69b1f1edeafbfe3ff194d07feef9fb6c563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,13 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io._
-import java.nio.charset.StandardCharsets
+import scala.collection.mutable.ArrayBuffer
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.io.Codec
-
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -44,33 +40,12 @@ class FileStreamSource(
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
   private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
-  private var maxBatchId = -1
-  private val seenFiles = new OpenHashSet[String]
+  private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
+  private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
-  /** Map of batch id to files. This map is also stored in `metadataPath`. */
-  private val batchToMetadata = new HashMap[Long, Seq[String]]
-
-  {
-    // Restore file paths from the metadata files
-    val existingBatchFiles = fetchAllBatchFiles()
-    if (existingBatchFiles.nonEmpty) {
-      val existingBatchIds = existingBatchFiles.map(_.getPath.getName.toInt)
-      maxBatchId = existingBatchIds.max
-      // Recover "batchToMetadata" and "seenFiles" from existing metadata files.
-      existingBatchIds.sorted.foreach { batchId =>
-        val files = readBatch(batchId)
-        if (files.isEmpty) {
-          // Assert that the corrupted file must be the latest metadata file.
-          if (batchId != maxBatchId) {
-            throw new IllegalStateException("Invalid metadata files")
-          }
-          maxBatchId = maxBatchId - 1
-        } else {
-          batchToMetadata(batchId) = files
-          files.foreach(seenFiles.add)
-        }
-      }
-    }
+  private val seenFiles = new OpenHashSet[String]
+  metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+    files.foreach(seenFiles.add)
   }
 
   /** Returns the schema of the data from this source */
@@ -112,7 +87,7 @@ class FileStreamSource(
 
     if (newFiles.nonEmpty) {
       maxBatchId += 1
-      writeBatch(maxBatchId, newFiles)
+      metadataLog.add(maxBatchId, newFiles)
     }
 
     new LongOffset(maxBatchId)
@@ -140,9 +115,7 @@ class FileStreamSource(
     val endId = end.offset
 
     if (startId + 1 <= endId) {
-      val files = (startId + 1 to endId).filter(_ >= 0).flatMap { batchId =>
-          batchToMetadata.getOrElse(batchId, Nil)
-        }.toArray
+      val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
       logDebug(s"Return files from batches ${startId + 1}:$endId")
       logDebug(s"Streaming ${files.mkString(", ")}")
       Some(new Batch(end, dataFrameBuilder(files)))
@@ -152,89 +125,9 @@ class FileStreamSource(
     }
   }
 
-  private def fetchAllBatchFiles(): Seq[FileStatus] = {
-    try fs.listStatus(new Path(metadataPath)) catch {
-      case _: java.io.FileNotFoundException =>
-        fs.mkdirs(new Path(metadataPath))
-        Seq.empty
-    }
-  }
-
   private def fetchAllFiles(): Seq[String] = {
     fs.listStatus(new Path(path))
       .filterNot(_.getPath.getName.startsWith("_"))
       .map(_.getPath.toUri.toString)
   }
-
-  /**
-   * Write the metadata of a batch to disk. The file format is as follows:
-   *
-   * {{{
-   *   <FileStreamSource.VERSION>
-   *   START
-   *   -/a/b/c
-   *   -/d/e/f
-   *   ...
-   *   END
-   * }}}
-   *
-   * Note: <FileStreamSource.VERSION> means the value of `FileStreamSource.VERSION`. Every file
-   * path starts with "-" so that we can know if a line is a file path easily.
-   */
-  private def writeBatch(id: Int, files: Seq[String]): Unit = {
-    assert(files.nonEmpty, "create a new batch without any file")
-    val output = fs.create(new Path(metadataPath + "/" + id), true)
-    val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8))
-    try {
-      // scalastyle:off println
-      writer.println(FileStreamSource.VERSION)
-      writer.println(FileStreamSource.START_TAG)
-      files.foreach(file => writer.println(FileStreamSource.PATH_PREFIX + file))
-      writer.println(FileStreamSource.END_TAG)
-      // scalastyle:on println
-    } finally {
-      writer.close()
-    }
-    batchToMetadata(id) = files
-  }
-
-  /** Read the file names of the specified batch id from the metadata file */
-  private def readBatch(id: Int): Seq[String] = {
-    val input = fs.open(new Path(metadataPath + "/" + id))
-    try {
-      FileStreamSource.readBatch(input)
-    } finally {
-      input.close()
-    }
-  }
-}
-
-object FileStreamSource {
-
-  private val START_TAG = "START"
-  private val END_TAG = "END"
-  private val PATH_PREFIX = "-"
-  val VERSION = "FILESTREAM_V1"
-
-  /**
-   * Parse a metadata file and return the content. If the metadata file is corrupted, it will return
-   * an empty `Seq`.
-   */
-  def readBatch(input: InputStream): Seq[String] = {
-    val lines = scala.io.Source.fromInputStream(input)(Codec.UTF8).getLines().toArray
-    if (lines.length < 4) {
-      // version + start tag + end tag + at least one file path
-      return Nil
-    }
-    if (lines.head != VERSION) {
-      return Nil
-    }
-    if (lines(1) != START_TAG) {
-      return Nil
-    }
-    if (lines.last != END_TAG) {
-      return Nil
-    }
-    lines.slice(2, lines.length - 1).map(_.stripPrefix(PATH_PREFIX)) // Drop character "-"
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ac2842b6d5df984435af0c963cc8ae8059eacafd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -0,0 +1,193 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, IOException}
+import java.nio.ByteBuffer
+import java.util.{ConcurrentModificationException, EnumSet}
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.sql.SQLContext
+
+/**
+ * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
+ * as the metadata storage.
+ *
+ * When writing a new batch, [[HDFSMetadataLog]] will firstly write to a temp file and then rename
+ * it to the final batch file. If the rename step fails, there must be multiple writers and only
+ * one of them will succeed and the others will fail.
+ *
+ * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
+ * files in a directory always shows the latest files.
+ */
+class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] {
+
+  private val metadataPath = new Path(path)
+
+  private val fc =
+    if (metadataPath.toUri.getScheme == null) {
+      FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration)
+    } else {
+      FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration)
+    }
+
+  if (!fc.util().exists(metadataPath)) {
+    fc.mkdir(metadataPath, FsPermission.getDirDefault, true)
+  }
+
+  /**
+   * A `PathFilter` to filter only batch files
+   */
+  private val batchFilesFilter = new PathFilter {
+    override def accept(path: Path): Boolean = try {
+      path.getName.toLong
+      true
+    } catch {
+      case _: NumberFormatException => false
+    }
+  }
+
+  private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance()
+
+  private def batchFile(batchId: Long): Path = {
+    new Path(metadataPath, batchId.toString)
+  }
+
+  override def add(batchId: Long, metadata: T): Boolean = {
+    get(batchId).map(_ => false).getOrElse {
+      // Only write metadata when the batch has not yet been written.
+      val buffer = serializer.serialize(metadata)
+      try {
+        writeBatch(batchId, JavaUtils.bufferToArray(buffer))
+        true
+      } catch {
+        case e: IOException if "java.lang.InterruptedException" == e.getMessage =>
+          // create may convert InterruptedException to IOException. Let's convert it back to
+          // InterruptedException so that this failure won't crash StreamExecution
+          throw new InterruptedException("Creating file is interrupted")
+      }
+    }
+  }
+
+  /**
+   * Write a batch to a temp file then rename it to the batch file.
+   *
+   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
+   * valid behavior, we still need to prevent it from destroying the files.
+   */
+  private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = {
+    // Use nextId to create a temp file
+    var nextId = 0
+    while (true) {
+      val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp")
+      fc.deleteOnExit(tempPath)
+      try {
+        val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
+        try {
+          output.write(bytes)
+        } finally {
+          output.close()
+        }
+        try {
+          // Try to commit the batch
+          // It will fail if there is an existing file (someone has committed the batch)
+          fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
+          return
+        } catch {
+          case e: IOException if isFileAlreadyExistsException(e) =>
+            // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
+            // So throw an exception to tell the user this is not a valid behavior.
+            throw new ConcurrentModificationException(
+              s"Multiple HDFSMetadataLog are using $path", e)
+          case e: FileNotFoundException =>
+            // Sometimes, "create" will succeed when multiple writers are calling it at the same
+            // time. However, only one writer can call "rename" successfully, others will get
+            // FileNotFoundException because the first writer has removed it.
+            throw new ConcurrentModificationException(
+              s"Multiple HDFSMetadataLog are using $path", e)
+        }
+      } catch {
+        case e: IOException if isFileAlreadyExistsException(e) =>
+          // Failed to create "tempPath". There are two cases:
+          // 1. Someone is creating "tempPath" too.
+          // 2. This is a restart. "tempPath" has already been created but not moved to the final
+          // batch file (not committed).
+          //
+          // For both cases, the batch has not yet been committed. So we can retry it.
+          //
+          // Note: there is a potential risk here: if HDFSMetadataLog A is running, people can use
+          // the same metadata path to create "HDFSMetadataLog" and fail A. However, this is not a
+          // big problem because it requires the attacker must have the permission to write the
+          // metadata path. In addition, the old Streaming also have this issue, people can create
+          // malicious checkpoint files to crash a Streaming application too.
+          nextId += 1
+      }
+    }
+  }
+
+  private def isFileAlreadyExistsException(e: IOException): Boolean = {
+    e.isInstanceOf[FileAlreadyExistsException] ||
+      // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
+      // HADOOP-9361, we still need to support old Hadoop versions.
+      (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
+  }
+
+  override def get(batchId: Long): Option[T] = {
+    val batchMetadataFile = batchFile(batchId)
+    if (fc.util().exists(batchMetadataFile)) {
+      val input = fc.open(batchMetadataFile)
+      val bytes = IOUtils.toByteArray(input)
+      Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
+    } else {
+      None
+    }
+  }
+
+  override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
+    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+      .map(_.getPath.getName.toLong)
+      .filter { batchId =>
+      batchId <= endId && (startId.isEmpty || batchId >= startId.get)
+    }
+    batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
+      case (batchId, metadataOption) =>
+        (batchId, metadataOption.get)
+    }
+  }
+
+  override def getLatest(): Option[(Long, T)] = {
+    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+      .map(_.getPath.getName.toLong)
+      .sorted
+      .reverse
+    for (batchId <- batchIds) {
+      val batch = get(batchId)
+      if (batch.isDefined) {
+        return Some((batchId, batch.get))
+      }
+    }
+    None
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3f9896d23ce36b86a6a5991759092efa5ce455cf
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * A general MetadataLog that supports the following features:
+ *
+ *  - Allow the user to store a metadata object for each batch.
+ *  - Allow the user to query the latest batch id.
+ *  - Allow the user to query the metadata object of a specified batch id.
+ *  - Allow the user to query metadata objects in a range of batch ids.
+ */
+trait MetadataLog[T] {
+
+  /**
+   * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
+   * metadata has already been stored, this method will return `false`.
+   */
+  def add(batchId: Long, metadata: T): Boolean
+
+  /**
+   * Return the metadata for the specified batchId if it's stored. Otherwise, return None.
+   */
+  def get(batchId: Long): Option[T]
+
+  /**
+   * Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId` is
+   * `None`, just return all batches before endId (inclusive).
+   */
+  def get(startId: Option[Long], endId: Long): Array[(Long, T)]
+
+  /**
+   * Return the latest batch Id and its metadata if exist.
+   */
+  def getLatest(): Option[(Long, T)]
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4ddc218455eb2b53e4f5781669de92ade44209cd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.ConcurrentModificationException
+
+import org.scalatest.concurrent.AsyncAssertions._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  test("basic") {
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+      assert(metadataLog.get(0) === Some("batch0"))
+      assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+      assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
+
+      assert(metadataLog.add(1, "batch1"))
+      assert(metadataLog.get(0) === Some("batch0"))
+      assert(metadataLog.get(1) === Some("batch1"))
+      assert(metadataLog.getLatest() === Some(1 -> "batch1"))
+      assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+
+      // Adding the same batch does nothing
+      metadataLog.add(1, "batch1-duplicated")
+      assert(metadataLog.get(0) === Some("batch0"))
+      assert(metadataLog.get(1) === Some("batch1"))
+      assert(metadataLog.getLatest() === Some(1 -> "batch1"))
+      assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+    }
+  }
+
+  test("restart") {
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.add(1, "batch1"))
+      assert(metadataLog.get(0) === Some("batch0"))
+      assert(metadataLog.get(1) === Some("batch1"))
+      assert(metadataLog.getLatest() === Some(1 -> "batch1"))
+      assert(metadataLog.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+
+      val metadataLog2 = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+      assert(metadataLog2.get(0) === Some("batch0"))
+      assert(metadataLog2.get(1) === Some("batch1"))
+      assert(metadataLog2.getLatest() === Some(1 -> "batch1"))
+      assert(metadataLog2.get(None, 1) === Array(0 -> "batch0", 1 -> "batch1"))
+    }
+  }
+
+  test("metadata directory collision") {
+    withTempDir { temp =>
+      val waiter = new Waiter
+      val maxBatchId = 100
+      for (id <- 0 until 10) {
+        new Thread() {
+          override def run(): Unit = waiter {
+            val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+            try {
+              var nextBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
+              nextBatchId += 1
+              while (nextBatchId <= maxBatchId) {
+                metadataLog.add(nextBatchId, nextBatchId.toString)
+                nextBatchId += 1
+              }
+            } catch {
+              case e: ConcurrentModificationException =>
+              // This is expected since there are multiple writers
+            } finally {
+              waiter.dismiss()
+            }
+          }
+        }.start()
+      }
+
+      waiter.await(timeout(10.seconds), dismissals(10))
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+      assert(metadataLog.getLatest() === Some(maxBatchId -> maxBatchId.toString))
+      assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index e6889bcc783b696f054cba98beaa6130dd5fdd20..4c18e38db828071218a96f7fe2d5c7d9ce336270 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
-import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream}
-import java.nio.charset.StandardCharsets
+import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, StreamTest}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.FileStreamSource._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
@@ -359,60 +357,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(tmp)
   }
 
-  test("fault tolerance with corrupted metadata file") {
-    val src = Utils.createTempDir("streaming.src")
-    assert(new File(src, "_metadata").mkdirs())
-    stringToFile(
-      new File(src, "_metadata/0"),
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
-    stringToFile(new File(src, "_metadata/1"), s"${FileStreamSource.VERSION}\nSTART\n-")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    // the metadata file of batch is corrupted, so currentOffset should be 0
-    assert(textSource.currentOffset === LongOffset(0))
-
-    Utils.deleteRecursively(src)
-  }
-
-  test("fault tolerance with normal metadata file") {
-    val src = Utils.createTempDir("streaming.src")
-    assert(new File(src, "_metadata").mkdirs())
-    stringToFile(
-      new File(src, "_metadata/0"),
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n")
-    stringToFile(
-      new File(src, "_metadata/1"),
-      s"${FileStreamSource.VERSION}\nSTART\n-/x/y/z\nEND\n")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    assert(textSource.currentOffset === LongOffset(1))
-
-    Utils.deleteRecursively(src)
-  }
-
-  test("readBatch") {
-    def stringToStream(str: String): InputStream =
-      new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))
-
-    // Invalid metadata
-    assert(readBatch(stringToStream("")) === Nil)
-    assert(readBatch(stringToStream(FileStreamSource.VERSION)) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\n")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n")) === Nil)
-    assert(readBatch(stringToStream(s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEN")) === Nil)
-
-    // Valid metadata
-    assert(readBatch(stringToStream(
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND")) === Seq("/a/b/c"))
-    assert(readBatch(stringToStream(
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\nEND\n")) === Seq("/a/b/c"))
-    assert(readBatch(stringToStream(
-      s"${FileStreamSource.VERSION}\nSTART\n-/a/b/c\n-/e/f/g\nEND\n"))
-      === Seq("/a/b/c", "/e/f/g"))
-  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {