diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 5a7c6b95b565fdc201a2b22d0df723f4c6d604d7..21337b2932aac6e59a449edf4c948ea042f71a4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -383,7 +383,7 @@ private[sql] class ParquetRelation( var schema: StructType = _ // Cached leaves - var cachedLeaves: Set[FileStatus] = null + var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null /** * Refreshes `FileStatus`es, footers, partition spec, and table schema. @@ -396,13 +396,13 @@ private[sql] class ParquetRelation( !cachedLeaves.equals(currentLeafStatuses) if (leafStatusesChanged) { - cachedLeaves = currentLeafStatuses.toIterator.toSet + cachedLeaves = currentLeafStatuses // Lists `FileStatus`es of all leaf nodes (files) under all base directories. val leaves = currentLeafStatuses.filter { f => isSummaryFile(f.getPath) || !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - }.toArray + }.toArray.sortBy(_.getPath.toString) dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) metadataStatuses = @@ -465,13 +465,30 @@ private[sql] class ParquetRelation( // You should enable this configuration only if you are very sure that for the parquet // part-files to read there are corresponding summary files containing correct schema. + // As filed in SPARK-11500, the order of files to touch is a matter, which might affect + // the ordering of the output columns. There are several things to mention here. + // + // 1. If mergeRespectSummaries config is false, then it merges schemas by reducing from + // the first part-file so that the columns of the lexicographically first file show + // first. + // + // 2. If mergeRespectSummaries config is true, then there should be, at least, + // "_metadata"s for all given files, so that we can ensure the columns of + // the lexicographically first file show first. + // + // 3. If shouldMergeSchemas is false, but when multiple files are given, there is + // no guarantee of the output order, since there might not be a summary file for the + // lexicographically first file, which ends up putting ahead the columns of + // the other files. However, this should be okay since not enabling + // shouldMergeSchemas means (assumes) all the files have the same schemas. + val needMerged: Seq[FileStatus] = if (mergeRespectSummaries) { Seq() } else { dataStatuses } - (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq + needMerged ++ metadataStatuses ++ commonMetadataStatuses } else { // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet // don't have this. @@ -768,10 +785,10 @@ private[sql] object ParquetRelation extends Logging { footers.map { footer => ParquetRelation.readSchemaFromFooter(footer, converter) - }.reduceOption(_ merge _).iterator + }.reduceLeftOption(_ merge _).iterator }.collect() - partiallyMergedSchemas.reduceOption(_ merge _) + partiallyMergedSchemas.reduceLeftOption(_ merge _) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index e296d631f0f300bd6018a2e50e746d745ca577c5..5b8841bc154a5c67acc26a8ad77110003b17afe4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -428,11 +428,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio private var _partitionSpec: PartitionSpec = _ private class FileStatusCache { - var leafFiles = mutable.Map.empty[Path, FileStatus] + var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] - private def listLeafFiles(paths: Array[String]): Set[FileStatus] = { + private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = { if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) { HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext) } else { @@ -450,10 +450,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val (dirs, files) = statuses.partition(_.isDir) + // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) if (dirs.isEmpty) { - files.toSet + mutable.LinkedHashSet(files: _*) } else { - files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString)) + mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString)) } } } @@ -464,7 +465,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio leafFiles.clear() leafDirToChildrenFiles.clear() - leafFiles ++= files.map(f => f.getPath -> f).toMap + leafFiles ++= files.map(f => f.getPath -> f) leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent) } } @@ -475,8 +476,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio cache } - protected def cachedLeafStatuses(): Set[FileStatus] = { - fileStatusCache.leafFiles.values.toSet + protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = { + mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*) } final private[sql] def partitionSpec: PartitionSpec = { @@ -834,7 +835,7 @@ private[sql] object HadoopFsRelation extends Logging { def listLeafFilesInParallel( paths: Array[String], hadoopConf: Configuration, - sparkContext: SparkContext): Set[FileStatus] = { + sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = { logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") val serializableConfiguration = new SerializableConfiguration(hadoopConf) @@ -854,9 +855,10 @@ private[sql] object HadoopFsRelation extends Logging { status.getAccessTime) }.collect() - fakeStatuses.map { f => + val hadoopFakeStatuses = fakeStatuses.map { f => new FileStatus( f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)) - }.toSet + } + mutable.LinkedHashSet(hadoopFakeStatuses: _*) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index e2d754e806403731b543e55169d2dc2448fb81a9..e866493ee6c96d2020ad9fc5f219169b602b7b1d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -23,7 +23,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.{execution, AnalysisException, SaveMode} +import org.apache.spark.sql._ import org.apache.spark.sql.types._ @@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1) } } + + test("SPARK-11500: Not deterministic order of columns when using merging schemas.") { + import testImplicits._ + withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { + withTempPath { dir => + val pathOne = s"${dir.getCanonicalPath}/part=1" + Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne) + val pathTwo = s"${dir.getCanonicalPath}/part=2" + Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo) + val pathThree = s"${dir.getCanonicalPath}/part=3" + Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree) + + // The schema consists of the leading columns of the first part-file + // in the lexicographic order. + assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name) + === Seq("a", "b", "c", "d", "part")) + } + } + } }