diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 073d2b1512b954b7a6b34174065c620dd63e67df..7dccbbd3f0a5b1abb91493986cc6835eed944083 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index fdd1fa3648251baaf9f8bc4d041646fc45f7c44e..623d2be55dcec7e5b4dd54388882e35d5e9cde22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -431,7 +431,7 @@ case class FileSourceScanExec( private def createBucketedReadRDD( bucketSpec: BucketSpec, readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[PartitionDirectory], + selectedPartitions: Seq[Partition], fsRelation: HadoopFsRelation): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val bucketed = @@ -463,7 +463,7 @@ case class FileSourceScanExec( */ private def createNonBucketedReadRDD( readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[PartitionDirectory], + selectedPartitions: Seq[Partition], fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala deleted file mode 100644 index 2bc66ceeebdb4c4d1ff15086a9c18419a7707d90..0000000000000000000000000000000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.hadoop.fs._ - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ - -/** - * A collection of data files from a partitioned relation, along with the partition values in the - * form of an [[InternalRow]]. - */ -case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus]) - -/** - * An interface for objects capable of enumerating the root paths of a relation as well as the - * partitions of a relation subject to some pruning expressions. - */ -trait FileCatalog { - - /** - * Returns the list of root input paths from which the catalog will get files. There may be a - * single root path from which partitions are discovered, or individual partitions may be - * specified by each path. - */ - def rootPaths: Seq[Path] - - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with no partition values. - * - * @param filters The filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. - */ - def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] - - /** - * Returns the list of files that will be read when scanning this relation. This call may be - * very expensive for large tables. - */ - def inputFiles: Array[String] - - /** Refresh any cached file listings */ - def refresh(): Unit - - /** Sum of table file sizes, in bytes */ - def sizeInBytes: Long -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 9d153cec731a8dcbd9a4cb831cf9a1f4505b41b0..e7239ef91b326d8bf1a905672f4758f22d84ee0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -175,3 +175,64 @@ abstract class TextBasedFileFormat extends FileFormat { codec == null || codec.isInstanceOf[SplittableCompressionCodec] } } + +/** + * A collection of data files from a partitioned relation, along with the partition values in the + * form of an [[InternalRow]]. + */ +case class Partition(values: InternalRow, files: Seq[FileStatus]) + +/** + * An interface for objects capable of enumerating the root paths of a relation as well as the + * partitions of a relation subject to some pruning expressions. + */ +trait BasicFileCatalog { + + /** + * Returns the list of root input paths from which the catalog will get files. There may be a + * single root path from which partitions are discovered, or individual partitions may be + * specified by each path. + */ + def rootPaths: Seq[Path] + + /** + * Returns all valid files grouped into partitions when the data is partitioned. If the data is + * unpartitioned, this will return a single partition with no partition values. + * + * @param filters The filters used to prune which partitions are returned. These filters must + * only refer to partition columns and this method will only return files + * where these predicates are guaranteed to evaluate to `true`. Thus, these + * filters will not need to be evaluated again on the returned data. + */ + def listFiles(filters: Seq[Expression]): Seq[Partition] + + /** Returns the list of files that will be read when scanning this relation. */ + def inputFiles: Array[String] + + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long +} + +/** + * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from + * those, infer the relation's partition specification. + */ +// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for +// which it is safe to list all of its files? +trait FileCatalog extends BasicFileCatalog { + + /** Returns the specification of the partitions inferred from the data. */ + def partitionSpec(): PartitionSpec + + /** Returns all the valid files. */ + def allFiles(): Seq[FileStatus] + + /** Returns the list of files that will be read when scanning this relation. */ + override def inputFiles: Array[String] = + allFiles().map(_.getPath.toUri.toString).toArray + + override def sizeInBytes: Long = allFiles().map(_.getLen).sum +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index afad8898089bda2807670e48ec1d74375e3e5165..db889edf032d6fc3b4f49e937da524694aa9e0e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType * Acts as a container for all of the metadata required to read from a datasource. All discovery, * resolution and merging logic for schemas and partitions has been removed. * - * @param location A [[FileCatalog]] that can enumerate the locations of all the files that + * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that * comprise this relation. * @param partitionSchema The schema of the columns (if any) that are used to partition the relation * @param dataSchema The schema of any remaining columns. Note that if any partition columns are @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType * @param options Configuration used when reading / writing data. */ case class HadoopFsRelation( - location: FileCatalog, + location: BasicFileCatalog, partitionSchema: StructType, dataSchema: StructType, bucketSpec: Option[BucketSpec], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index 5c8eff7ec46b440f977214d0831d872831510d2e..b2508115c282fe5c6fdd51a0fec76849e44f6fde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -17,21 +17,14 @@ package org.apache.spark.sql.execution.datasources -import java.io.FileNotFoundException - import scala.collection.mutable -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.util.SerializableConfiguration /** @@ -45,24 +38,22 @@ import org.apache.spark.util.SerializableConfiguration abstract class PartitioningAwareFileCatalog( sparkSession: SparkSession, parameters: Map[String, String], - partitionSchema: Option[StructType]) extends FileCatalog with Logging { + partitionSchema: Option[StructType]) + extends SessionFileCatalog(sparkSession) with FileCatalog { import PartitioningAwareFileCatalog.BASE_PATH_PARAM - /** Returns the specification of the partitions inferred from the data. */ - def partitionSpec(): PartitionSpec - - protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] - override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { + override def listFiles(filters: Seq[Expression]): Seq[Partition] = { val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { - PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil + Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil } else { prunePartitions(filters, partitionSpec()).map { - case PartitionPath(values, path) => + case PartitionDirectory(values, path) => val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { case Some(existingDir) => // Directory has children files in it, return them @@ -72,20 +63,14 @@ abstract class PartitioningAwareFileCatalog( // Directory does not exist, or has no children files Nil } - PartitionDirectory(values, files) + Partition(values, files) } } logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) selectedPartitions } - /** Returns the list of files that will be read when scanning this relation. */ - override def inputFiles: Array[String] = - allFiles().map(_.getPath.toUri.toString).toArray - - override def sizeInBytes: Long = allFiles().map(_.getLen).sum - - def allFiles(): Seq[FileStatus] = { + override def allFiles(): Seq[FileStatus] = { if (partitionSpec().partitionColumns.isEmpty) { // For each of the root input paths, get the list of files inside them rootPaths.flatMap { path => @@ -154,7 +139,7 @@ abstract class PartitioningAwareFileCatalog( private def prunePartitions( predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionPath] = { + partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { val PartitionSpec(partitionColumns, partitions) = partitionSpec val partitionColumnNames = partitionColumns.map(_.name).toSet val partitionPruningPredicates = predicates.filter { @@ -171,7 +156,7 @@ abstract class PartitioningAwareFileCatalog( }) val selected = partitions.filter { - case PartitionPath(values, _) => boundPredicate(values) + case PartitionDirectory(values, _) => boundPredicate(values) } logInfo { val total = partitions.length @@ -229,186 +214,8 @@ abstract class PartitioningAwareFileCatalog( val name = path.getName !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) } - - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery - * discovery threshold. - * - * This is publicly visible for testing. - */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val files = - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) - } else { - PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf) - } - - HiveCatalogMetrics.incrementFilesDiscovered(files.size) - mutable.LinkedHashSet(files: _*) - } } -object PartitioningAwareFileCatalog extends Logging { +object PartitioningAwareFileCatalog { val BASE_PATH_PARAM = "basePath" - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - - /** - * List a collection of path recursively. - */ - private def listLeafFilesInSerial( - paths: Seq[Path], - hadoopConf: Configuration): Seq[FileStatus] = { - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val filter = FileInputFormat.getInputPathFilter(jobConf) - - paths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - listLeafFiles0(fs, path, filter) - } - } - - /** - * List a collection of path recursively in parallel (using Spark executors). - * Each task launched will use [[listLeafFilesInSerial]] to list. - */ - private def listLeafFilesInParallel( - paths: Seq[Path], - hadoopConf: Configuration, - sparkSession: SparkSession): Seq[FileStatus] = { - assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) - logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, 10000) - - val statuses = sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { paths => - val hadoopConf = serializableConfiguration.value - listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator - }.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - }.collect() - - // Turn SerializableFileStatus back to Status - statuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), - blockLocations) - } - } - - /** - * List a single path, provided as a FileStatus, in serial. - */ - private def listLeafFiles0( - fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { - logTrace(s"Listing $path") - val name = path.getName.toLowerCase - if (shouldFilterOut(name)) { - Seq.empty[FileStatus] - } else { - // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] - } - - val allLeafStatuses = { - val (dirs, files) = statuses.partition(_.isDirectory) - val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) - if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats - } - - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - lfs - } - } - } - - /** Checks if we should filter out this path name. */ - def shouldFilterOut(pathName: String): Boolean = { - // We filter everything that starts with _ and ., except _common_metadata and _metadata - // because Parquet needs to find those metadata files from leaf files returned by this method. - // We should refactor this logic to not mix metadata files with data files. - ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index ac6795b9a2e7b2da45b6cb0131901f0476884c19..504464216e5a41f9a327fe99296297f095c4b558 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -33,8 +33,8 @@ import org.apache.spark.sql.types._ // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling. -object PartitionPath { - def apply(values: InternalRow, path: String): PartitionPath = +object PartitionDirectory { + def apply(values: InternalRow, path: String): PartitionDirectory = apply(values, new Path(path)) } @@ -42,14 +42,14 @@ object PartitionPath { * Holds a directory in a partitioned collection of files as well as as the partition values * in the form of a Row. Before scanning, the files at `path` need to be enumerated. */ -case class PartitionPath(values: InternalRow, path: Path) +case class PartitionDirectory(values: InternalRow, path: Path) case class PartitionSpec( partitionColumns: StructType, - partitions: Seq[PartitionPath]) + partitions: Seq[PartitionDirectory]) object PartitionSpec { - val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath]) + val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionDirectory]) } object PartitioningUtils { @@ -141,7 +141,7 @@ object PartitioningUtils { // Finally, we create `Partition`s based on paths and resolved partition values. val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { case (PartitionValues(_, literals), (path, _)) => - PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path) + PartitionDirectory(InternalRow.fromSeq(literals.map(_.value)), path) } PartitionSpec(StructType(fields), partitions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala new file mode 100644 index 0000000000000000000000000000000000000000..4807a92c2e6b8c12e2faba13f8ea77c8a050d10d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.FileNotFoundException + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.SerializableConfiguration + + +/** + * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf + * files in a list of HDFS paths. + * + * @param sparkSession a [[SparkSession]] + * @param ignoreFileNotFound (see [[ListingFileCatalog]]) + */ +abstract class SessionFileCatalog(sparkSession: SparkSession) + extends BasicFileCatalog with Logging { + protected val hadoopConf: Configuration + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever there is a path having more files than the parallel partition discovery + * discovery threshold. + * + * This is publicly visible for testing. + */ + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { + SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + + HiveCatalogMetrics.incrementFilesDiscovered(files.size) + mutable.LinkedHashSet(files: _*) + } +} + +object SessionFileCatalog extends Logging { + + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. */ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val filter = FileInputFormat.getInputPathFilter(jobConf) + + paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + listLeafFiles0(fs, path, filter) + } + } + + /** + * List a collection of path recursively in parallel (using Spark executors). + * Each task launched will use [[listLeafFilesInSerial]] to list. + */ + private def listLeafFilesInParallel( + paths: Seq[Path], + hadoopConf: Configuration, + sparkSession: SparkSession): Seq[FileStatus] = { + assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) + logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + + val sparkContext = sparkSession.sparkContext + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializedPaths = paths.map(_.toString) + + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. + val numParallelism = Math.min(paths.size, 10000) + + val statuses = sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { paths => + val hadoopConf = serializableConfiguration.value + listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator + }.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + }.collect() + + // Turn SerializableFileStatus back to Status + statuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), + blockLocations) + } + } + + /** + * List a single path, provided as a FileStatus, in serial. + */ + private def listLeafFiles0( + fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { + logTrace(s"Listing $path") + val name = path.getName.toLowerCase + if (shouldFilterOut(name)) { + Seq.empty[FileStatus] + } else { + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val statuses = try fs.listStatus(path) catch { + case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } + + val allLeafStatuses = { + val (dirs, files) = statuses.partition(_.isDirectory) + val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) + if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats + } + + allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `listLeafFilesInParallel` when the number of + // paths exceeds threshold. + case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). + val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } + } + } + + /** Checks if we should filter out this path name. */ + def shouldFilterOut(pathName: String): Boolean = { + // We filter everything that starts with _ and ., except _common_metadata and _metadata + // because Parquet needs to find those metadata files from leaf files returned by this method. + // We should refactor this logic to not mix metadata files with data files. + ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && + !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala index 5648ab480a98af0f1b467008670b22431968c154..a5c41b244589b02ce96b290d42a0375795649f8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType /** - * A [[FileCatalog]] for a metastore catalog table. + * A [[BasicFileCatalog]] for a metastore catalog table. * * @param sparkSession a [[SparkSession]] * @param db the table's database name @@ -38,9 +38,10 @@ class TableFileCatalog( db: String, table: String, partitionSchema: Option[StructType], - override val sizeInBytes: Long) extends FileCatalog { + override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { - protected val hadoopConf = sparkSession.sessionState.newHadoopConf + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf private val externalCatalog = sparkSession.sharedState.externalCatalog @@ -50,7 +51,7 @@ class TableFileCatalog( override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq - override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { + override def listFiles(filters: Seq[Expression]): Seq[Partition] = { filterPartitions(filters).listFiles(Nil) } @@ -78,7 +79,7 @@ class TableFileCatalog( case Some(schema) => val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters) val partitions = selectedPartitions.map { p => - PartitionPath(p.toRow(schema), p.storage.locationUri.get) + PartitionDirectory(p.toRow(schema), p.storage.locationUri.get) } val partitionSpec = PartitionSpec(schema, partitions) new PrunedTableFileCatalog( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 9c43169cbf8984323f80978964a1f47f40b2345a..2695974b84b000a0cca547ce9a68b2256e3cff14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -81,16 +81,6 @@ class FileCatalogSuite extends SharedSQLContext { } } - test("PartitioningAwareFileCatalog - file filtering") { - assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd")) - assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd")) - assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata")) - assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata")) - } - test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { class MockCatalog( override val rootPaths: Seq[Path]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..df509583377ae302db1240edcd7f7ec4b9bae5c4 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.SparkFunSuite + +class SessionFileCatalogSuite extends SparkFunSuite { + + test("file filtering") { + assert(!SessionFileCatalog.shouldFilterOut("abcd")) + assert(SessionFileCatalog.shouldFilterOut(".ab")) + assert(SessionFileCatalog.shouldFilterOut("_cd")) + + assert(!SessionFileCatalog.shouldFilterOut("_metadata")) + assert(!SessionFileCatalog.shouldFilterOut("_common_metadata")) + assert(SessionFileCatalog.shouldFilterOut("_ab_metadata")) + assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata")) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 36d4df0015ffd5290de02bf0780ef3a93064c11d..43357c97c395af60e88ab1e59bc9c7c3c1b31e43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionPath => Partition, PartitioningAwareFileCatalog, PartitioningUtils, PartitionSpec} +import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -626,11 +626,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) => - assert(location.partitionSpec() === PartitionSpec.emptySpec) + case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) => + assert(location.partitionSpec === PartitionSpec.emptySpec) }.getOrElse { - fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") + fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 16e1e37b2fb02ba0f9cafe11c3fce840c0e262fb..4a2aaa7d4f6cacb1bf07631c7b4eb6d14c8cfdec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.types._