diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ccbabf09a832335e0930f5bf4270424a606ee516..ce916b43bf4d37207d86f51cd883ed14d5398e16 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -227,6 +227,10 @@ class SparkHadoopUtil extends Logging {
 
   def globPath(pattern: Path): Seq[Path] = {
     val fs = pattern.getFileSystem(conf)
+    globPath(fs, pattern)
+  }
+
+  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
     Option(fs.globStatus(pattern)).map { statuses =>
       statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
     }.getOrElse(Seq.empty[Path])
@@ -236,6 +240,10 @@ class SparkHadoopUtil extends Logging {
     if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
+  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
+    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index cbe8ce421f92b5050f0425fe559830f643c75261..567ff49773f9b97cd1fe504e0443ca7347d4a7d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -150,7 +151,7 @@ case class DataSource(
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
@@ -379,22 +380,8 @@ case class DataSource(
       case (format: FileFormat, _) =>
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(hadoopConf)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
-
-          if (globPath.isEmpty) {
-            throw new AnalysisException(s"Path does not exist: $qualified")
-          }
-          // Sufficient to check head of the globPath seq for non-glob scenario
-          // Don't need to check once again if files exist in streaming mode
-          if (checkFilesExist && !fs.exists(globPath.head)) {
-            throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-          }
-          globPath
-        }.toArray
+        val globbedPaths = allPaths.flatMap(
+          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
 
         val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
         val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
@@ -657,4 +644,28 @@ object DataSource extends Logging {
     CatalogStorageFormat.empty.copy(
      locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
   }
+
+  /**
+   * If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
+   * If `checkFilesExist` is `true`, also check the file existence.
+   */
+  private def checkAndGlobPathIfNecessary(
+      hadoopConf: Configuration,
+      path: String,
+      checkFilesExist: Boolean): Seq[Path] = {
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(hadoopConf)
+    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+    if (globPath.isEmpty) {
+      throw new AnalysisException(s"Path does not exist: $qualified")
+    }
+    // Sufficient to check head of the globPath seq for non-glob scenario
+    // Don't need to check once again if files exist in streaming mode
+    if (checkFilesExist && !fs.exists(globPath.head)) {
+      throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+    }
+    globPath
+  }
 }
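
A minimal usage sketch of the FileSystem-aware overloads added above (illustrative only, not part of the patch; the path and Configuration values are placeholders). Resolving the FileSystem once and passing it to globPathIfNecessary avoids deriving it again from the pattern, which is the reuse that DataSource.checkAndGlobPathIfNecessary relies on:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    import org.apache.spark.deploy.SparkHadoopUtil

    // Resolve the FileSystem for the pattern once, up front.
    val hadoopConf = new Configuration()
    val pattern = new Path("/data/logs/2017-*/part-*")  // placeholder glob pattern
    val fs: FileSystem = pattern.getFileSystem(hadoopConf)
    val qualified = pattern.makeQualified(fs.getUri, fs.getWorkingDirectory)

    // Reuse the already-resolved FileSystem when expanding the glob.
    val matches: Seq[Path] = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)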