diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index d7f71bd4b08954dd8e11af7dec7dcdba636c9f27..1343e81569cbde7299b95442350f12e39e3df6f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -178,10 +178,13 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is already active")
       }
+      var nextSourceId = 0L
       val logicalPlan = df.logicalPlan.transform {
         case StreamingRelation(dataSource, _, output) =>
           // Materialize source to avoid creating it in every batch
-          val source = dataSource.createSource()
+          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema`, because the
           // attributes in `df.logicalPlan` already reference those of the previous `output`.
           StreamingExecutionRelation(source, output)
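
Reading note, not part of the patch: the transform above gives every streaming source in the plan its own metadata directory under the query's checkpoint location, numbered in the order the StreamingRelations are visited. A minimal standalone sketch of the resulting layout follows; the object and method names are invented here purely for illustration.

object SourceMetadataPathsSketch {
  // Mirrors the numbering above: source ids start at 0 and increase in the
  // order the streaming relations appear while transforming the logical plan.
  def sourceMetadataPaths(checkpointLocation: String, numSources: Int): Seq[String] =
    (0 until numSources).map(id => s"$checkpointLocation/sources/$id")

  def main(args: Array[String]): Unit = {
    // e.g. a query that unions two streamed sources, checkpointed at /tmp/ckpt:
    sourceMetadataPaths("/tmp/ckpt", 2).foreach(println)
    // prints:
    //   /tmp/ckpt/sources/0
    //   /tmp/ckpt/sources/1
  }
}
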
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f55cedb1b6d77e00638651693b751126463d9ca1..10fde152ab2a9f1ec578dc6dd5005e00ebb12e9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -123,36 +123,58 @@ case class DataSource(
     }
   }
 
-  /** Returns a source that can be used to continually read data. */
-  def createSource(): Source = {
+  private def inferFileFormatSchema(format: FileFormat): StructType = {
+    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+    val allPaths = caseInsensitiveOptions.get("path")
+    val globbedPaths = allPaths.toSeq.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      SparkHadoopUtil.get.globPathIfNecessary(qualified)
+    }.toArray
+
+    val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+    userSpecifiedSchema.orElse {
+      format.inferSchema(
+        sqlContext,
+        caseInsensitiveOptions,
+        fileCatalog.allFiles())
+    }.getOrElse {
+      throw new AnalysisException("Unable to infer schema.  It must be specified manually.")
+    }
+  }
+
+  /** Returns the name and schema of the source that can be used to continually read data. */
+  def sourceSchema(): (String, StructType) = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        s.createSource(sqlContext, userSpecifiedSchema, className, options)
+        s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
-        val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+        (s"FileSource[$path]", inferFileFormatSchema(format))
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $className does not support streamed reading")
+    }
+  }
 
-        val allPaths = caseInsensitiveOptions.get("path")
-        val globbedPaths = allPaths.toSeq.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          SparkHadoopUtil.get.globPathIfNecessary(qualified)
-        }.toArray
+  /** Returns a source that can be used to continually read data. */
+  def createSource(metadataPath: String): Source = {
+    providingClass.newInstance() match {
+      case s: StreamSourceProvider =>
+        s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
 
-        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
-        val dataSchema = userSpecifiedSchema.orElse {
-          format.inferSchema(
-            sqlContext,
-            caseInsensitiveOptions,
-            fileCatalog.allFiles())
-        }.getOrElse {
-          throw new AnalysisException("Unable to infer schema.  It must be specified manually.")
-        }
+      case format: FileFormat =>
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val path = caseInsensitiveOptions.getOrElse("path", {
+          throw new IllegalArgumentException("'path' is not specified")
+        })
+
+        val dataSchema = inferFileFormatSchema(format)
 
         def dataFrameBuilder(files: Array[String]): DataFrame = {
           Dataset.ofRows(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index f951dea735d9c8253a051b3ec231c5cfd0f35999..d2872e49ce28ac7f0933c80dcaaa135856bc3879 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.datasources.DataSource
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
-    val source = dataSource.createSource()
-    StreamingRelation(dataSource, source.toString, source.schema.toAttributes)
+    val (name, schema) = dataSource.sourceSchema()
+    StreamingRelation(dataSource, name, schema.toAttributes)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 65b1f61349ff9e8eb77d8768e07b740f625bba18..bea243a3be58cce3c0abc5445cc73c40136f26ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -129,8 +129,17 @@ trait SchemaRelationProvider {
  * Implemented by objects that can produce a streaming [[Source]] for a specific format or system.
  */
 trait StreamSourceProvider {
+
+  /** Returns the name and schema of the source that can be used to continually read data. */
+  def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType)
+
   def createSource(
       sqlContext: SQLContext,
+      metadataPath: String,
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source
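
Reading note, not part of the patch: with the revised trait above, a provider now answers the name/schema question when the streaming DataFrame is defined (sourceSchema) and only builds the actual Source when the query starts (createSource), at which point it receives its private metadata directory. A minimal sketch of a third-party implementation against this contract; ExampleStreamProvider, its schema, and the stubbed body are invented here for illustration.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class ExampleStreamProvider extends StreamSourceProvider {

  private val exampleSchema = StructType(StructField("value", IntegerType) :: Nil)

  // Called when the streaming DataFrame is defined: no Source is created yet,
  // only its display name and schema.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("exampleSource", schema.getOrElse(exampleSchema))

  // Called once when the query starts; `metadataPath` is this source's private
  // directory under the query's checkpoint location (<checkpointLocation>/sources/<id>).
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    // Body elided in this sketch; a real source would persist whatever state it
    // needs (seen files, offsets, ...) under `metadataPath`.
    ???
  }
}
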
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index 28c558208f6b269d46fdfb43aab20314fe548b13..00efe21d39de48a1b1e2588467542dfe0b6a6adf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
 
+import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
@@ -31,22 +32,50 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 object LastOptions {
+
+  var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
+  var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
+
+  def clear(): Unit = {
+    parameters = null
+    schema = null
+    partitionColumns = Nil
+    reset(mockStreamSourceProvider)
+    reset(mockStreamSinkProvider)
+  }
 }
 
 /** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
 class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.sourceSchema(sqlContext, schema, providerName, parameters)
+    ("dummySource", fakeSchema)
+  }
+
   override def createSource(
       sqlContext: SQLContext,
+      metadataPath: String,
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {
     LastOptions.parameters = parameters
     LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.createSource(
+      sqlContext, metadataPath, schema, providerName, parameters)
     new Source {
-      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+      override def schema: StructType = fakeSchema
 
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
 
@@ -64,6 +93,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       partitionColumns: Seq[String]): Sink = {
     LastOptions.parameters = parameters
     LastOptions.partitionColumns = partitionColumns
+    LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
     new Sink {
       override def addBatch(batchId: Long, data: DataFrame): Unit = {}
     }
@@ -117,7 +147,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(LastOptions.parameters("opt2") == "2")
     assert(LastOptions.parameters("opt3") == "3")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
@@ -181,7 +211,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     assert(LastOptions.parameters("path") == "/test")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
@@ -204,7 +234,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(LastOptions.parameters("boolOpt") == "false")
     assert(LastOptions.parameters("doubleOpt") == "6.7")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
     df.write
       .format("org.apache.spark.sql.streaming.test")
       .option("intOpt", 56)
@@ -303,4 +333,39 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
   }
+
+  test("source metadataPath") {
+    LastOptions.clear()
+
+    val checkpointLocation = newMetadataDir
+
+    val df1 = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    val df2 = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    val q = df1.union(df2).write
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation)
+      .trigger(ProcessingTime(10.seconds))
+      .startStream()
+    q.stop()
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      sqlContext,
+      checkpointLocation + "/sources/0",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      sqlContext,
+      checkpointLocation + "/sources/1",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 09daa7f81a979aa235e3de13481533b54fe071a8..73d1b1b1d507d2cfa8387285063d8e9f977f6a4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -63,6 +63,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       format: String,
       path: String,
       schema: Option[StructType] = None): FileStreamSource = {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val reader =
       if (schema.isDefined) {
         sqlContext.read.format(format).schema(schema.get)
@@ -72,7 +73,8 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
     reader.stream(path)
       .queryExecution.analyzed
       .collect { case StreamingRelation(dataSource, _, _) =>
-        dataSource.createSource().asInstanceOf[FileStreamSource]
+        // There is only one source in our tests so just set sourceId to 0
+        dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
       }.head
   }
 
@@ -98,9 +100,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       }
     df.queryExecution.analyzed
       .collect { case StreamingRelation(dataSource, _, _) =>
-        dataSource.createSource().asInstanceOf[FileStreamSource]
-      }.head
-      .schema
+        dataSource.sourceSchema()
+      }.head._2
   }
 
   test("FileStreamSource schema: no path") {
@@ -340,7 +341,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(src)
     Utils.deleteRecursively(tmp)
   }
-
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 5249aa28dd6cac29d3cd2049e629dbce8178258c..1f2834054519bdfd86e39209e5a482c996c49bec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -59,7 +59,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
   }
 
   test("error if attempting to resume specific checkpoint") {
-    val location = Utils.createTempDir("steaming.checkpoint").getCanonicalPath
+    val location = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
 
     val input = MemoryStream[Int]
     val query = input.toDF().write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e4ea55552691d06dc10c0d3b5b8fce08cc64d8ca..2bd27c7efdbdca38e171e948a10d49d7225eb611 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -115,8 +115,17 @@ class StreamSuite extends StreamTest with SharedSQLContext {
  */
 class FakeDefaultSource extends StreamSourceProvider {
 
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
+
   override def createSource(
       sqlContext: SQLContext,
+      metadataPath: String,
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {