Commit 347b5010 authored by Yin Huai, committed by Cheng Lian

[SPARK-7737] [SQL] Use leaf dirs having data files to discover partitions.

https://issues.apache.org/jira/browse/SPARK-7737

cc liancheng

Author: Yin Huai <yhuai@databricks.com>

Closes #6329 from yhuai/spark-7737 and squashes the following commits:

7e0dfc7 [Yin Huai] Use leaf dirs having data files to discover partitions.
parent 147b6be3
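For readers skimming the diff below: the fix makes partition discovery start from leaf directories that actually contain data files, rather than from every leaf directory found while scanning the table location. A minimal sketch of that idea, using a hypothetical helper (dirsWithDataFiles is not Spark's API, and this is not the FileStatusCache implementation):

import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical helper, for illustration only: from the statuses gathered while
// scanning a table directory, keep only the parent directories that hold at
// least one data file. Directories with no files, such as a leftover
// _temporary directory, never show up in the result.
def dirsWithDataFiles(statuses: Seq[FileStatus]): Seq[Path] = {
  val files = statuses.filterNot(_.isDir)        // drop directory entries
  files.groupBy(_.getPath.getParent).keys.toSeq  // parent dir -> its files; keep the dirs
}

Before this patch, discoverPartitions() passed fileStatusCache.leafDirs.keys to PartitioningUtils.parsePartitions, which could include directories with no data files; after the patch it passes fileStatusCache.leafDirToChildrenFiles.keys, which by construction only contains directories that have at least one child file.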
@@ -377,8 +377,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    var leafDirs = mutable.Map.empty[Path, FileStatus]
-
     def refresh(): Unit = {
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
@@ -386,7 +384,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
       }
 
-      leafDirs.clear()
       leafFiles.clear()
 
       // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
@@ -399,7 +396,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }
 
       val (dirs, files) = statuses.partition(_.isDir)
-      leafDirs ++= dirs.map(d => d.getPath -> d).toMap
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
@@ -484,7 +480,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
-    val leafDirs = fileStatusCache.leafDirs.keys.toSeq
+    // We use leaf dirs containing data files to discover the schema.
+    val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
     PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }
 
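To make the before/after concrete, here is a toy sketch of what each version of discoverPartitions() would hand to parsePartitions. The paths are made up for illustration, and plain Maps stand in for the real FileStatusCache fields:

import org.apache.hadoop.fs.{FileStatus, Path}

// leafDirs used to track every leaf directory, even ones without data files.
val leafDirs: Map[Path, FileStatus] = Map(
  new Path("/table/_temporary")  -> new FileStatus(),  // no data files inside
  new Path("/table/pi=1/ps=foo") -> new FileStatus(),
  new Path("/table/pi=2/ps=bar") -> new FileStatus())

// leafDirToChildrenFiles only gains a key when a directory contains files.
val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
  new Path("/table/pi=1/ps=foo") -> Array(new FileStatus()),
  new Path("/table/pi=2/ps=bar") -> Array(new FileStatus()))

// Old input to parsePartitions: includes /table/_temporary, which carries no partition values.
val oldLeafDirs = leafDirs.keys.toSeq
// New input to parsePartitions: only directories that actually hold data files.
val newLeafDirs = leafDirToChildrenFiles.keys.toSeq

A directory like /table/_temporary reaching parsePartitions is exactly what the test change below guards against.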
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
@@ -175,11 +177,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       pi <- Seq(1, 2)
       ps <- Seq("foo", "bar")
     } {
+      val dir = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
       makeParquetFile(
         (1 to 10).map(i => ParquetData(i, i.toString)),
-        makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+        dir)
+
+      // Introduce _temporary dir to test the robustness of the schema discovery process.
+      new File(dir.toString, "_temporary").mkdir()
     }
+    // Introduce _temporary dir to the base dir to test the robustness of the schema discovery process.
+    new File(base.getCanonicalPath, "_temporary").mkdir()
 
+    println("load the partitioned table")
     read.parquet(base.getCanonicalPath).registerTempTable("t")
     withTempTable("t") {