diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb2ed8e6124ed8b94be8e09486c072defef..0d8ac1f80a9f47e6a26ec51a676b3acd95d3c23f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 78e52643531e0823a6ce19cc1d13545896108d31..730d265c87f882b048caa3c82e34305b2155078c 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,7 +35,6 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public class UnsafeShuffleWriterSuite {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
-    // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
-    // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
-    // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
-    // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
-    synchronized (CachedBufferAllocator.class) {
-      CachedBufferAllocator.queueTable.clear();
-    }
     final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
     if (leakedMemory != 0) {
       fail("Test leaked " + leakedMemory + " bytes of managed memory");