Skip to content
Snippets Groups Projects
Commit 5936bf9f authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Sean Owen
Browse files

[SPARK-12961][CORE] Prevent snappy-java memory leak

JIRA: https://issues.apache.org/jira/browse/SPARK-12961

To prevent memory leak in snappy-java, just call the method once and cache the result. After the library releases new version, we can remove this object.

JoshRosen

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #10875 from viirya/prevent-snappy-memory-leak.
parent 6743de3a
No related branches found
No related tags found
No related merge requests found
...@@ -149,12 +149,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { ...@@ -149,12 +149,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
*/ */
@DeveloperApi @DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
val version = SnappyCompressionCodec.version
try {
Snappy.getNativeLibraryVersion
} catch {
case e: Error => throw new IllegalArgumentException(e)
}
override def compressedOutputStream(s: OutputStream): OutputStream = { override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
...@@ -164,6 +159,19 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { ...@@ -164,6 +159,19 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
} }
/**
* Object guards against memory leak bug in snappy-java library:
* (https://github.com/xerial/snappy-java/issues/131).
* Before a new version of the library, we only call the method once and cache the result.
*/
private final object SnappyCompressionCodec {
private lazy val version: String = try {
Snappy.getNativeLibraryVersion
} catch {
case e: Error => throw new IllegalArgumentException(e)
}
}
/** /**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment