diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 88125a2b4da787edb898fc996d30e6fd578db731..e0e569bca48b4c9714c0a5fcad4a2bde9523bea0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -341,11 +341,11 @@ private[sql] object HadoopFsRelation extends Logging {
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
-    // TODO: We should try to filter out all files/dirs starting with "." or "_".
-    // The only reason that we are not doing it now is that Parquet needs to find those
-    // metadata files from leaf files returned by this methods. We should refactor
-    // this logic to not mix metadata files with data files.
-    pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+    // We filter everything that starts with _ and ., except _common_metadata and _metadata
+    // because Parquet needs to find those metadata files from leaf files returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    (pathName.startsWith("_") || pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index 89d57653adcbd3dea1dd350c4ba17f4cf41239e5..3c68dc8bb98d8d58992248f2a493cb8e2d2907d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -39,4 +39,15 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.logical.statistics.sizeInBytes === BigInt(totalSize))
     }
   }
+
+  test("file filtering") {
+    assert(!HadoopFsRelation.shouldFilterOut("abcd"))
+    assert(HadoopFsRelation.shouldFilterOut(".ab"))
+    assert(HadoopFsRelation.shouldFilterOut("_cd"))
+
+    assert(!HadoopFsRelation.shouldFilterOut("_metadata"))
+    assert(!HadoopFsRelation.shouldFilterOut("_common_metadata"))
+    assert(HadoopFsRelation.shouldFilterOut("_ab_metadata"))
+    assert(HadoopFsRelation.shouldFilterOut("_cd_common_metadata"))
+  }
 }