Commit ba505805 authored by Bogdan Raducanu, committed by Xiao Li

[SPARK-20407][TESTS][BACKPORT-2.1] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test

## What changes were proposed in this pull request?

`SharedSQLContext.afterEach` now calls `DebugFilesystem.assertNoOpenStreams` inside `eventually`, since streams can still be closed from other threads shortly after a test finishes.
`SQLTestUtils.withTempDir` now calls `waitForTasksToFinish` before deleting the directory, so in-flight tasks cannot race with the cleanup.
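
For context, here is a minimal sketch of the ScalaTest `eventually` retry pattern both changes rely on; the helper object and its function-valued parameters are illustrative stand-ins, not code from this patch:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Illustrative only: the real patch puts equivalent logic directly into
// SharedSQLContext.afterEach and SQLTestUtils.withTempDir.
object RetrySketch {

  // Streams may still be closed from other threads shortly after a test body
  // returns, so poll the check until it passes (or 10s elapse) instead of
  // asserting exactly once.
  def awaitNoOpenStreams(openStreamCount: () => Int): Unit = {
    eventually(timeout(10.seconds)) {
      assert(openStreamCount() == 0, "streams are still open")
    }
  }

  // Wait until no tasks are running before deleting a temp directory, so
  // in-flight tasks cannot race with the directory cleanup.
  def awaitNoRunningTasks(runningTaskCount: () => Int): Unit = {
    eventually(timeout(10.seconds)) {
      assert(runningTaskCount() == 0, "tasks are still running")
    }
  }
}
```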

## How was this patch tested?
A new test was added, but it is marked as ignored because it takes about 30 seconds to run; it can be unignored for review.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17720 from bogdanrdc/SPARK-20407-BACKPORT2.1.
parent fb0351a3
ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -242,6 +242,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+   * to increase the chance of failure
+   */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (i <- 1 to 100) {
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
SQLTestUtils.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.test
 import java.io.File
 import java.util.UUID
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -48,7 +50,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
@@ -122,6 +124,15 @@ private[sql] trait SQLTestUtils
     try f(path) finally Utils.deleteRecursively(path)
   }
 
+  /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
+
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -130,7 +141,11 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
   }
 
   /**
SharedSQLContext.scala
@@ -17,7 +17,10 @@
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -26,7 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
@@ -86,6 +89,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // files can be closed from other threads, so wait a bit
+    // normally this doesn't take more than 1s
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }