diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index be2d98c46d3b078be37d7b11aa5a98c9a17bcaca..601f944fb6363e79efbbdadc3264d1a9c384a761 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -522,7 +522,7 @@ class HDFSFileCatalog(
       }.filterNot { status =>
         val name = status.getPath.getName
-        name.toLowerCase == "_temporary" || name.startsWith(".")
+        HadoopFsRelation.shouldFilterOut(name)
       val (dirs, files) = statuses.partition(_.isDirectory)
@@ -616,6 +616,16 @@ class HDFSFileCatalog(
  * Helper methods for gathering metadata from HDFS.
 private[sql] object HadoopFsRelation extends Logging {
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // TODO: We should try to filter out all files/dirs starting with "." or "_".
+    // The only reason that we are not doing it now is that Parquet needs to find those
+    // metadata files from leaf files returned by this methods. We should refactor
+    // this logic to not mix metadata files with data files.
+    pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+  }
   // We don't filter files/directories whose name start with "_" except "_temporary" here, as
   // specific data sources may take advantages over them (e.g. Parquet _metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
@@ -624,19 +634,21 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
-    if (name == "_temporary" || name.startsWith(".")) {
+    if (shouldFilterOut(name)) {
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(fs.getConf, this.getClass())
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-      if (pathFilter != null) {
-        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      } else {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      }
+      val statuses =
+        if (pathFilter != null) {
+          val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        }
+      statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b74b9d3f3bbcacc15a1b71c554b1d572fafe9ef1..026191528ede43ccbe77a4351f886253765c3eb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -706,6 +706,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
+  test("_SUCCESS should not break partitioning discovery") {
+    Seq(1, 32).foreach { threshold =>
+      // We have two paths to list files, one at driver side, another one that we use
+      // a Spark job. We need to test both ways.
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> threshold.toString) {
+        withTempPath { dir =>
+          val tablePath = new File(dir, "table")
+          val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+          df.write
+            .format("parquet")
+            .partitionBy("b", "c", "d")
+            .save(tablePath.getCanonicalPath)
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1/d=1", "_SUCCESS"))
+          checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+        }
+      }
+    }
+  }
   test("listConflictingPartitionColumns") {
     def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
       val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>