Skip to content
Snippets Groups Projects
Commit 7cbe2164 authored by Burak Yavuz's avatar Burak Yavuz Committed by Shixiong Zhu
Browse files

[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster

## What changes were proposed in this pull request?

While getting the batch for a `FileStreamSource` in StructuredStreaming, we know which files we must take specifically. We already have verified that they exist, and have committed them to a metadata log. When creating the FileSourceRelation however for an incremental execution, the code checks the existence of every single file once again!

When you have 100,000s of files in a folder, creating the first batch takes 2 hours+ when working with S3! This PR disables that check

## How was this patch tested?

Added a unit test to `FileStreamSource`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15122 from brkyvz/SPARK-17569.
parent 8c3ee2bc
No related branches found
No related tags found
No related merge requests found
......@@ -316,8 +316,14 @@ case class DataSource(
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
*
* @param checkFilesExist Whether to confirm that the files exist when generating the
* non-streaming file based datasource. StructuredStreaming jobs already
* list file existence, and when generating incremental jobs, the batch
* is considered as a non-streaming file based data source. Since we know
* that files already exist, we don't need to check them again.
*/
def resolveRelation(): BaseRelation = {
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
......@@ -368,7 +374,7 @@ case class DataSource(
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
if (!fs.exists(globPath.head)) {
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
......
......@@ -133,7 +133,8 @@ class FileStreamSource(
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false)))
}
/**
......
......@@ -17,9 +17,19 @@
package org.apache.spark.sql.execution.streaming
import java.io.File
import java.net.URI
import scala.util.Random
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
class FileStreamSourceSuite extends SparkFunSuite {
class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
import FileStreamSource._
......@@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite {
assert(map.isNewFile(FileEntry("b", 10)))
}
testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
withTempDir { temp =>
spark.conf.set(
s"fs.$scheme.impl",
classOf[ExistsThrowsExceptionFileSystem].getName)
// add the metadata entries as a pre-req
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L))))
val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
dir.getAbsolutePath, Map.empty)
// this method should throw an exception if `fs.exists` is called during resolveRelation
newSource.getBatch(None, LongOffset(1))
}
}
}
/** Fake FileSystem to test whether the method `fs.exists` is called during
* `DataSource.resolveRelation`.
*/
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
override def getUri: URI = {
URI.create(s"$scheme:///")
}
override def exists(f: Path): Boolean = {
throw new IllegalArgumentException("Exists shouldn't have been called!")
}
/** Simply return an empty file for now. */
override def listStatus(file: Path): Array[FileStatus] = {
val emptyFile = new FileStatus()
emptyFile.setPath(file)
Array(emptyFile)
}
}
object ExistsThrowsExceptionFileSystem {
val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment