diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 22668fd6faaa92419e8c57e8675040c84be04235..10b35c74f473e37440840b8dea924acedccf6139 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -90,7 +90,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
     }
   }
 
-  test("read Spark 2.1.0 log format") {
+  test("read Spark 2.1.0 offset format") {
     val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
     assert(KafkaSourceOffset(offset) ===
       KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 8494aef004bb5b82fddfb92dc2fd697baebcdf95..20e0dcef8ffda0649be8c54399879d5c85e088f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -57,7 +57,7 @@ class FileStreamSource(
 
   private val metadataLog =
     new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
-  private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
+  private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
@@ -79,7 +79,7 @@ class FileStreamSource(
    * `synchronized` on this method is for solving race conditions in tests. In the normal usage,
    * there is no race here, so the cost of `synchronized` should be rare.
    */
-  private def fetchMaxOffset(): LongOffset = synchronized {
+  private def fetchMaxOffset(): FileStreamSourceOffset = synchronized {
     // All the new files found - ignore aged files and files that we have seen.
     val newFiles = fetchAllFiles().filter {
       case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
@@ -104,14 +104,14 @@ class FileStreamSource(
        """.stripMargin)
 
     if (batchFiles.nonEmpty) {
-      maxBatchId += 1
-      metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
-        FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+      metadataLogCurrentOffset += 1
+      metadataLog.add(metadataLogCurrentOffset, batchFiles.map { case (p, timestamp) =>
+        FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset)
       }.toArray)
-      logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
+      logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
     }
 
-    new LongOffset(maxBatchId)
+    FileStreamSourceOffset(metadataLogCurrentOffset)
   }
 
   /**
@@ -122,21 +122,19 @@ class FileStreamSource(
     func
   }
 
-  /** Return the latest offset in the source */
-  def currentOffset: LongOffset = synchronized {
-    new LongOffset(maxBatchId)
-  }
+  /** Return the latest offset in the [[FileStreamSourceLog]] */
+  def currentLogOffset: Long = synchronized { metadataLogCurrentOffset }
 
   /**
    * Returns the data that is between the offsets (`start`, `end`].
    */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset
-    val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset
+    val startOffset = start.map(FileStreamSourceOffset(_).logOffset).getOrElse(-1L)
+    val endOffset = FileStreamSourceOffset(end).logOffset
 
-    assert(startId <= endId)
-    val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
-    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
+    assert(startOffset <= endOffset)
+    val files = metadataLog.get(Some(startOffset + 1), Some(endOffset)).flatMap(_._2)
+    logInfo(s"Processing ${files.length} files from ${startOffset + 1}:$endOffset")
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     val newDataSource =
       DataSource(
@@ -172,7 +170,7 @@ class FileStreamSource(
     files
   }
 
-  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
+  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 327b3ac2677660b8429a071d3fb4421d9b14d303..81908c0cefdfaa7724a065b292e72e3604ac6d08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -78,7 +78,7 @@ class FileStreamSourceLog(
 
   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = {
     val startBatchId = startId.getOrElse(0L)
-    val endBatchId = getLatest().map(_._1).getOrElse(0L)
+    val endBatchId = endId.orElse(getLatest().map(_._1)).getOrElse(0L)
 
     val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id =>
       if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
new file mode 100644
index 0000000000000000000000000000000000000000..06d0fe6c18c1e7c28932ff65c9db993cb0c0489b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.Exception._
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Offset for the [[FileStreamSource]].
+ * @param logOffset  Position in the [[FileStreamSourceLog]]
+ */
+case class FileStreamSourceOffset(logOffset: Long) extends Offset {
+  override def json: String = {
+    Serialization.write(this)(FileStreamSourceOffset.format)
+  }
+}
+
+object FileStreamSourceOffset {
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  def apply(offset: Offset): FileStreamSourceOffset = {
+    offset match {
+      case f: FileStreamSourceOffset => f
+      case SerializedOffset(str) =>
+        catching(classOf[NumberFormatException]).opt {
+          FileStreamSourceOffset(str.toLong)
+        }.getOrElse {
+          Serialization.read[FileStreamSourceOffset](str)
+        }
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid conversion from offset of ${offset.getClass} to FileStreamSourceOffset")
+    }
+  }
+}
+
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt
new file mode 100644
index 0000000000000000000000000000000000000000..e266a47368e1c33bbf36d506f41772a8ec328acd
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt
@@ -0,0 +1 @@
+{"logOffset":345}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt
similarity index 100%
rename from sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
rename to sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
index fe5c1d44a6e26e66d3ee50494b43b64367d507dc..988a98a7587d4e6881a70431378538601b12f606 100644
--- a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
@@ -1,4 +1,4 @@
 v1
 {"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
-0
-{"topic-0":{"0":1}}
\ No newline at end of file
+{"logOffset":345}
+{"topic-0":{"0":1}}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 4a47c04d3f08423605b3e6e79592b4c8b19c94df..40d0643ba87719293c85b063c559e32271f32d73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -97,7 +97,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
       val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
         dir.getAbsolutePath, Map.empty)
       // this method should throw an exception if `fs.exists` is called during resolveRelation
-      newSource.getBatch(None, LongOffset(1))
+      newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index d139efaaf824f8521b307b2b50b8a41be9613eb8..bb4274a162e8e2e98797dc2e654e8db077d614ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -74,7 +74,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
     val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
     assert(batchId === 0)
     assert(offsetSeq.offsets === Seq(
-      Some(SerializedOffset("0")),
+      Some(SerializedOffset("""{"logOffset":345}""")),
       Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
     ))
     assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 267c462484a32a2e7d1a0d17f3c17f2258e57458..bcb68520407bc3ef80d6a322f772a2ed2927d9f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -61,7 +61,7 @@ abstract class FileStreamSourceTest
       val source = sources.head
       val newOffset = source.withBatchingLocked {
         addData(source)
-        source.currentOffset + 1
+        new FileStreamSourceOffset(source.currentLogOffset + 1)
       }
       logInfo(s"Added file to $source at offset $newOffset")
       (source, newOffset)
@@ -987,12 +987,17 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
             val _sources = PrivateMethod[Seq[Source]]('sources)
             val fileSource =
               (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
-            assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() ===
-              List("keep1", "keep2", "keep3"))
-            assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() ===
-              List("keep2", "keep3"))
-            assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() ===
-              List("keep3"))
+
+            def verify(startId: Option[Int], endId: Int, expected: String*): Unit = {
+              val start = startId.map(new FileStreamSourceOffset(_))
+              val end = FileStreamSourceOffset(endId)
+              assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected)
+            }
+
+            verify(startId = None, endId = 2, "keep1", "keep2", "keep3")
+            verify(startId = Some(0), endId = 1, "keep2")
+            verify(startId = Some(0), endId = 2, "keep2", "keep3")
+            verify(startId = Some(1), endId = 2, "keep3")
             true
           }
         )
@@ -1023,9 +1028,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(options.maxFilesPerTrigger == Some(1))
   }
 
-  test("FileStreamSource offset - read Spark 2.1.0 log format") {
-    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
-    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  test("FileStreamSource offset - read Spark 2.1.0 offset json format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0-json.txt")
+    assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345))
+  }
+
+  test("FileStreamSource offset - read Spark 2.1.0 offset long format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0-long.txt")
+    assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345))
   }
 
   test("FileStreamSourceLog - read Spark 2.1.0 log format") {