diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index aaabbadcd651b4fb3641b9254031615ed4c0151d..c06026e042d9f4bc32803bff4536b0600ab0bd7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * ::DeveloperApi::
@@ -378,16 +378,22 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
+      // We don't filter files/directories whose names start with "_" or "." here, as specific
+      // data sources may take advantage of them (e.g. Parquet _metadata and _common_metadata
+      // files). However, "_temporary" directories are explicitly ignored, since failed tasks/jobs
+      // may leave partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        if (status.getPath.getName.toLowerCase == "_temporary") {
+          Set.empty
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        }
       }
 
       leafFiles.clear()
 
-      // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
-      // may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
       val statuses = paths.flatMap { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
@@ -395,7 +401,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
       }
 
-      val (dirs, files) = statuses.partition(_.isDir)
+      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
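
For reference, the listing behavior introduced above can be sketched as a standalone helper outside of HadoopFsRelation. This is an illustrative sketch only, not part of the patch: the object and method names (ListingSketch, listDataFiles) are made up, it returns data files only (it does not track leaf directories for partition discovery), and the usage example assumes a local Hadoop FileSystem.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListingSketch {
  // Recursively collects data files under `path`, skipping any directory named "_temporary"
  // (case-insensitively) but still descending into other "_"- or "."-prefixed entries.
  def listDataFiles(fs: FileSystem, path: Path): Seq[FileStatus] = {
    val status = fs.getFileStatus(path)
    if (status.getPath.getName.toLowerCase == "_temporary") {
      Seq.empty
    } else if (status.isDir) {
      fs.listStatus(status.getPath).toSeq.flatMap(child => listDataFiles(fs, child.getPath))
    } else {
      Seq(status)
    }
  }

  def main(args: Array[String]): Unit = {
    // Usage example against the local file system; pass a directory path as the first argument.
    val fs = FileSystem.getLocal(new Configuration())
    listDataFiles(fs, new Path(args(0))).foreach(s => println(s.getPath))
  }
}

The key point mirrors the change in refresh(): the "_temporary" name check happens before a directory's children are listed, so partial files written by failed tasks never reach leafFiles, while other "_"- or "."-prefixed entries such as Parquet's _metadata remain visible to the data source.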
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7c02d563f8d9ac536fc31c9486e7b74ee36b21cb..cf5ae88dc4bee20fc73f0092bbb7e2cd67b9b71a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  test("SPARK-7868: _temporary directories should be ignored") {
+    withTempPath { dir =>
+      val df = Seq("a", "b", "c").zipWithIndex.toDF()
+
+      df.write
+        .format("parquet")
+        .save(dir.getCanonicalPath)
+
+      df.write
+        .format("parquet")
+        .save(s"${dir.getCanonicalPath}/_temporary")
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
+    }
+  }
 }