diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 1d588c37c5db048ac68ea96adac202c6233cb7af..d048cf7aeb5f18aad0a226d9ee06b2d64b78dfd3 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -22,15 +22,21 @@ import java.io.*;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 
+import org.apache.spark.SparkEnv;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+  private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
+  private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
 
   private InputStream in;
   private DataInputStream din;
@@ -50,7 +56,21 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
+    long bufferSizeBytes =
+        SparkEnv.get() == null ?
+            DEFAULT_BUFFER_SIZE_BYTES:
+            SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
+                                                 DEFAULT_BUFFER_SIZE_BYTES);
+    if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
+      // fall back to a sane default value
+      logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
+                      "allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
+                  DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
+      bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
+    }
+
+    final BufferedInputStream bs =
+        new BufferedInputStream(new FileInputStream(file), (int) bufferSizeBytes);
     try {
       this.in = serializerManager.wrapForCompression(blockId, bs);
       this.din = new DataInputStream(this.in);