Skip to content
Snippets Groups Projects
Commit 4f7d49b9 authored by Bogdan Raducanu's avatar Bogdan Raducanu Committed by Herman van Hovell
Browse files

[SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race

## What changes were proposed in this pull request?

Synchronize access to openStreams map.

## How was this patch tested?

Existing tests.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17592 from bogdanrdc/SPARK-20243.
parent 3d7f201f
No related branches found
No related tags found
No related merge requests found
......@@ -20,7 +20,6 @@ package org.apache.spark
import java.io.{FileDescriptor, InputStream}
import java.lang
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
......@@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging
object DebugFilesystem extends Logging {
// Stores the set of active streams and their creation sites.
private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable]
def clearOpenStreams(): Unit = {
def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
openStreams.put(stream, new Throwable())
}
def clearOpenStreams(): Unit = openStreams.synchronized {
openStreams.clear()
}
def assertNoOpenStreams(): Unit = {
val numOpen = openStreams.size()
def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
openStreams.remove(stream)
}
def assertNoOpenStreams(): Unit = openStreams.synchronized {
val numOpen = openStreams.values.size
if (numOpen > 0) {
for (exc <- openStreams.values().asScala) {
for (exc <- openStreams.values) {
logWarning("Leaked filesystem connection created at:")
exc.printStackTrace()
}
throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
openStreams.values().asScala.head)
openStreams.values.head)
}
}
}
......@@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem {
override def open(f: Path, bufferSize: Int): FSDataInputStream = {
val wrapped: FSDataInputStream = super.open(f, bufferSize)
openStreams.put(wrapped, new Throwable())
addOpenStream(wrapped)
new FSDataInputStream(wrapped.getWrappedStream) {
override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
......@@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem {
override def close(): Unit = {
wrapped.close()
openStreams.remove(wrapped)
removeOpenStream(wrapped)
}
override def read(): Int = wrapped.read()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment