From 4913c92c2fbfcc22b41afb8ce79687165392d7da Mon Sep 17 00:00:00 2001
From: Liwei Lin <lwlin7@gmail.com>
Date: Tue, 28 Feb 2017 22:58:51 -0800
Subject: [PATCH] [SPARK-19633][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path.

But when reading the output of another streaming query, the file source should use
`MetadataLogFileIndex` to list files from the sink's metadata log. This patch adds this support.

## `MetadataLogFileIndex` or `InMemoryFileIndex`

```scala
spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
                      //   - use `MetadataLogFileIndex` when `/some/path/_spark_metadata` exists
                      //   - fall back to `InMemoryFileIndex` otherwise
```

```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```

## How was this patch tested?

Two newly added tests.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16987 from lw-lin/source-read-from-sink.
---
 .../execution/datasources/DataSource.scala    |  26 +---
 .../execution/streaming/FileStreamSink.scala  |  27 +++-
 .../streaming/FileStreamSource.scala          |  63 +++++++++-
 .../sql/streaming/FileStreamSourceSuite.scala | 117 ++++++++++++++++--
 .../spark/sql/streaming/StreamTest.scala      |   8 +-
 5 files changed, 200 insertions(+), 41 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d510581f90..c1353d41e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -279,28 +279,6 @@ case class DataSource(
     }
   }
 
-  /**
-   * Returns true if there is a single path that has a metadata log indicating which files should
-   * be read.
-   */
-  def hasMetadata(path: Seq[String]): Boolean = {
-    path match {
-      case Seq(singlePath) =>
-        try {
-          val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
-          val res = fs.exists(metadataPath)
-          res
-        } catch {
-          case NonFatal(e) =>
-            logWarning(s"Error while looking for metadata directory.")
-            false
-        }
-      case _ => false
-    }
-  }
-
   /**
    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
@@ -331,7 +309,9 @@ case class DataSource(
       // We are reading from the results of a streaming query. Load files from the metadata log
       // instead of listing them using HDFS APIs.
       case (format: FileFormat, _)
-          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+          if FileStreamSink.hasMetadata(
+            caseInsensitiveOptions.get("path").toSeq ++ paths,
+            sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 0dbe2a71ed..07ec4e9429 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -25,9 +28,31 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
 
-object FileStreamSink {
+object FileStreamSink extends Logging {
   // The name of the subdirectory that is used to store metadata about which files are valid.
   val metadataDir = "_spark_metadata"
+
+  /**
+   * Returns true if there is a single path that has a metadata log indicating which files should
+   * be read.
+   */
+  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+    path match {
+      case Seq(singlePath) =>
+        try {
+          val hdfsPath = new Path(singlePath)
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          val metadataPath = new Path(hdfsPath, metadataDir)
+          val res = fs.exists(metadataPath)
+          res
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error while looking for metadata directory.")
+            false
+        }
+      case _ => false
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39c0b49796..6a7263ca45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -43,8 +43,10 @@ class FileStreamSource(
 
   private val sourceOptions = new FileStreamOptions(options)
 
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
   private val qualifiedBasePath: Path = {
-    val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val fs = new Path(path).getFileSystem(hadoopConf)
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
@@ -157,14 +159,65 @@ class FileStreamSource(
         checkFilesExist = false)))
   }
 
+  /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * Only when the user gives a non-glob path will we figure out whether the source has a
+   * metadata log.
+   *
+   * None means we don't know at the moment
+   * Some(true) means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
+   */
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+    if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+    fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+    // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
+    // non-glob path
+    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
+
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
   private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
-    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
-    val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
+
+    var allFiles: Seq[FileStatus] = null
+    sourceHasMetadata match {
+      case None =>
+        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+          sourceHasMetadata = Some(true)
+          allFiles = allFilesUsingMetadataLogFileIndex()
+        } else {
+          allFiles = allFilesUsingInMemoryFileIndex()
+          if (allFiles.isEmpty) {
+            // we still cannot decide whether the source has metadata
+          } else {
+            // decide what to use for future rounds
+            // double check whether the source has metadata, to handle the extreme corner case
+            // where the metadata log and data files are only generated after the previous
+            // `FileStreamSink.hasMetadata` check
+            if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+              sourceHasMetadata = Some(true)
+              allFiles = allFilesUsingMetadataLogFileIndex()
+            } else {
+              sourceHasMetadata = Some(false)
+              // `allFiles` has already been fetched using InMemoryFileIndex in this round
+            }
+          }
+        }
+      case Some(true) => allFiles = allFilesUsingMetadataLogFileIndex()
+      case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
+    }
+
+    val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 5110d89c85..1586850c77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
       query.nonEmpty,
       "Cannot add data when there is no query for finding the active file stream source")
 
-    val sources = query.get.logicalPlan.collect {
-      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-        source.asInstanceOf[FileStreamSource]
-    }
+    val sources = getSourcesFromStreamingQuery(query.get)
     if (sources.isEmpty) {
       throw new Exception(
         "Could not find file source in the StreamExecution logical plan to add data to")
@@ -134,6 +131,14 @@ abstract class FileStreamSourceTest
     }.head
   }
 
+  protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
+    query.logicalPlan.collect {
+      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+        source.asInstanceOf[FileStreamSource]
+    }
+  }
+
+
   protected def withTempDirs(body: (File, File) => Unit) {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -388,9 +393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       CheckAnswer("a", "b", "c", "d"),
 
       AssertOnQuery("seen files should contain only one entry") { streamExecution =>
-        val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
-          e.source.asInstanceOf[FileStreamSource]
-        }.head
+        val source = getSourcesFromStreamingQuery(streamExecution).head
         assert(source.seenFiles.size == 1)
         true
       }
@@ -662,6 +665,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (outputDir, checkpointDir) =>
+        // q1 is a streaming query that reads from memory and writes to text files
+        val q1Source = MemoryStream[String]
+        val q1 =
+          q1Source
+            .toDF()
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("text")
+            .start(outputDir.getCanonicalPath)
+
+        // q2 is a streaming query that reads q1's text outputs
+        val q2 =
+          createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+        def q1AddData(data: String*): StreamAction =
+          Execute { _ =>
+            q1Source.addData(data)
+            q1.processAllAvailable()
+          }
+        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+        testStream(q2)(
+          // batch 0
+          q1AddData("drop1", "keep2"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2"),
+
+          // batch 1
+          Assert {
+            // create a text file that won't be in q1's sink log,
+            // thus even though its content contains "keep", it should NOT appear in q2's answer
+            val shouldNotKeep = new File(outputDir, "should_not_keep.txt")
+            stringToFile(shouldNotKeep, "should_not_keep!!!")
+            shouldNotKeep.exists()
+          },
+          q1AddData("keep3"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3"),
+
+          // batch 2: check that things work well when the sink log gets compacted
+          q1AddData("keep4"),
+          Assert {
+            // compact interval is 3, so file "2.compact" should exist
+            new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
+          },
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3", "keep4"),
+
+          Execute { _ => q1.stop() }
+        )
+      }
+    }
+  }
+
+  test("start before another streaming query, and read its output") {
+    withTempDirs { case (outputDir, checkpointDir) =>
+      // q1 is a streaming query that reads from memory and writes to text files
+      val q1Source = MemoryStream[String]
+      // define q1, but don't start it for now
+      val q1Write =
+        q1Source
+          .toDF()
+          .writeStream
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .format("text")
+      var q1: StreamingQuery = null
+
+      val q2 = createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+      testStream(q2)(
+        AssertOnQuery { q2 =>
+          val fileSource = getSourcesFromStreamingQuery(q2).head
+          // q1 has not started yet; verify that q2 doesn't know whether q1 has metadata
+          fileSource.sourceHasMetadata === None
+        },
+        Execute { _ =>
+          q1 = q1Write.start(outputDir.getCanonicalPath)
+          q1Source.addData("drop1", "keep2")
+          q1.processAllAvailable()
+        },
+        AssertOnQuery { q2 =>
+          q2.processAllAvailable()
+          val fileSource = getSourcesFromStreamingQuery(q2).head
+          // q1 has started; verify that q2 knows q1 has metadata by now
+          fileSource.sourceHasMetadata === Some(true)
+        },
+        CheckAnswer("keep2"),
+        Execute { _ => q1.stop() }
+      )
+    }
+  }
+
   test("when schema inference is turned on, should read partition data") {
     def createFile(content: String, src: File, tmp: File): Unit = {
       val tempFile = Utils.tempFileWith(new File(tmp, "text"))
@@ -755,10 +853,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         .streamingQuery
       q.processAllAvailable()
       val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = q.logicalPlan.collect {
-        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-          source.asInstanceOf[FileStreamSource]
-      }.head
+      val fileSource = getSourcesFromStreamingQuery(q).head
 
       /** Check the data read in the last batch */
       def checkLastBatchData(data: Int*): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af2f31a34d..60e2375a98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -208,6 +208,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }
 
+  /** Execute arbitrary code */
+  object Execute {
+    def apply(func: StreamExecution => Any): AssertOnQuery =
+      AssertOnQuery(query => { func(query); true })
+  }
+
   class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
 
     private var waitStartTime: Option[Long] = None
@@ -472,7 +478,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
       case a: AssertOnQuery =>
        verify(currentStream != null || lastStream != null,
-          "cannot assert when not stream has been started")
+          "cannot assert when no stream has been started")
         val streamToAssert = Option(currentStream).getOrElse(lastStream)
         verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
-- 
GitLab
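
For context, a minimal sketch of the read-from-sink pattern this change enables, mirroring the new tests. It is not part of the patch: the local-mode session, the `/tmp/...` paths, and the use of the test-oriented `MemoryStream` source are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// Illustrative setup only; master and app name are assumptions.
val spark = SparkSession.builder().master("local[2]").appName("read-from-file-sink").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

// q1 writes a memory-backed stream as text files; the file sink also maintains
// <output dir>/_spark_metadata, its log of committed files.
val input = MemoryStream[String]
val q1 = input.toDF()
  .writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/q1-checkpoint")  // hypothetical path
  .start("/tmp/q1-output")                             // hypothetical path

input.addData("hello", "world")
q1.processAllAvailable()

// q2 streams q1's output. Because "/tmp/q1-output" is a non-glob path whose
// _spark_metadata directory exists, the source lists files via MetadataLogFileIndex
// and therefore only sees files that q1 actually committed.
val q2 = spark.readStream
  .format("text")
  .load("/tmp/q1-output")
  .writeStream
  .format("console")
  .start()

q2.processAllAvailable()
```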