From f2cc6b5bccc3a70fd7d69183b1a068800831fe19 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Sun, 17 May 2015 09:30:49 -0700
Subject: [PATCH] [SPARK-7660] Wrap SnappyOutputStream to work around
 snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent and to guard against write-after-`close()` bugs. This is a workaround for https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent `close()` method can lead to stream corruption. We can remove this workaround if we upgrade to a snappy-java version that contains my fix for this bug, but in the meantime this patch offers a backportable Spark fix.
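For illustration (not part of this commit), here is a minimal sketch of how the wrapped stream behaves once this patch is applied. `SnappyCloseDemo` is a hypothetical name, and the example assumes spark-core and snappy-java are on the classpath:

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

// Hypothetical demo, not part of this patch: exercises the guarded close()
// behavior of the stream returned by compressedOutputStream().
object SnappyCloseDemo {
  def main(args: Array[String]): Unit = {
    val codec = new SnappyCompressionCodec(new SparkConf())
    val sink = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(sink)  // now a SnappyOutputStreamWrapper
    out.write(Array[Byte](1, 2, 3))
    out.close()
    out.close()  // second close() is a no-op instead of releasing buffers twice
    // out.write(4)  // would now throw IOException("Stream is closed")
  }
}
```

Before this change, the second `close()` could return the same internal buffer to snappy-java's `CachedBufferAllocator` a second time, so two streams could later end up sharing one buffer and corrupting each other's output.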

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660
---
 .../apache/spark/io/CompressionCodec.scala    | 49 ++++++++++++++++++-
 .../unsafe/UnsafeShuffleWriterSuite.java      |  8 ---
 2 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb2ed..0d8ac1f80a 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 78e5264353..730d265c87 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,7 +35,6 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public class UnsafeShuffleWriterSuite {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
-    // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
-    // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
-    // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
-    // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
-    synchronized (CachedBufferAllocator.class) {
-      CachedBufferAllocator.queueTable.clear();
-    }
     final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
     if (leakedMemory != 0) {
       fail("Test leaked " + leakedMemory + " bytes of managed memory");
-- 
GitLab