Skip to content
Snippets Groups Projects
Commit bca7ce28 authored by Bogdan Raducanu's avatar Bogdan Raducanu Committed by Herman van Hovell
Browse files

[SPARK-19946][TESTS][BACKPORT-2.1] DebugFilesystem.assertNoOpenStreams should...

[SPARK-19946][TESTS][BACKPORT-2.1] DebugFilesystem.assertNoOpenStreams should report the open streams to help debugging

## What changes were proposed in this pull request?
Backport for PR #17292
DebugFilesystem.assertNoOpenStreams throws an exception with a cause exception that actually shows the code line which leaked the stream.

## How was this patch tested?
New test in SparkContextSuite to check there is a cause exception.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17632 from bogdanrdc/SPARK-19946-BRANCH2.1.
parent 98ae5481
No related branches found
No related tags found
No related merge requests found
......@@ -44,7 +44,8 @@ object DebugFilesystem extends Logging {
logWarning("Leaked filesystem connection created at:")
exc.printStackTrace()
}
throw new RuntimeException(s"There are $numOpen possibly leaked file streams.")
throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
openStreams.values().asScala.head)
}
}
}
......
......@@ -18,7 +18,7 @@
package org.apache.spark
import java.io.File
import java.net.MalformedURLException
import java.net.{MalformedURLException, URI}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
......@@ -26,6 +26,8 @@ import scala.concurrent.Await
import scala.concurrent.duration.Duration
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
......@@ -467,4 +469,20 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}
test("SPARK-19446: DebugFilesystem.assertNoOpenStreams should report " +
"open streams to help debugging") {
val fs = new DebugFilesystem()
fs.initialize(new URI("file:///"), new Configuration())
val file = File.createTempFile("SPARK19446", "temp")
Files.write(Array.ofDim[Byte](1000), file)
val path = new Path("file:///" + file.getCanonicalPath)
val stream = fs.open(path)
val exc = intercept[RuntimeException] {
DebugFilesystem.assertNoOpenStreams()
}
assert(exc != null)
assert(exc.getCause() != null)
stream.close()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment