diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index c36609586c807d7ac06a3331d79d69eb9a307c46..2efff3f57d7d336fa9b3cc810b05559b122de46e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,41 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * This is part of the test 'Enabling/disabling ignoreCorruptFiles', but run in a loop
+   * to increase the chance of reproducing the flaky failure.
+   */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
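+        // "third" contains JSON, not Parquet, so reading it fails unless corrupt files are ignored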
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (_ <- 1 to 100) {
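+      // clear streams tracked by DebugFilesystem so leaks from this iteration are caught below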
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index b5ad73b746a8bfeb755162f99f6556fe7a68d6b8..44c0fc70d066ba69f540225a8e237c20c8840a82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -22,11 +22,13 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.{Locale, UUID}
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -138,6 +140,17 @@ private[sql] trait SQLTestUtils
     }
   }
 
+  /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
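+    // poll the status tracker until all executors report zero running tasks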
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
+
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -146,7 +157,11 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e122b39f6fc40be5564e3dde04c29230d7f0626a..3d76e05f616d5300177f9503a06895fe0211be38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,17 +17,18 @@
 
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
 
@@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // Files can be closed from other threads, so wait for all streams to be closed;
+    // normally this takes no more than a second.
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }