Commit 7c35746c authored by Davies Liu, committed by Reynold Xin

[SPARK-9827] [SQL] fix fd leak in UnsafeRowSerializer

Currently, UnsafeRowSerializer does not close the InputStream, which will cause an fd leak if the InputStream has an open fd in it.

TODO: the fd could still be leaked if not all items in the stream are consumed. Currently it relies on GC to close the fd in this case.
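
A minimal sketch of the pattern this fix adopts (hypothetical names, not the actual Spark classes; the real change is in the diff below): the iterator owns the stream and closes it as soon as readInt() returns the EOF sentinel (-1), instead of waiting for finalization.

import java.io.DataInputStream

// Sketch only: RecordIterator stands in for the iterator built inside
// UnsafeRowSerializerInstance.asKeyValueIterator.
class RecordIterator(in: DataInputStream) extends Iterator[Array[Byte]] {
  private val EOF = -1
  private var recordSize: Int = in.readInt()
  if (recordSize == EOF) in.close() // empty stream: release the fd immediately

  override def hasNext: Boolean = recordSize != EOF

  override def next(): Array[Byte] = {
    val buf = new Array[Byte](recordSize)
    in.readFully(buf)
    recordSize = in.readInt()         // read the next record's size
    if (recordSize == EOF) in.close() // last record: close before returning it
    buf
  }
}

The sketch also shows the remaining hole from the TODO above: if a caller stops iterating early, no readInt() ever returns EOF, close() is never reached, and the fd is only released when the stream object is garbage collected.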

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #8116 from davies/fd_leak.
parent 7b13ed27

UnsafeRowSerializer.scala
@@ -108,6 +108,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     override def asKeyValueIterator: Iterator[(Int, UnsafeRow)] = {
       new Iterator[(Int, UnsafeRow)] {
         private[this] var rowSize: Int = dIn.readInt()
+        if (rowSize == EOF) dIn.close()
 
         override def hasNext: Boolean = rowSize != EOF
@@ -119,6 +120,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
           row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, numFields, rowSize)
           rowSize = dIn.readInt() // read the next row's size
           if (rowSize == EOF) { // We are returning the last row in this stream
+            dIn.close()
             val _rowTuple = rowTuple
             // Null these out so that the byte array can be garbage collected once the entire
             // iterator has been consumed

UnsafeRowSerializerSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
@@ -25,6 +25,18 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
 
+/**
+ * Used to test that UnsafeRowSerializer closes the InputStream.
+ */
+class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStream(buf) {
+
+  var closed: Boolean = false
+
+  override def close(): Unit = {
+    closed = true
+    super.close()
+  }
+}
+
 class UnsafeRowSerializerSuite extends SparkFunSuite {
 
   private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
@@ -52,8 +64,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
       serializerStream.writeValue(unsafeRow)
     }
     serializerStream.close()
-    val deserializerIter = serializer.deserializeStream(
-      new ByteArrayInputStream(baos.toByteArray)).asKeyValueIterator
+    val input = new ClosableByteArrayInputStream(baos.toByteArray)
+    val deserializerIter = serializer.deserializeStream(input).asKeyValueIterator
     for (expectedRow <- unsafeRows) {
       val actualRow = deserializerIter.next().asInstanceOf[(Integer, UnsafeRow)]._2
       assert(expectedRow.getSizeInBytes === actualRow.getSizeInBytes)
@@ -61,5 +73,18 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
       assert(expectedRow.getInt(1) === actualRow.getInt(1))
     }
     assert(!deserializerIter.hasNext)
+    assert(input.closed)
   }
+
+  test("close empty input stream") {
+    val baos = new ByteArrayOutputStream()
+    val dout = new DataOutputStream(baos)
+    dout.writeInt(-1) // EOF
+    dout.flush()
+    val input = new ClosableByteArrayInputStream(baos.toByteArray)
+    val serializer = new UnsafeRowSerializer(numFields = 2).newInstance()
+    val deserializerIter = serializer.deserializeStream(input).asKeyValueIterator
+    assert(!deserializerIter.hasNext)
+    assert(input.closed)
+  }
 }