From ba505805dcf17d7964ec9df7e76489bfc162949a Mon Sep 17 00:00:00 2001
From: Bogdan Raducanu <bogdan@databricks.com>
Date: Sat, 22 Apr 2017 09:58:07 -0700
Subject: [PATCH] [SPARK-20407][TESTS][BACKPORT-2.1] ParquetQuerySuite
 'Enabling/disabling ignoreCorruptFiles' flaky test

## What changes were proposed in this pull request?

SharedSQLContext.afterEach now calls DebugFilesystem.assertNoOpenStreams inside eventually: files can be closed from other threads, so the check may need to be retried for a short while.
SQLTestUtils.withTempDir now calls waitForTasksToFinish before deleting the directory, so that no still-running task is writing into it.
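
In short, the two changes look like this (a condensed, self-contained sketch; the trait name FlakyFixSketch and the helper assertNoOpenStreamsEventually are illustrative only — in the actual change the eventually block is inlined in SharedSQLContext.afterEach, see the diff below):

```scala
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually

import org.apache.spark.DebugFilesystem
import org.apache.spark.sql.SparkSession

trait FlakyFixSketch extends Eventually {
  def spark: SparkSession

  // SQLTestUtils: block until no executor reports a running task, so that a
  // temp directory is not deleted while a task may still be writing into it.
  protected def waitForTasksToFinish(): Unit = {
    eventually(timeout(10.seconds)) {
      assert(spark.sparkContext.statusTracker
        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
    }
  }

  // SharedSQLContext.afterEach: files can be closed from other threads, so
  // retry the open-stream check for a while instead of failing immediately.
  protected def assertNoOpenStreamsEventually(): Unit = {
    eventually(timeout(10.seconds)) {
      DebugFilesystem.assertNoOpenStreams()
    }
  }
}
```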

## How was this patch tested?
A new test was added, but it is marked as ignored because it takes about 30s to run. It can be unignored for review.
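
For reference, "unignoring" just means flipping ScalaTest's ignore(...) registration back to test(...); a minimal, hypothetical illustration:

```scala
import org.scalatest.FunSuite

// Hypothetical suite showing the ignore/test switch: `ignore` registers the
// test (and still compiles its body) but skips it at run time.
class UnignoreExampleSuite extends FunSuite {
  ignore("slow regression test, takes ~30s") {
    // not executed while registered with `ignore`
  }

  // To actually run the test above, its registration becomes:
  // test("slow regression test, takes ~30s") { ... }
  test("ordinary test that always runs") {
    assert(1 + 1 == 2)
  }
}
```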

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17720 from bogdanrdc/SPARK-20407-BACKPORT2.1.
---
 .../parquet/ParquetQuerySuite.scala           | 35 ++++++++++++++++++-
 .../apache/spark/sql/test/SQLTestUtils.scala  | 19 ++++++++--
 .../spark/sql/test/SharedSQLContext.scala     | 11 ++++--
 3 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 6132376724..6033c66e2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -242,6 +242,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * This is part of the test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+   * to increase the chance of failure.
+   */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (i <- 1 to 100) {
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d4afb9d8af..24ba0f571e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.test
 import java.io.File
 import java.util.UUID
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -48,7 +50,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -122,6 +124,15 @@ private[sql] trait SQLTestUtils
     try f(path) finally Utils.deleteRecursively(path)
   }
 
+  /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -130,7 +141,11 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // Wait for all tasks to finish before deleting files.
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 2239f10870..243845dfba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -26,7 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
 
@@ -86,6 +89,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // Files can be closed from other threads, so wait a bit.
+    // Normally this doesn't take more than 1s.
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }
-- 
GitLab