From 1bb5d716c0351cd0b4c11b397fd778f30db39bd9 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Wed, 3 Jun 2015 00:59:50 +0800
Subject: [PATCH] [SPARK-8037] [SQL] Ignores files whose name starts with dot
 in HadoopFsRelation

Author: Cheng Lian <lian@databricks.com>

Closes #6581 from liancheng/spark-8037 and squashes the following commits:

d08e97b [Cheng Lian] Ignores files whose name starts with dot in HadoopFsRelation
---
 .../spark/sql/sources/PartitioningUtils.scala |  2 +-
 .../apache/spark/sql/sources/interfaces.scala | 11 +++++++----
 .../ParquetPartitionDiscoverySuite.scala      | 19 ++++++++++++++++++-
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index dafdf0f8b4..c4c99de5a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -187,7 +187,7 @@ private[sql] object PartitioningUtils {
       Seq.empty
     } else {
       assert(distinctPartitionsColNames.size == 1, {
-        val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+        val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
         s"Conflicting partition column names detected:\n$list"
       })
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b1b997c030..c4ffa8de52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -379,10 +379,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
-      // We don't filter files/directories whose name start with "_" or "." here, as specific data
-      // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
-      // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
-      // partial/corrupted data files there.
+      // We don't filter files/directories whose name start with "_" except "_temporary" here, as
+      // specific data sources may take advantages over them (e.g. Parquet _metadata and
+      // _common_metadata files). "_temporary" directories are explicitly ignored since failed
+      // tasks/jobs may leave partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
         if (status.getPath.getName.toLowerCase == "_temporary") {
           Set.empty
@@ -400,6 +400,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+      }.filterNot { status =>
+        // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
+        status.getPath.getName.startsWith(".")
       }
 
       val files = statuses.filterNot(_.isDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index f231589e96..3b29979452 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -18,10 +18,11 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 import java.math.BigInteger
-import java.sql.{Timestamp, Date}
+import java.sql.Timestamp
 
 import scala.collection.mutable.ArrayBuffer
 
+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.expressions.Literal
@@ -432,4 +433,20 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       checkAnswer(read.load(dir.toString).select(fields: _*), row)
     }
   }
+
+  test("SPARK-8037: Ignores files whose name starts with dot") {
+    withTempPath { dir =>
+      val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+      df.write
+        .format("parquet")
+        .partitionBy("b", "c", "d")
+        .save(dir.getCanonicalPath)
+
+      Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store"))
+      Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df)
+    }
+  }
 }
-- 
GitLab