Commit 4913c92c authored by Liwei Lin, committed by Shixiong Zhu

[SPARK-19633][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path.

But when reading the outputs of another streaming query, the file source should use `MetadataLogFileIndex` to list files from the sink log. This patch adds that support.

## `MetadataLogFileIndex` or `InMemoryFileIndex`
```scala
spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
                      //   - use `MetadataLogFileIndex` when `/some/path/_spark_metadata` exists
                      //   - fall back to `InMemoryFileIndex` otherwise
```
```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```
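
For illustration, here is a minimal sketch of the end-to-end pattern the new tests exercise: one streaming query writes text files (and the `_spark_metadata` sink log) to an output directory, and a second streaming query reads that directory. The socket source, all paths, and the `keep` filter are placeholders, not part of this patch; the new tests drive the first query with a `MemoryStream` instead.
```scala
import spark.implicits._  // assumes an existing SparkSession named `spark`

// q1 writes text files and maintains /tmp/out/_spark_metadata (the file sink log).
val q1 = spark.readStream
  .format("socket")                     // placeholder source; emits a string column "value"
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/q1-checkpoint")
  .start("/tmp/out")

// q2 reads q1's output. "/tmp/out" is a non-glob path containing _spark_metadata, so the
// file source lists files from the sink log (MetadataLogFileIndex) and only sees files
// that q1 has actually committed, ignoring anything else dropped into the directory.
val q2 = spark.readStream
  .format("text")
  .load("/tmp/out")
  .filter($"value".contains("keep"))    // any downstream processing
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/q2-checkpoint")
  .start()
```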

## How was this patch tested?

Two newly added tests.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16987 from lw-lin/source-read-from-sink.
parent 89cd3845
@@ -279,28 +279,6 @@ case class DataSource(
     }
   }
 
-  /**
-   * Returns true if there is a single path that has a metadata log indicating which files should
-   * be read.
-   */
-  def hasMetadata(path: Seq[String]): Boolean = {
-    path match {
-      case Seq(singlePath) =>
-        try {
-          val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
-          val res = fs.exists(metadataPath)
-          res
-        } catch {
-          case NonFatal(e) =>
-            logWarning(s"Error while looking for metadata directory.")
-            false
-        }
-      case _ => false
-    }
-  }
-
   /**
    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
@@ -331,7 +309,9 @@ case class DataSource(
       // We are reading from the results of a streaming query. Load files from the metadata log
       // instead of listing them using HDFS APIs.
       case (format: FileFormat, _)
-          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+          if FileStreamSink.hasMetadata(
+            caseInsensitiveOptions.get("path").toSeq ++ paths,
+            sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {
...
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -25,9 +28,31 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
 
-object FileStreamSink {
+object FileStreamSink extends Logging {
   // The name of the subdirectory that is used to store metadata about which files are valid.
   val metadataDir = "_spark_metadata"
+
+  /**
+   * Returns true if there is a single path that has a metadata log indicating which files should
+   * be read.
+   */
+  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+    path match {
+      case Seq(singlePath) =>
+        try {
+          val hdfsPath = new Path(singlePath)
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          val metadataPath = new Path(hdfsPath, metadataDir)
+          val res = fs.exists(metadataPath)
+          res
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error while looking for metadata directory.")
+            false
        }
+      case _ => false
+    }
+  }
 }
 
 /**
...
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -43,8 +43,10 @@ class FileStreamSource(
 
   private val sourceOptions = new FileStreamOptions(options)
 
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
   private val qualifiedBasePath: Path = {
-    val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val fs = new Path(path).getFileSystem(hadoopConf)
     fs.makeQualified(new Path(path))  // can contain glob patterns
   }
 
@@ -157,14 +159,65 @@ class FileStreamSource(
         checkFilesExist = false)))
   }
 
+  /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * Only when the user gives a non-glob path will we figure out whether the source has a
+   * metadata log.
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
+   */
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+    if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+    fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+    // Note that if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
+    // non-glob path
+    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
+
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
   private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
-    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
-    val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
+
+    var allFiles: Seq[FileStatus] = null
+    sourceHasMetadata match {
+      case None =>
+        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+          sourceHasMetadata = Some(true)
+          allFiles = allFilesUsingMetadataLogFileIndex()
+        } else {
+          allFiles = allFilesUsingInMemoryFileIndex()
+          if (allFiles.isEmpty) {
+            // we still cannot decide
+          } else {
+            // decide what to use for future rounds
+            // double check whether the source has metadata, preventing the extreme corner case
+            // that the metadata log and data files are only generated after the previous
+            // `FileStreamSink.hasMetadata` check
+            if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+              sourceHasMetadata = Some(true)
+              allFiles = allFilesUsingMetadataLogFileIndex()
+            } else {
+              sourceHasMetadata = Some(false)
+              // `allFiles` has already been fetched using InMemoryFileIndex in this round
+            }
+          }
+        }
+      case Some(true) => allFiles = allFilesUsingMetadataLogFileIndex()
+      case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
+    }
+
+    val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime
...
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
       query.nonEmpty,
       "Cannot add data when there is no query for finding the active file stream source")
 
-    val sources = query.get.logicalPlan.collect {
-      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-        source.asInstanceOf[FileStreamSource]
-    }
+    val sources = getSourcesFromStreamingQuery(query.get)
     if (sources.isEmpty) {
       throw new Exception(
         "Could not find file source in the StreamExecution logical plan to add data to")
@@ -134,6 +131,14 @@ abstract class FileStreamSourceTest
     }.head
   }
 
+  protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
+    query.logicalPlan.collect {
+      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+        source.asInstanceOf[FileStreamSource]
+    }
+  }
+
   protected def withTempDirs(body: (File, File) => Unit) {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -388,9 +393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       CheckAnswer("a", "b", "c", "d"),
 
      AssertOnQuery("seen files should contain only one entry") { streamExecution =>
-        val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
-          e.source.asInstanceOf[FileStreamSource]
-        }.head
+        val source = getSourcesFromStreamingQuery(streamExecution).head
        assert(source.seenFiles.size == 1)
        true
      }
@@ -662,6 +665,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (outputDir, checkpointDir) =>
+        // q1 is a streaming query that reads from memory and writes to text files
+        val q1Source = MemoryStream[String]
+        val q1 =
+          q1Source
+            .toDF()
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("text")
+            .start(outputDir.getCanonicalPath)
+
+        // q2 is a streaming query that reads q1's text outputs
+        val q2 =
+          createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+        def q1AddData(data: String*): StreamAction =
+          Execute { _ =>
+            q1Source.addData(data)
+            q1.processAllAvailable()
+          }
+        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+        testStream(q2)(
+          // batch 0
+          q1AddData("drop1", "keep2"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2"),
+
+          // batch 1
+          Assert {
+            // create a text file that won't be on q1's sink log
+            // thus even if its content contains "keep", it should NOT appear in q2's answer
+            val shouldNotKeep = new File(outputDir, "should_not_keep.txt")
+            stringToFile(shouldNotKeep, "should_not_keep!!!")
+            shouldNotKeep.exists()
+          },
+          q1AddData("keep3"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3"),
+
+          // batch 2: check that things work well when the sink log gets compacted
+          q1AddData("keep4"),
+          Assert {
+            // compact interval is 3, so file "2.compact" should exist
+            new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
+          },
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3", "keep4"),
+
+          Execute { _ => q1.stop() }
+        )
+      }
+    }
+  }
+
+  test("start before another streaming query, and read its output") {
+    withTempDirs { case (outputDir, checkpointDir) =>
+      // q1 is a streaming query that reads from memory and writes to text files
+      val q1Source = MemoryStream[String]
+      // define q1, but don't start it for now
+      val q1Write =
+        q1Source
+          .toDF()
+          .writeStream
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .format("text")
+      var q1: StreamingQuery = null
+
+      val q2 = createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+      testStream(q2)(
+        AssertOnQuery { q2 =>
+          val fileSource = getSourcesFromStreamingQuery(q2).head
+          // q1 has not started yet, verify that q2 doesn't know whether q1 has metadata
+          fileSource.sourceHasMetadata === None
+        },
+        Execute { _ =>
+          q1 = q1Write.start(outputDir.getCanonicalPath)
+          q1Source.addData("drop1", "keep2")
+          q1.processAllAvailable()
+        },
+        AssertOnQuery { q2 =>
+          q2.processAllAvailable()
+          val fileSource = getSourcesFromStreamingQuery(q2).head
+          // q1 has started, verify that q2 knows q1 has metadata by now
+          fileSource.sourceHasMetadata === Some(true)
+        },
+        CheckAnswer("keep2"),
+        Execute { _ => q1.stop() }
+      )
+    }
+  }
+
   test("when schema inference is turned on, should read partition data") {
     def createFile(content: String, src: File, tmp: File): Unit = {
       val tempFile = Utils.tempFileWith(new File(tmp, "text"))
@@ -755,10 +853,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         .streamingQuery
       q.processAllAvailable()
       val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = q.logicalPlan.collect {
-        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-          source.asInstanceOf[FileStreamSource]
-      }.head
+      val fileSource = getSourcesFromStreamingQuery(q).head
 
       /** Check the data read in the last batch */
       def checkLastBatchData(data: Int*): Unit = {
...
@@ -208,6 +208,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }
 
+  /** Execute arbitrary code */
+  object Execute {
+    def apply(func: StreamExecution => Any): AssertOnQuery =
+      AssertOnQuery(query => { func(query); true })
+  }
+
   class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
 
     private var waitStartTime: Option[Long] = None
@@ -472,7 +478,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
       case a: AssertOnQuery =>
        verify(currentStream != null || lastStream != null,
-          "cannot assert when not stream has been started")
+          "cannot assert when no stream has been started")
        val streamToAssert = Option(currentStream).getOrElse(lastStream)
        verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
...