From b463e6d618e69c535297e51f41eca4f91bd33cc8 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Tue, 26 May 2015 20:48:56 -0700
Subject: [PATCH] [SPARK-7868] [SQL] Ignores _temporary directories in
 HadoopFsRelation

So that potential partial/corrupted data files left by failed tasks/jobs won't affect normal data scan.

Author: Cheng Lian <lian@databricks.com>

Closes #6411 from liancheng/spark-7868 and squashes the following commits:

273ea36 [Cheng Lian] Ignores _temporary directories
---
 .../apache/spark/sql/sources/interfaces.scala | 20 ++++++++++++-------
 .../sql/sources/hadoopFsRelationSuites.scala  | 16 +++++++++++++++
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index aaabbadcd6..c06026e042 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * ::DeveloperApi::
@@ -378,16 +378,22 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
+      // We don't filter files/directories whose name start with "_" or "." here, as specific data
+      // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
+      // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
+      // partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        if (status.getPath.getName.toLowerCase == "_temporary") {
+          Set.empty
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        }
       }
 
       leafFiles.clear()
 
-      // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
-      // may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
       val statuses = paths.flatMap { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
@@ -395,7 +401,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
       }
 
-      val (dirs, files) = statuses.partition(_.isDir)
+      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7c02d563f8..cf5ae88dc4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  test("SPARK-7868: _temporary directories should be ignored") {
+    withTempPath { dir =>
+      val df = Seq("a", "b", "c").zipWithIndex.toDF()
+
+      df.write
+        .format("parquet")
+        .save(dir.getCanonicalPath)
+
+      df.write
+        .format("parquet")
+        .save(s"${dir.getCanonicalPath}/_temporary")
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
+    }
+  }
 }
-- 
GitLab