diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 93154bd2ca69cc8d713cd3f9cb886278f59cd436..413976a7ef2440ab719cdaed96731f0b9b064e64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -316,8 +316,14 @@ case class DataSource( /** * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this * [[DataSource]] + * + * @param checkFilesExist Whether to confirm that the files exist when generating the + * non-streaming file based datasource. StructuredStreaming jobs already + * list file existence, and when generating incremental jobs, the batch + * is considered as a non-streaming file based data source. Since we know + * that files already exist, we don't need to check them again. */ - def resolveRelation(): BaseRelation = { + def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = { val caseInsensitiveOptions = new CaseInsensitiveMap(options) val relation = (providingClass.newInstance(), userSpecifiedSchema) match { // TODO: Throw when too much is given. @@ -368,7 +374,7 @@ case class DataSource( throw new AnalysisException(s"Path does not exist: $qualified") } // Sufficient to check head of the globPath seq for non-glob scenario - if (!fs.exists(globPath.head)) { + if (checkFilesExist && !fs.exists(globPath.head)) { throw new AnalysisException(s"Path does not exist: ${globPath.head}") } globPath diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 0dc08b1467b1473500ecf76b20d74e32b1f1ae49..5ebc083a7da9280247a48b5bce3747675bebdf71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -133,7 +133,8 @@ class FileStreamSource( userSpecifiedSchema = Some(schema), className = fileFormatClassName, options = sourceOptions.optionMapWithoutPath) - Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation())) + Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation( + checkFilesExist = false))) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index c6db2fd3f908ed398fa8f7a40fbe12871549cb1c..e8fa6a59c57aeee62f4e5cd651762f1a3a920a88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -17,9 +17,19 @@ package org.apache.spark.sql.execution.streaming +import java.io.File +import java.net.URI + +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType -class FileStreamSourceSuite extends SparkFunSuite { +class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { import FileStreamSource._ @@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite { assert(map.isNewFile(FileEntry("b", 10))) } + testWithUninterruptibleThread("do not recheck that files exist during getBatch") { + withTempDir { temp => + spark.conf.set( + s"fs.$scheme.impl", + classOf[ExistsThrowsExceptionFileSystem].getName) + // add the metadata entries as a pre-req + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = + new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath) + assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L)))) + + val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), + dir.getAbsolutePath, Map.empty) + // this method should throw an exception if `fs.exists` is called during resolveRelation + newSource.getBatch(None, LongOffset(1)) + } + } +} + +/** Fake FileSystem to test whether the method `fs.exists` is called during + * `DataSource.resolveRelation`. + */ +class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { + override def getUri: URI = { + URI.create(s"$scheme:///") + } + + override def exists(f: Path): Boolean = { + throw new IllegalArgumentException("Exists shouldn't have been called!") + } + + /** Simply return an empty file for now. */ + override def listStatus(file: Path): Array[FileStatus] = { + val emptyFile = new FileStatus() + emptyFile.setPath(file) + Array(emptyFile) + } +} + +object ExistsThrowsExceptionFileSystem { + val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs" }