From caea15214571d9b12dcf1553e5c1cc8b83a8ba5b Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Tue, 22 Mar 2016 10:18:42 -0700
Subject: [PATCH] [SPARK-13985][SQL] Deterministic batches with ids

This PR relaxes the requirements of a `Sink` for structured streaming so that it only needs to support idempotent appending of data.  Previously the `Sink` needed to be able to transactionally append data while recording an opaque offset indicating how far in the stream we had processed.
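
Concretely, a `Sink` now only needs to implement `addBatch(batchId: Long, data: DataFrame)` and skip batch ids it has already committed.  A minimal sketch, modeled on the `MemorySink` changes in this patch (the class name and buffer are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

class BufferingSink extends Sink {
  private val committed = new ArrayBuffer[Array[Row]]()

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Batch contents are deterministic and batches arrive in order, so a
    // replayed batchId (e.g. after a failure) can simply be skipped.
    if (batchId == committed.size) {
      committed.append(data.collect())
    }
  }
}
```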

In order to do this, a new write-ahead-log has been added to stream execution, which records the offsets that are present in each batch.  The log is created in the newly added `checkpointLocation`, which defaults to `${spark.sql.streaming.checkpointLocation}/${queryName}` but can be overridden by setting `checkpointLocation` in `DataFrameWriter`.
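
For example (a hedged sketch; the sink format, query name, and paths are illustrative), the checkpoint location can either be derived from the query name or set explicitly:

```scala
// Checkpoints default to ${spark.sql.streaming.checkpointLocation}/myQuery.
df.write
  .format("org.apache.spark.sql.streaming.test")
  .queryName("myQuery")
  .startStream()

// Or override the location for this query only.
df.write
  .format("org.apache.spark.sql.streaming.test")
  .option("checkpointLocation", "/tmp/checkpoints/myQuery")
  .startStream()
```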

In addition to making sinks easier to write, the addition of batch ids and a checkpoint location is done in anticipation of integration with the `StateStore` (#11645).

Author: Michael Armbrust <michael@databricks.com>

Closes #11804 from marmbrus/batchIds.
---
 .../spark/sql/ContinuousQueryManager.scala    |   8 +-
 .../apache/spark/sql/DataFrameWriter.scala    |  11 +-
 .../org/apache/spark/sql/SinkStatus.scala     |   2 +-
 .../execution/datasources/DataSource.scala    |   3 +-
 .../execution/streaming/CompositeOffset.scala |  12 ++
 .../streaming/FileStreamSource.scala          |  24 +--
 .../execution/streaming/HDFSMetadataLog.scala |   7 +-
 .../spark/sql/execution/streaming/Sink.scala  |  30 +--
 .../sql/execution/streaming/Source.scala      |  10 +-
 .../execution/streaming/StreamExecution.scala | 193 ++++++++++++------
 .../execution/streaming/StreamProgress.scala  |  52 ++---
 .../sql/execution/streaming/memory.scala      |  85 ++++----
 .../apache/spark/sql/internal/SQLConf.scala   |   7 +
 .../org/apache/spark/sql/StreamTest.scala     |  18 +-
 .../ContinuousQueryManagerSuite.scala         |   8 +-
 .../sql/streaming/ContinuousQuerySuite.scala  |  11 +-
 .../DataFrameReaderWriterSuite.scala          |  55 +++--
 .../sql/streaming/FileStreamSourceSuite.scala |  18 --
 .../util/ContinuousQueryListenerSuite.scala   |   6 +-
 19 files changed, 319 insertions(+), 241 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 0a156ea99a..fa8219bbed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -164,13 +164,17 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
   }
 
   /** Start a query */
-  private[sql] def startQuery(name: String, df: DataFrame, sink: Sink): ContinuousQuery = {
+  private[sql] def startQuery(
+      name: String,
+      checkpointLocation: String,
+      df: DataFrame,
+      sink: Sink): ContinuousQuery = {
     activeQueriesLock.synchronized {
       if (activeQueries.contains(name)) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is already active")
       }
-      val query = new StreamExecution(sqlContext, name, df.logicalPlan, sink)
+      val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink)
       query.start()
       activeQueries.put(name, query)
       query
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7ed1c51360..c07bd0e7b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -251,8 +253,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         options = extraOptions.toMap,
         partitionColumns = normalizedParCols.getOrElse(Nil))
 
+    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+    val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+      new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+    })
     df.sqlContext.sessionState.continuousQueryManager.startQuery(
-      extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink())
+      queryName,
+      checkpointLocation,
+      df,
+      dataSource.createSink())
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
index ce21451b2c..5a9852809c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
@@ -31,4 +31,4 @@ import org.apache.spark.sql.execution.streaming.{Offset, Sink}
 @Experimental
 class SinkStatus private[sql](
     val description: String,
-    val offset: Option[Offset])
+    val offset: Offset)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e2a14edc54..fac2a64726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -162,7 +162,8 @@ case class DataSource(
                 paths = files,
                 userSpecifiedSchema = Some(dataSchema),
                 className = className,
-                options = options.filterKeys(_ != "path")).resolveRelation()))
+                options =
+                  new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation()))
         }
 
         new FileStreamSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index 59a52a3d59..e48ac59892 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -52,6 +52,18 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
     case i if i == 0 => 0
     case i if i > 0 => 1
   }
+
+  /**
+   * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
+   * sources.
+   *
+   * This method is typically used to associate a serialized offset with actual sources (which
+   * cannot be serialized).
+   */
+  def toStreamProgress(sources: Seq[Source]): StreamProgress = {
+    assert(sources.size == offsets.size)
+    new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
+  }
 }
 
 object CompositeOffset {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 787e93f543..d13b1a6166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -109,20 +109,16 @@ class FileStreamSource(
   /**
    * Returns the next batch of data that is available after `start`, if any is available.
    */
-  override def getNextBatch(start: Option[Offset]): Option[Batch] = {
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
-    val end = fetchMaxOffset()
-    val endId = end.offset
-
-    if (startId + 1 <= endId) {
-      val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
-      logDebug(s"Return files from batches ${startId + 1}:$endId")
-      logDebug(s"Streaming ${files.mkString(", ")}")
-      Some(new Batch(end, dataFrameBuilder(files)))
-    }
-    else {
-      None
-    }
+    val endId = end.asInstanceOf[LongOffset].offset
+
+    assert(startId <= endId)
+    val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
+    logDebug(s"Return files from batches ${startId + 1}:$endId")
+    logDebug(s"Streaming ${files.mkString(", ")}")
+    dataFrameBuilder(files)
+
   }
 
   private def fetchAllFiles(): Seq[String] = {
@@ -130,4 +126,6 @@ class FileStreamSource(
       .filterNot(_.getPath.getName.startsWith("_"))
       .map(_.getPath.toUri.toString)
   }
+
+  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index ac2842b6d5..298b5d292e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SQLContext
@@ -42,7 +43,9 @@ import org.apache.spark.sql.SQLContext
  * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
  * files in a directory always shows the latest files.
  */
-class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] {
+class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
+  extends MetadataLog[T]
+  with Logging {
 
   private val metadataPath = new Path(path)
 
@@ -113,6 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends
         try {
           // Try to commit the batch
           // It will fail if there is an existing file (someone has committed the batch)
+          logDebug(s"Attempting to write log #${batchFile(batchId)}")
           fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
           return
         } catch {
@@ -161,6 +165,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends
       val bytes = IOUtils.toByteArray(input)
       Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
     } else {
+      logDebug(s"Unable to find batch $batchMetadataFile")
       None
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e3b2d2f67e..25015d58f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -17,31 +17,19 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.sql.DataFrame
+
 /**
- * An interface for systems that can collect the results of a streaming query.
- *
- * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
- * data and update the [[Offset]]. In the case of a failure, the sink will be recreated
- * and must be able to return the [[Offset]] for all of the data that is made durable.
- * This contract allows Spark to process data with exactly-once semantics, even in the case
- * of failures that require the computation to be restarted.
+ * An interface for systems that can collect the results of a streaming query. In order to preserve
+ * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
+ * batch.
  */
 trait Sink {
-  /**
-   * Returns the [[Offset]] for all data that is currently present in the sink, if any. This
-   * function will be called by Spark when restarting execution in order to determine at which point
-   * in the input stream computation should be resumed from.
-   */
-  def currentOffset: Option[Offset]
 
   /**
-   * Accepts a new batch of data as well as a [[Offset]] that denotes how far in the input
-   * data computation has progressed to.  When computation restarts after a failure, it is important
-   * that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that
-   * has been persisted durably.  Note that this does not necessarily have to be the
-   * [[Offset]] for the most recent batch of data that was given to the sink.  For example,
-   * it is valid to buffer data before persisting, as long as the [[Offset]] is stored
-   * transactionally as data is eventually persisted.
+   * Adds a batch of data to this sink.  The data for a given `batchId` is deterministic and if
+   * this method is called more than once with the same batchId (which will happen in the case of
+   * failures), then `data` should only be added once.
    */
-  def addBatch(batch: Batch): Unit
+  def addBatch(batchId: Long, data: DataFrame): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 25922979ac..6457f928ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -29,8 +30,13 @@ trait Source  {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
+  /** Returns the maximum available offset for this source. */
+  def getOffset: Option[Offset]
+
   /**
-   * Returns the next batch of data that is available after `start`, if any is available.
+   * Returns the data that is between the offsets (`start`, `end`].  When `start` is `None` then
+   * the batch should begin with the first available record.  This method must always return the
+   * same data for a particular `start` and `end` pair.
    */
-  def getNextBatch(start: Option[Offset]): Option[Batch]
+  def getBatch(start: Option[Offset], end: Offset): DataFrame
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 0062b7fc75..c5fefb5346 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
@@ -41,6 +43,7 @@ import org.apache.spark.sql.util.ContinuousQueryListener._
 class StreamExecution(
     val sqlContext: SQLContext,
     override val name: String,
+    val checkpointRoot: String,
     private[sql] val logicalPlan: LogicalPlan,
     val sink: Sink) extends ContinuousQuery with Logging {
 
@@ -52,13 +55,28 @@ class StreamExecution(
   /** Minimum amount of time in between the start of each batch. */
   private val minBatchTime = 10
 
-  /** Tracks how much data we have processed from each input source. */
-  private[sql] val streamProgress = new StreamProgress
+  /**
+   * Tracks how much data we have processed and committed to the sink or state store from each
+   * input source.
+   */
+  private[sql] var committedOffsets = new StreamProgress
+
+  /**
+   * Tracks the offsets that are available to be processed, but have not yet been committed to the
+   * sink.
+   */
+  private var availableOffsets = new StreamProgress
+
+  /** The current batchId or -1 if execution has not yet been initialized. */
+  private var currentBatchId: Long = -1
 
   /** All stream sources present the query plan. */
   private val sources =
     logicalPlan.collect { case s: StreamingRelation => s.source }
 
+  /** A list of unique sources in the query plan. */
+  private val uniqueSources = sources.distinct
+
   /** Defines the internal state of execution */
   @volatile
   private var state: State = INITIALIZED
@@ -74,20 +92,34 @@ class StreamExecution(
     override def run(): Unit = { runBatches() }
   }
 
+  /**
+   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
+   * that a given batch will always consist of the same data, we write to this log *before* any
+   * processing is done.  Thus, the Nth record in this log indicates data that is currently being
+   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
+   */
+  private val offsetLog =
+    new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
+
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
-    sources.map(s => new SourceStatus(s.toString, streamProgress.get(s))).toArray
+    sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray
   }
 
   /** Returns current status of the sink. */
-  override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, sink.currentOffset)
+  override def sinkStatus: SinkStatus =
+    new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources))
 
   /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
   override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
 
+  /** Returns the path of a file with `name` in the checkpoint directory. */
+  private def checkpointFile(name: String): String =
+    new Path(new Path(checkpointRoot), name).toUri.toString
+
   /**
    * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event
    * has been posted to all the listeners.
@@ -102,7 +134,7 @@ class StreamExecution(
    * Repeatedly attempts to run batches as data arrives.
    *
    * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted
-   * so that listeners are guaranteed to get former event before the latter. Furthermore, this
+   * such that listeners are guaranteed to get a start event before a termination. Furthermore, this
    * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns.
    */
   private def runBatches(): Unit = {
@@ -118,9 +150,10 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SQLContext.setActive(sqlContext)
       populateStartOffsets()
-      logInfo(s"Stream running at $streamProgress")
+      logDebug(s"Stream running from $committedOffsets to $availableOffsets")
       while (isActive) {
-        attemptBatch()
+        if (dataAvailable) runBatch()
+        commitAndConstructNextBatch()
         Thread.sleep(minBatchTime) // TODO: Could be tighter
       }
     } catch {
@@ -130,7 +163,7 @@ class StreamExecution(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
-          Some(streamProgress.toCompositeOffset(sources)))
+          Some(committedOffsets.toCompositeOffset(sources)))
         logError(s"Query $name terminated with error", e)
     } finally {
       state = TERMINATED
@@ -142,48 +175,99 @@ class StreamExecution(
 
   /**
    * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed).
+   * (i.e. avoid reprocessing data that we have already processed). This function must be called
+   * before any processing occurs and will populate the following fields:
+   *  - currentBatchId
+   *  - committedOffsets
+   *  - availableOffsets
    */
   private def populateStartOffsets(): Unit = {
-    sink.currentOffset match {
-      case Some(c: CompositeOffset) =>
-        val storedProgress = c.offsets
-        val sources = logicalPlan collect {
-          case StreamingRelation(source, _) => source
+    offsetLog.getLatest() match {
+      case Some((batchId, nextOffsets)) =>
+        logInfo(s"Resuming continuous query, starting with batch $batchId")
+        currentBatchId = batchId + 1
+        availableOffsets = nextOffsets.toStreamProgress(sources)
+        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+
+        offsetLog.get(batchId - 1).foreach {
+          case lastOffsets =>
+            committedOffsets = lastOffsets.toStreamProgress(sources)
+            logDebug(s"Resuming with committed offsets: $committedOffsets")
         }
 
-        assert(sources.size == storedProgress.size)
-        sources.zip(storedProgress).foreach { case (source, offset) =>
-          offset.foreach(streamProgress.update(source, _))
-        }
       case None => // We are starting this stream for the first time.
-      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
+        logInfo(s"Starting new continuous query.")
+        currentBatchId = 0
+        commitAndConstructNextBatch()
     }
   }
 
   /**
-   * Checks to see if any new data is present in any of the sources. When new data is available,
-   * a batch is executed and passed to the sink, updating the currentOffsets.
+   * Returns true if there is any new data available to be processed.
    */
-  private def attemptBatch(): Unit = {
+  private def dataAvailable: Boolean = {
+    availableOffsets.exists {
+      case (source, available) =>
+        committedOffsets
+            .get(source)
+            .map(committed => committed < available)
+            .getOrElse(true)
+    }
+  }
+
+  /**
+   * Queries all of the sources to see if any new data is available. When there is new data the
+   * batchId counter is incremented and a new log entry is written with the newest offsets.
+   *
+   * Note that committing the offsets for a new batch implicitly marks the previous batch as
+   * finished and thus this method should only be called when all currently available data
+   * has been written to the sink.
+   */
+  private def commitAndConstructNextBatch(): Boolean = {
+    // Update committed offsets.
+    committedOffsets ++= availableOffsets
+
+    // Check to see what new data is available.
+    val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+    availableOffsets ++= newData
+
+    if (dataAvailable) {
+      assert(
+        offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        s"Concurrent update to the log.  Multiple streaming jobs detected for $currentBatchId")
+      currentBatchId += 1
+      logInfo(s"Committed offsets for batch $currentBatchId.")
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Processes any data available between `availableOffsets` and `committedOffsets`.
+   */
+  private def runBatch(): Unit = {
     val startTime = System.nanoTime()
 
-    // A list of offsets that need to be updated if this batch is successful.
-    // Populated while walking the tree.
-    val newOffsets = new ArrayBuffer[(Source, Offset)]
+    // Request unprocessed data from all sources.
+    val newData = availableOffsets.flatMap {
+      case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
+        val current = committedOffsets.get(source)
+        val batch = source.getBatch(current, available)
+        logDebug(s"Retrieving data from $source: $current -> $available")
+        Some(source -> batch)
+      case _ => None
+    }.toMap
+
     // A list of attributes that will need to be updated.
     var replacements = new ArrayBuffer[(Attribute, Attribute)]
     // Replace sources in the logical plan with data that has arrived since the last batch.
     val withNewSources = logicalPlan transform {
       case StreamingRelation(source, output) =>
-        val prevOffset = streamProgress.get(source)
-        val newBatch = source.getNextBatch(prevOffset)
-
-        newBatch.map { batch =>
-          newOffsets += ((source, batch.end))
-          val newPlan = batch.data.logicalPlan
-
-          assert(output.size == newPlan.output.size)
+        newData.get(source).map { data =>
+          val newPlan = data.logicalPlan
+          assert(output.size == newPlan.output.size,
+            s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}")
           replacements ++= output.zip(newPlan.output)
           newPlan
         }.getOrElse {
@@ -197,35 +281,24 @@ class StreamExecution(
       case a: Attribute if replacementMap.contains(a) => replacementMap(a)
     }
 
-    if (newOffsets.nonEmpty) {
-      val optimizerStart = System.nanoTime()
-
-      lastExecution = new QueryExecution(sqlContext, newPlan)
-      val executedPlan = lastExecution.executedPlan
-      val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
-      logDebug(s"Optimized batch in ${optimizerTime}ms")
+    val optimizerStart = System.nanoTime()
 
-      streamProgress.synchronized {
-        // Update the offsets and calculate a new composite offset
-        newOffsets.foreach(streamProgress.update)
+    lastExecution = new QueryExecution(sqlContext, newPlan)
+    val executedPlan = lastExecution.executedPlan
+    val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
+    logDebug(s"Optimized batch in ${optimizerTime}ms")
 
-        // Construct the batch and send it to the sink.
-        val batchOffset = streamProgress.toCompositeOffset(sources)
-        val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan))
-        sink.addBatch(nextBatch)
-      }
-
-      awaitBatchLock.synchronized {
-        // Wake up any threads that are waiting for the stream to progress.
-        awaitBatchLock.notifyAll()
-      }
+    val nextBatch = Dataset.newDataFrame(sqlContext, newPlan)
+    sink.addBatch(currentBatchId - 1, nextBatch)
 
-      val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
-      logInfo(s"Completed up to $newOffsets in ${batchTime}ms")
-      postEvent(new QueryProgress(this))
+    awaitBatchLock.synchronized {
+      // Wake up any threads that are waiting for the stream to progress.
+      awaitBatchLock.notifyAll()
     }
 
-    logDebug(s"Waiting for data, current: $streamProgress")
+    val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
+    logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
+    postEvent(new QueryProgress(this))
   }
 
   private def postEvent(event: ContinuousQueryListener.Event) {
@@ -252,9 +325,7 @@ class StreamExecution(
    * least the given `Offset`. This method is indented for use primarily when writing tests.
    */
   def awaitOffset(source: Source, newOffset: Offset): Unit = {
-    def notDone = streamProgress.synchronized {
-      !streamProgress.contains(source) || streamProgress(source) < newOffset
-    }
+    def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset
 
     while (notDone) {
       logInfo(s"Waiting until $newOffset at $source")
@@ -297,7 +368,7 @@ class StreamExecution(
     s"""
        |=== Continuous Query ===
        |Name: $name
-       |Current Offsets: $streamProgress
+       |Current Offsets: $committedOffsets
        |
        |Current State: $state
        |Thread State: ${microBatchThread.getState}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index d45b9bd983..405a5f0387 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -17,55 +17,31 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import scala.collection.mutable
+import scala.collection.{immutable, GenTraversableOnce}
 
 /**
  * A helper class that looks like a Map[Source, Offset].
  */
-class StreamProgress {
-  private val currentOffsets = new mutable.HashMap[Source, Offset]
+class StreamProgress(
+    val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
+  extends scala.collection.immutable.Map[Source, Offset] {
 
-  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
-    currentOffsets.get(source).foreach(old =>
-      assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
-    currentOffsets.put(source, newOffset)
+  private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
+    CompositeOffset(source.map(get))
   }
 
-  private[streaming] def update(newOffset: (Source, Offset)): Unit =
-    update(newOffset._1, newOffset._2)
+  override def toString: String =
+    baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
 
-  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
-  private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
-  private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
+  override def +[B1 >: Offset](kv: (Source, B1)): Map[Source, B1] = baseMap + kv
 
-  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
-    val updated = new StreamProgress
-    currentOffsets.foreach(updated.update)
-    updates.foreach(updated.update)
-    updated
-  }
+  override def get(key: Source): Option[Offset] = baseMap.get(key)
 
-  /**
-   * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
-   * it should be copied before being passed to user code.
-   */
-  private[streaming] def copy(): StreamProgress = {
-    val copied = new StreamProgress
-    currentOffsets.foreach(copied.update)
-    copied
-  }
+  override def iterator: Iterator[(Source, Offset)] = baseMap.iterator
 
-  private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
-    CompositeOffset(source.map(get))
-  }
+  override def -(key: Source): Map[Source, Offset] = baseMap - key
 
-  override def toString: String =
-    currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
-
-  override def equals(other: Any): Boolean = other match {
-    case s: StreamProgress => currentOffsets == s.currentOffsets
-    case _ => false
+  def ++(updates: GenTraversableOnce[(Source, Offset)]): StreamProgress = {
+    new StreamProgress(baseMap ++ updates)
   }
-
-  override def hashCode: Int = currentOffsets.hashCode()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index a6504cd088..8c2bb4abd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -51,8 +51,6 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
-  protected def blockManager = SparkEnv.get.blockManager
-
   def schema: StructType = encoder.schema
 
   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -78,25 +76,32 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
-  override def getNextBatch(start: Option[Offset]): Option[Batch] = synchronized {
-    val newBlocks =
-      batches.drop(
-        start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1)
-
-    if (newBlocks.nonEmpty) {
-      logDebug(s"Running [$start, $currentOffset] on blocks ${newBlocks.mkString(", ")}")
-      val df = newBlocks
-          .map(_.toDF())
-          .reduceOption(_ unionAll _)
-          .getOrElse(sqlContext.emptyDataFrame)
+  override def toString: String = s"MemoryStream[${output.mkString(",")}]"
 
-      Some(new Batch(currentOffset, df))
-    } else {
-      None
-    }
+  override def getOffset: Option[Offset] = if (batches.isEmpty) {
+    None
+  } else {
+    Some(currentOffset)
   }
 
-  override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+  /**
+   * Returns the next batch of data that is available after `start`, if any is available.
+   */
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    val startOrdinal =
+      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+    val newBlocks = batches.slice(startOrdinal, endOrdinal)
+
+    logDebug(
+      s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
+    newBlocks
+      .map(_.toDF())
+      .reduceOption(_ unionAll _)
+      .getOrElse {
+        sys.error("No data selected!")
+      }
+  }
 }
 
 /**
@@ -105,45 +110,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
  */
 class MemorySink(schema: StructType) extends Sink with Logging {
   /** An order list of batches that have been written to this [[Sink]]. */
-  private var batches = new ArrayBuffer[Batch]()
-
-  /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */
-  private val externalRowConverter = RowEncoder(schema)
-
-  override def currentOffset: Option[Offset] = synchronized {
-    batches.lastOption.map(_.end)
-  }
-
-  override def addBatch(nextBatch: Batch): Unit = synchronized {
-    nextBatch.data.collect()  // 'compute' the batch's data and record the batch
-    batches.append(nextBatch)
-  }
+  private val batches = new ArrayBuffer[Array[Row]]()
 
   /** Returns all rows that are stored in this [[Sink]]. */
   def allData: Seq[Row] = synchronized {
-    batches
-        .map(_.data)
-        .reduceOption(_ unionAll _)
-        .map(_.collect().toSeq)
-        .getOrElse(Seq.empty)
-  }
-
-  /**
-   * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the
-   * corresponding point in the input. This function can be used when testing to simulate data
-   * that has been lost due to buffering.
-   */
-  def dropBatches(num: Int): Unit = synchronized {
-    batches.dropRight(num)
+    batches.flatten
   }
 
   def toDebugString: String = synchronized {
-    batches.map { b =>
-      val dataStr = try b.data.collect().mkString(" ") catch {
+    batches.zipWithIndex.map { case (b, i) =>
+      val dataStr = try b.mkString(" ") catch {
         case NonFatal(e) => "[Error converting to string]"
       }
-      s"${b.end}: $dataStr"
+      s"$i: $dataStr"
     }.mkString("\n")
   }
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (batchId == batches.size) {
+      logDebug(s"Committing batch $batchId")
+      batches.append(data.collect())
+    } else {
+      logDebug(s"Skipping already committed batch: $batchId")
+    }
+  }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 70d1a8b071..fd1d77f514 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,6 +524,11 @@ object SQLConf {
     doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
     isPublic = false)
 
+  val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation",
+    defaultValue = None,
+    doc = "The default location for storing checkpoint data for continuously executing queries.",
+    isPublic = true)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -554,6 +559,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
 
   /** ************************ Spark SQL Params/Hints ******************* */
 
+  def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
+
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 81078dc6a0..f356cde9cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.Utils
 
 /**
  * A framework for implementing tests for streaming queries and sources.
@@ -64,6 +65,12 @@ import org.apache.spark.sql.execution.streaming._
  */
 trait StreamTest extends QueryTest with Timeouts {
 
+  implicit class RichContinuousQuery(cq: ContinuousQuery) {
+    def stopQuietly(): Unit = quietly {
+      cq.stop()
+    }
+  }
+
   implicit class RichSource(s: Source) {
     def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
 
@@ -126,8 +133,6 @@ trait StreamTest extends QueryTest with Timeouts {
     override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
   }
 
-  case class DropBatches(num: Int) extends StreamAction
-
   /** Stops the stream.  It must currently be running. */
   case object StopStream extends StreamAction with StreamMustBeRunning
 
@@ -202,7 +207,7 @@ trait StreamTest extends QueryTest with Timeouts {
     }.mkString("\n")
 
     def currentOffsets =
-      if (currentStream != null) currentStream.streamProgress.toString else "not started"
+      if (currentStream != null) currentStream.committedOffsets.toString else "not started"
 
     def threadState =
       if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
@@ -266,6 +271,7 @@ trait StreamTest extends QueryTest with Timeouts {
     }
 
     val testThread = Thread.currentThread()
+    val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
 
     try {
       startedTest.foreach { action =>
@@ -276,7 +282,7 @@ trait StreamTest extends QueryTest with Timeouts {
             currentStream =
               sqlContext
                 .streams
-                .startQuery(StreamExecution.nextName, stream, sink)
+                .startQuery(StreamExecution.nextName, metadataRoot, stream, sink)
                 .asInstanceOf[StreamExecution]
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {
@@ -308,10 +314,6 @@ trait StreamTest extends QueryTest with Timeouts {
               currentStream = null
             }
 
-          case DropBatches(num) =>
-            verify(currentStream == null, "dropping batches while running leads to corruption")
-            sink.dropBatches(num)
-
           case ef: ExpectFailure[_] =>
             verify(currentStream != null, "can not expect failure when stream is not running")
             try failAfter(streamingTimeout) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 45e824ad63..54ce98d195 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest}
 import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
 
 class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
 
@@ -235,9 +236,14 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
           @volatile var query: StreamExecution = null
           try {
             val df = ds.toDF
+            val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
             query = sqlContext
               .streams
-              .startQuery(StreamExecution.nextName, df, new MemorySink(df.schema))
+              .startQuery(
+                StreamExecution.nextName,
+                metadataRoot,
+                df,
+                new MemorySink(df.schema))
               .asInstanceOf[StreamExecution]
           } catch {
             case NonFatal(e) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 84ed017a9d..3be0ea481d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -54,7 +54,8 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
       AssertOnQuery(
-        q => q.exception.get.startOffset.get === q.streamProgress.toCompositeOffset(Seq(inputData)),
+        q =>
+          q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)),
         "incorrect start offset on exception")
     )
   }
@@ -68,19 +69,19 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
       AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
       AssertOnQuery(_.sourceStatuses(0).offset === None),
       AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offset === None),
+      AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)),
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
       AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))),
-      AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))),
+      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))),
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3, 6, 3),
       AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))),
-      AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1)))),
+      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))),
       AddData(inputData, 0),
       ExpectFailure[SparkException],
       AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))),
-      AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1))))
+      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1)))
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index 0878277811..e485aa837b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.streaming.test
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.{AnalysisException, ContinuousQuery, SQLContext, StreamTest}
-import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source}
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.Utils
 
 object LastOptions {
   var parameters: Map[String, String] = null
@@ -41,8 +42,15 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
     LastOptions.parameters = parameters
     LastOptions.schema = schema
     new Source {
-      override def getNextBatch(start: Option[Offset]): Option[Batch] = None
       override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import sqlContext.implicits._
+
+        Seq[Int]().toDS().toDF()
+      }
     }
   }
 
@@ -53,8 +61,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
     LastOptions.parameters = parameters
     LastOptions.partitionColumns = partitionColumns
     new Sink {
-      override def addBatch(batch: Batch): Unit = {}
-      override def currentOffset: Option[Offset] = None
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
     }
   }
 }
@@ -62,8 +69,10 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
 class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
   import testImplicits._
 
+  private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
+
   after {
-    sqlContext.streams.active.foreach(_.stop())
+    sqlContext.streams.active.foreach(_.stopQuietly())
   }
 
   test("resolve default source") {
@@ -72,8 +81,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .stream()
       .write
       .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stop()
+      .stopQuietly()
   }
 
   test("resolve full class") {
@@ -82,8 +92,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .stream()
       .write
       .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stop()
+      .stopQuietly()
   }
 
   test("options") {
@@ -108,8 +119,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .option("opt1", "1")
       .options(Map("opt2" -> "2"))
       .options(map)
+      .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stop()
+      .stopQuietly()
 
     assert(LastOptions.parameters("opt1") == "1")
     assert(LastOptions.parameters("opt2") == "2")
@@ -123,38 +135,43 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stop()
+      .stopQuietly()
     assert(LastOptions.partitionColumns == Nil)
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
       .partitionBy("a")
       .startStream()
-      .stop()
+      .stopQuietly()
     assert(LastOptions.partitionColumns == Seq("a"))
 
     withSQLConf("spark.sql.caseSensitive" -> "false") {
       df.write
         .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
         .partitionBy("A")
         .startStream()
-        .stop()
+        .stopQuietly()
       assert(LastOptions.partitionColumns == Seq("a"))
     }
 
     intercept[AnalysisException] {
       df.write
         .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
         .partitionBy("b")
         .startStream()
-        .stop()
+        .stopQuietly()
     }
   }
 
   test("stream paths") {
     val df = sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
       .stream("/test")
 
     assert(LastOptions.parameters("path") == "/test")
@@ -163,8 +180,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
       .startStream("/test")
-      .stop()
+      .stopQuietly()
 
     assert(LastOptions.parameters("path") == "/test")
   }
@@ -187,8 +205,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .option("intOpt", 56)
       .option("boolOpt", false)
       .option("doubleOpt", 6.7)
+      .option("checkpointLocation", newMetadataDir)
       .startStream("/test")
-      .stop()
+      .stopQuietly()
 
     assert(LastOptions.parameters("intOpt") == "56")
     assert(LastOptions.parameters("boolOpt") == "false")
@@ -204,6 +223,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
         .stream("/test")
         .write
         .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
         .queryName(name)
         .startStream()
     }
@@ -215,6 +235,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
         .stream("/test")
         .write
         .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
         .startStream()
     }
 
@@ -248,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     }
 
     // Should be able to start query with that name after stopping the previous query
-    q1.stop()
+    q1.stopQuietly()
     val q5 = startQueryWithName("name")
     assert(activeStreamNames.contains("name"))
-    sqlContext.streams.active.foreach(_.stop())
+    sqlContext.streams.active.foreach(_.stopQuietly())
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 4c18e38db8..89de15acf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -318,16 +318,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("fault tolerance") {
-    def assertBatch(batch1: Option[Batch], batch2: Option[Batch]): Unit = {
-      (batch1, batch2) match {
-        case (Some(b1), Some(b2)) =>
-          assert(b1.end === b2.end)
-          assert(b1.data.as[String].collect() === b2.data.as[String].collect())
-        case (None, None) =>
-        case _ => fail(s"batch ($batch1) is not equal to batch ($batch2)")
-      }
-    }
-
     val src = Utils.createTempDir("streaming.src")
     val tmp = Utils.createTempDir("streaming.tmp")
 
@@ -345,14 +335,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
     )
 
-    val textSource2 = createFileStreamSource("text", src.getCanonicalPath)
-    assert(textSource2.currentOffset === textSource.currentOffset)
-    assertBatch(textSource2.getNextBatch(None), textSource.getNextBatch(None))
-    for (f <- 0L to textSource.currentOffset.offset) {
-      val offset = LongOffset(f)
-      assertBatch(textSource2.getNextBatch(Some(offset)), textSource.getNextBatch(Some(offset)))
-    }
-
     Utils.deleteRecursively(src)
     Utils.deleteRecursively(tmp)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index 52783281ab..d04783ecac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -61,7 +61,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
           // The source and sink offsets must be None as this must be called before the
           // batches have started
           assert(status.sourceStatuses(0).offset === None)
-          assert(status.sinkStatus.offset === None)
+          assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
 
           // No progress events or termination events
           assert(listener.progressStatuses.isEmpty)
@@ -78,7 +78,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
             assert(status != null)
             assert(status.active == true)
             assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0))))
+            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
 
             // No termination events
             assert(listener.terminationStatus === null)
@@ -92,7 +92,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
 
             assert(status.active === false) // must be inactive by the time onQueryTerm is called
             assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0))))
+            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
           }
           listener.checkAsyncErrors()
         }
-- 
GitLab