Skip to content
Snippets Groups Projects
Commit 1bb5d716 authored by Cheng Lian's avatar Cheng Lian
Browse files

[SPARK-8037] [SQL] Ignores files whose name starts with dot in HadoopFsRelation

Author: Cheng Lian <lian@databricks.com>

Closes #6581 from liancheng/spark-8037 and squashes the following commits:

d08e97b [Cheng Lian] Ignores files whose name starts with dot in HadoopFsRelation
parent bd97840d
No related branches found
No related tags found
No related merge requests found
...@@ -187,7 +187,7 @@ private[sql] object PartitioningUtils { ...@@ -187,7 +187,7 @@ private[sql] object PartitioningUtils {
Seq.empty Seq.empty
} else { } else {
assert(distinctPartitionsColNames.size == 1, { assert(distinctPartitionsColNames.size == 1, {
val list = distinctPartitionsColNames.mkString("\t", "\n", "") val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
s"Conflicting partition column names detected:\n$list" s"Conflicting partition column names detected:\n$list"
}) })
......
...@@ -379,10 +379,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio ...@@ -379,10 +379,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
def refresh(): Unit = { def refresh(): Unit = {
// We don't filter files/directories whose name start with "_" or "." here, as specific data // We don't filter files/directories whose name start with "_" except "_temporary" here, as
// sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files). // specific data sources may take advantages over them (e.g. Parquet _metadata and
// But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave // _common_metadata files). "_temporary" directories are explicitly ignored since failed
// partial/corrupted data files there. // tasks/jobs may leave partial/corrupted data files there.
def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = { def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
if (status.getPath.getName.toLowerCase == "_temporary") { if (status.getPath.getName.toLowerCase == "_temporary") {
Set.empty Set.empty
...@@ -400,6 +400,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio ...@@ -400,6 +400,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
val fs = hdfsPath.getFileSystem(hadoopConf) val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _)) Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
}.filterNot { status =>
// SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
status.getPath.getName.startsWith(".")
} }
val files = statuses.filterNot(_.isDir) val files = statuses.filterNot(_.isDir)
......
...@@ -18,10 +18,11 @@ package org.apache.spark.sql.parquet ...@@ -18,10 +18,11 @@ package org.apache.spark.sql.parquet
import java.io.File import java.io.File
import java.math.BigInteger import java.math.BigInteger
import java.sql.{Timestamp, Date} import java.sql.Timestamp
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import com.google.common.io.Files
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.Literal
...@@ -432,4 +433,20 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { ...@@ -432,4 +433,20 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
checkAnswer(read.load(dir.toString).select(fields: _*), row) checkAnswer(read.load(dir.toString).select(fields: _*), row)
} }
} }
test("SPARK-8037: Ignores files whose name starts with dot") {
withTempPath { dir =>
val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
df.write
.format("parquet")
.partitionBy("b", "c", "d")
.save(dir.getCanonicalPath)
Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store"))
Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df)
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment