Commit 1bc41125 authored by hyukjinkwon, committed by Cheng Lian

[SPARK-11500][SQL] Not deterministic order of columns when using merging schemas.

https://issues.apache.org/jira/browse/SPARK-11500

As filed in SPARK-11500, if schema merging is enabled, the order in which files are touched matters and can affect the ordering of the output columns.

This was mostly caused by the use of `Set` and `Map`, so I replaced them with `LinkedHashSet` and `LinkedHashMap` to preserve the insertion order.
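
For illustration, a minimal, self-contained Scala sketch of why the collection type matters for ordering (the part-file names below are made up for the example and are not taken from this patch):

import scala.collection.mutable

object InsertionOrderExample extends App {
  // Hypothetical part-file names, already listed in the order we want to keep.
  val parts = Seq("part-r-00000", "part-r-00001", "part-r-00002", "part-r-00003", "part-r-00004")

  // A plain hash-based Set/Map makes no guarantee about iteration order, so the
  // files (and therefore any columns merged from them) may come back in any order.
  val unordered: Set[String] = parts.toSet

  // LinkedHashSet/LinkedHashMap iterate in insertion order, so the listing order
  // survives all later traversals.
  val ordered = mutable.LinkedHashSet(parts: _*)

  println(s"Set iteration order (not guaranteed): ${unordered.mkString(", ")}")
  println(s"LinkedHashSet keeps insertion order:  ${ordered.mkString(", ")}")
  assert(ordered.toSeq == parts)
}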

Also, I changed `reduceOption` to `reduceLeftOption`, and changed the order of `filesToTouch` from `metadataStatuses ++ commonMetadataStatuses ++ needMerged` to `needMerged ++ metadataStatuses ++ commonMetadataStatuses`, so that the part-files, which always carry the schema in their footers, are touched first, whereas the summary files might not exist.
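
As a rough illustration of why folding from the left matters, here is a toy sketch. It uses a made-up `mergeColumns` helper over column-name lists rather than Spark's actual `StructType` merge, only to show that `reduceLeftOption` keeps the left-most (first) file's columns in front:

object MergeOrderExample extends App {
  // Toy stand-in for schema merging: keep the left schema's columns in place
  // and append any new columns coming from the right schema.
  def mergeColumns(left: Seq[String], right: Seq[String]): Seq[String] =
    left ++ right.filterNot(left.contains)

  // Hypothetical column lists of three part-files, in lexicographic file order.
  val perFileColumns = Seq(Seq("a", "b"), Seq("c", "b"), Seq("d", "b"))

  // reduceLeftOption folds strictly left to right, so the first file's columns
  // lead the result: Some(List(a, b, c, d)).
  println(perFileColumns.reduceLeftOption(mergeColumns))
}

Putting `needMerged` (the part-files) at the front of `filesToTouch` makes the left-most schema in that fold come from an actual data file rather than from a summary file that may be missing.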

One nit: if schema merging is not enabled but multiple files are given, there is no guarantee of the output column order, since there might not be a summary file for the first file, in which case the columns of the other files end up in front.

However, this should be okay, since disabling schema merging assumes all the files have the same schema.

In addition, the test code for this only checks the names of the fields.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9517 from HyukjinKwon/SPARK-11500.
parent 99f5f988

@@ -383,7 +383,7 @@ private[sql] class ParquetRelation(
     var schema: StructType = _

     // Cached leaves
-    var cachedLeaves: Set[FileStatus] = null
+    var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null

     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
@@ -396,13 +396,13 @@ private[sql] class ParquetRelation(
         !cachedLeaves.equals(currentLeafStatuses)

       if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses.toIterator.toSet
+        cachedLeaves = currentLeafStatuses

         // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
         val leaves = currentLeafStatuses.filter { f =>
           isSummaryFile(f.getPath) ||
             !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }.toArray
+        }.toArray.sortBy(_.getPath.toString)

         dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
         metadataStatuses =
@@ -465,13 +465,30 @@ private[sql] class ParquetRelation(
           // You should enable this configuration only if you are very sure that for the parquet
           // part-files to read there are corresponding summary files containing correct schema.

+          // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
+          // the ordering of the output columns. There are several things to mention here.
+          //
+          //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
+          //     the first part-file so that the columns of the lexicographically first file show
+          //     first.
+          //
+          //  2. If mergeRespectSummaries config is true, then there should be, at least,
+          //     "_metadata"s for all given files, so that we can ensure the columns of
+          //     the lexicographically first file show first.
+          //
+          //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
+          //     no guarantee of the output order, since there might not be a summary file for the
+          //     lexicographically first file, which ends up putting ahead the columns of
+          //     the other files. However, this should be okay since not enabling
+          //     shouldMergeSchemas means (assumes) all the files have the same schemas.
+
           val needMerged: Seq[FileStatus] =
             if (mergeRespectSummaries) {
               Seq()
             } else {
               dataStatuses
             }
-          (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
+          needMerged ++ metadataStatuses ++ commonMetadataStatuses
         } else {
           // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
           // don't have this.
@@ -768,10 +785,10 @@ private[sql] object ParquetRelation extends Logging {

           footers.map { footer =>
             ParquetRelation.readSchemaFromFooter(footer, converter)
-          }.reduceOption(_ merge _).iterator
+          }.reduceLeftOption(_ merge _).iterator
         }.collect()

-    partiallyMergedSchemas.reduceOption(_ merge _)
+    partiallyMergedSchemas.reduceLeftOption(_ merge _)
   }

   /**
@@ -428,11 +428,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   private var _partitionSpec: PartitionSpec = _

   private class FileStatusCache {
-    var leafFiles = mutable.Map.empty[Path, FileStatus]
+    var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]

     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

-    private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+    private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
       if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
         HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
       } else {
@@ -450,10 +450,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])

         val (dirs, files) = statuses.partition(_.isDir)

+        // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
         if (dirs.isEmpty) {
-          files.toSet
+          mutable.LinkedHashSet(files: _*)
         } else {
-          files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
+          mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
         }
       }
     }
@@ -464,7 +465,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
       leafFiles.clear()
       leafDirToChildrenFiles.clear()

-      leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafFiles ++= files.map(f => f.getPath -> f)
       leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
     }
   }
@@ -475,8 +476,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
     cache
   }

-  protected def cachedLeafStatuses(): Set[FileStatus] = {
-    fileStatusCache.leafFiles.values.toSet
+  protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
+    mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
   }

   final private[sql] def partitionSpec: PartitionSpec = {
@@ -834,7 +835,7 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Array[String],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): Set[FileStatus] = {
+      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")

     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -854,9 +855,10 @@ private[sql] object HadoopFsRelation extends Logging {
         status.getAccessTime)
     }.collect()

-    fakeStatuses.map { f =>
+    val hadoopFakeStatuses = fakeStatuses.map { f =>
       new FileStatus(
         f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
-    }.toSet
+    }
+    mutable.LinkedHashSet(hadoopFakeStatuses: _*)
   }
 }
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path

 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
+import org.apache.spark.sql._
 import org.apache.spark.sql.types._
@@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
     }
   }
+
+  test("SPARK-11500: Not deterministic order of columns when using merging schemas.") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/part=1"
+        Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/part=2"
+        Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
+        val pathThree = s"${dir.getCanonicalPath}/part=3"
+        Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
+
+        // The schema consists of the leading columns of the first part-file
+        // in the lexicographic order.
+        assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name)
+          === Seq("a", "b", "c", "d", "part"))
+      }
+    }
+  }
 }