diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index 88118e2837741f7b880be82d32cf97eef4673e78..d044e1d01d429f505f98c49c9bc3232d3b438c88 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
   private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
-  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
     new TransportContext(transportConf, handler)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index f03e8e4bf1b7eb6dda3b445c3b5e56f01a6749e8..7de2f9cbb2866acf954f070a50de1e09684a53d8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
 import org.apache.spark.storage._
@@ -68,6 +69,8 @@ private[spark]
 class FileShuffleBlockManager(conf: SparkConf)
   extends ShuffleBlockManager with Logging {
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
   private lazy val blockManager = SparkEnv.get.blockManager
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +185,14 @@ class FileShuffleBlockManager(conf: SparkConf)
         val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
         if (segmentOpt.isDefined) {
           val segment = segmentOpt.get
-          return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
+          return new FileSegmentManagedBuffer(
+            transportConf, segment.file, segment.offset, segment.length)
         }
       }
       throw new IllegalStateException("Failed to find shuffle block: " + blockId)
     } else {
       val file = blockManager.diskBlockManager.getFile(blockId)
-      new FileSegmentManagedBuffer(file, 0, file.length)
+      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index a48f0c9eceb5e51b0d59a63aee29be3f93def122..b292587d37028eb7b7dacabf3becbbb81dd9f054 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
 
 /**
@@ -38,10 +39,12 @@ import org.apache.spark.storage._
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
 private[spark]
-class IndexShuffleBlockManager extends ShuffleBlockManager {
+class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
   /**
    * Mapping to a single shuffleBlockId with reduce ID 0.
    * */
@@ -109,6 +112,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
       val offset = in.readLong()
       val nextOffset = in.readLong()
       new FileSegmentManagedBuffer(
+        transportConf,
         getDataFile(blockId.shuffleId, blockId.mapId),
         offset,
         nextOffset - offset)
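
For context, a minimal sketch of the read path around this hunk, assuming the index file stores cumulative offsets into the map's single data file (one long per reduce partition plus a leading zero), so segment i spans [offsets(i), offsets(i + 1)); the readSegment helper name is hypothetical.

    import java.io.{DataInputStream, File, FileInputStream}
    import com.google.common.io.ByteStreams
    import org.apache.spark.network.buffer.FileSegmentManagedBuffer
    import org.apache.spark.network.util.TransportConf

    // Hypothetical helper mirroring the getBlockData logic with the new conf argument.
    def readSegment(
        transportConf: TransportConf,
        indexFile: File,
        dataFile: File,
        reduceId: Int): FileSegmentManagedBuffer = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        ByteStreams.skipFully(in, reduceId * 8L)  // each cumulative offset is one 8-byte long
        val offset = in.readLong()
        val nextOffset = in.readLong()
        // The buffer now also carries the TransportConf so it can decide how to serve the bytes.
        new FileSegmentManagedBuffer(transportConf, dataFile, offset, nextOffset - offset)
      } finally {
        in.close()
      }
    }
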
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index b727438ae7e47c13daf8f451e46db1c011f2f96c..bda30a56d808e3278e6de3057ebdf4d265b753d2 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
 
 private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
 
-  private val indexShuffleBlockManager = new IndexShuffleBlockManager()
+  private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 6608ed1e57b383b1de0f017d8a9510728688fe8e..9623d665177ef848986cff489addec87362da1ff 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     val transportConf = SparkTransportConf.fromSparkConf(conf)
-    rpcHandler = new ExternalShuffleBlockHandler()
+    rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
 
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 5fa1527ddff926dc428e9024bc80cadef141373c..844eff4f4c701d433dd51017609f5007cc1e1a16 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -31,24 +31,19 @@ import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-
-  /**
-   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
-   * Avoid unless there's a good reason not to.
-   */
-  // TODO: Make this configurable
-  private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
-
+  private final TransportConf conf;
   private final File file;
   private final long offset;
   private final long length;
 
-  public FileSegmentManagedBuffer(File file, long offset, long length) {
+  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+    this.conf = conf;
     this.file = file;
     this.offset = offset;
     this.length = length;
@@ -65,7 +60,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     try {
       channel = new RandomAccessFile(file, "r").getChannel();
       // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < MIN_MEMORY_MAP_BYTES) {
+      if (length < conf.memoryMapBytes()) {
         ByteBuffer buf = ByteBuffer.allocate((int) length);
         channel.position(offset);
         while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
 
   @Override
   public Object convertToNetty() throws IOException {
-    FileChannel fileChannel = new FileInputStream(file).getChannel();
-    return new DefaultFileRegion(fileChannel, offset, length);
+    if (conf.lazyFileDescriptor()) {
+      return new LazyFileRegion(file, offset, length);
+    } else {
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      return new DefaultFileRegion(fileChannel, offset, length);
+    }
   }
 
   public File getFile() { return file; }
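
A hedged sketch of how the new constructor is fed from the Scala side, mirroring the FileShuffleBlockManager change above; the file path and config values are illustrative only.

    import java.io.File
    import org.apache.spark.SparkConf
    import org.apache.spark.network.buffer.FileSegmentManagedBuffer
    import org.apache.spark.network.netty.SparkTransportConf

    val sparkConf = new SparkConf()
      .set("spark.storage.memoryMapThreshold", (8 * 1024 * 1024).toString)  // illustrative tuning
      .set("spark.shuffle.io.lazyFD", "true")
    val transportConf = SparkTransportConf.fromSparkConf(sparkConf)

    val file = new File("/tmp/shuffle_0_0_0.data")  // hypothetical shuffle file
    // The buffer consults transportConf in nioByteBuffer() and convertToNetty().
    val buffer = new FileSegmentManagedBuffer(transportConf, file, 0L, file.length())
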
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
new file mode 100644
index 0000000000000000000000000000000000000000..81bc8ec40fc82d59479c1b91dd4a20d3998add06
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that only creates the file descriptor when the region is being
+ * transferred. This cannot be used with Epoll because there is no native support for it.
+ * 
+ * This is mostly copied from the DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so the native Epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+  private final File file;
+  private final long position;
+  private final long count;
+
+  private FileChannel channel;
+
+  private long numBytesTransferred = 0L;
+
+  /**
+   * @param file file to transfer.
+   * @param position start position for the transfer.
+   * @param count number of bytes to transfer starting from position.
+   */
+  public LazyFileRegion(File file, long position, long count) {
+    this.file = file;
+    this.position = position;
+    this.count = count;
+  }
+
+  @Override
+  protected void deallocate() {
+    JavaUtils.closeQuietly(channel);
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long transfered() {
+    return numBytesTransferred;
+  }
+
+  @Override
+  public long count() {
+    return count;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position) throws IOException {
+    if (channel == null) {
+      channel = new FileInputStream(file).getChannel();
+    }
+
+    long count = this.count - position;
+    if (count < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position + " (expected: 0 - " + (this.count - 1) + ')');
+    }
+
+    if (count == 0) {
+      return 0L;
+    }
+
+    long written = channel.transferTo(this.position + position, count, target);
+    if (written > 0) {
+      numBytesTransferred += written;
+    }
+    return written;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("file", file)
+        .add("position", position)
+        .add("count", count)
+        .toString();
+  }
+}
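
A minimal usage sketch (not part of this patch) illustrating that LazyFileRegion defers opening a file descriptor until the first transferTo call; the file paths are hypothetical.

    import java.io.{File, FileOutputStream}
    import java.nio.channels.Channels
    import org.apache.spark.network.buffer.LazyFileRegion

    val source = new File("/tmp/lazy-region-input.bin")          // hypothetical input file
    val region = new LazyFileRegion(source, 0L, source.length()) // no FD opened yet

    val sink = Channels.newChannel(new FileOutputStream("/tmp/lazy-region-output.bin"))
    var transferred = 0L
    while (transferred < region.count()) {
      transferred += region.transferTo(sink, transferred)  // the FD is opened on the first call
    }
    region.release()  // refcount hits zero, deallocate() closes the channel
    sink.close()
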
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 787a8f0031af1023fd9edee0d51c0776b7c7e8b4..621427d8cba5ebab967b08e3067f6bbfdb7b62c7 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -75,4 +75,21 @@ public class TransportConf {
    * Only relevant if maxIORetries > 0.
    */
   public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+
+  /**
+   * Minimum size of a block at which we use memory mapping rather than reading it in through
+   * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
+   * memory mapping has high overhead for blocks close to or below the page size of the OS.
+   */
+  public int memoryMapBytes() {
+    return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
+  }
+
+  /**
+   * Whether to initialize shuffle file descriptors lazily. If true, file descriptors are
+   * created only when data is going to be transferred. This can reduce the number of open files.
+   */
+  public boolean lazyFileDescriptor() {
+    return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+  }
 }
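
A short sketch of how the two new keys are resolved, using the same SystemPropertyConfigProvider the tests below rely on; the values are just examples of overriding the defaults.

    import org.apache.spark.network.util.{SystemPropertyConfigProvider, TransportConf}

    System.setProperty("spark.storage.memoryMapThreshold", (4 * 1024 * 1024).toString)
    System.setProperty("spark.shuffle.io.lazyFD", "false")

    val transportConf = new TransportConf(new SystemPropertyConfigProvider())
    assert(transportConf.memoryMapBytes() == 4 * 1024 * 1024)  // overrides the 2 MB default
    assert(!transportConf.lazyFileDescriptor())                // overrides the default of true
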
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index c4158833976aa6f99f068bb35fa16499cf0c0816..dfb7740344ed0a9301a3840bd2153cc898e1de66 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
   static ManagedBuffer bufferChunk;
   static ManagedBuffer fileChunk;
 
+  private TransportConf transportConf;
+
   @BeforeClass
   public static void setUp() throws Exception {
     int bufSize = 100000;
@@ -80,9 +82,10 @@ public class ChunkFetchIntegrationSuite {
     new Random().nextBytes(fileContent);
     fp.write(fileContent);
     fp.close();
-    fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
 
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
     streamManager = new StreamManager() {
       @Override
       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
@@ -90,7 +93,7 @@ public class ChunkFetchIntegrationSuite {
         if (chunkIndex == BUFFER_CHUNK_INDEX) {
           return new NioManagedBuffer(buf);
         } else if (chunkIndex == FILE_CHUNK_INDEX) {
-          return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
         } else {
           throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
         }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index a6db4b2abd6c9e19d359e01606b6729dea89b597..46ca9708621b94548598bda8e4e5b2074259ff2e 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.spark.network.util.TransportConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   private final ExternalShuffleBlockManager blockManager;
   private final OneForOneStreamManager streamManager;
 
-  public ExternalShuffleBlockHandler() {
-    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+  public ExternalShuffleBlockHandler(TransportConf conf) {
+    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
   }
 
   /** Enables mocking out the StreamManager and BlockManager. */
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index ffb7faa3dbdca4358a13820ec166f80844bc5bcb..dfe0ba059509098fde1d1c7626e04901c5594716 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager {
   // Single-threaded Java executor used to perform expensive recursive directory deletion.
   private final Executor directoryCleaner;
 
-  public ExternalShuffleBlockManager() {
+  private final TransportConf conf;
+
+  public ExternalShuffleBlockManager(TransportConf conf) {
     // TODO: Give this thread a name.
-    this(Executors.newSingleThreadExecutor());
+    this(conf, Executors.newSingleThreadExecutor());
   }
 
   // Allows tests to have more control over when directories are cleaned up.
   @VisibleForTesting
-  ExternalShuffleBlockManager(Executor directoryCleaner) {
+  ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
+    this.conf = conf;
     this.executors = Maps.newConcurrentMap();
     this.directoryCleaner = directoryCleaner;
   }
@@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager {
   // TODO: Support consolidated hash shuffle files
   private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
     File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
+    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
   }
 
   /**
@@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager {
       long offset = in.readLong();
       long nextOffset = in.readLong();
       return new FileSegmentManagedBuffer(
+        conf,
         getFile(executor.localDirs, executor.subDirsPerLocalDir,
           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
         offset,
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
index da54797e8923cafd1257bc72e7e8485f10009bd3..dad6428a836fc4b2714f3573084edb8a7c3cb67c 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 
 import com.google.common.io.CharStreams;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite {
 
   static TestShuffleDataContext dataContext;
 
+  static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+
   @BeforeClass
   public static void beforeAll() throws IOException {
     dataContext = new TestShuffleDataContext(2, 5);
@@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite {
 
   @Test
   public void testBadRequests() {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
     // Unregistered executor
     try {
       manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite {
 
   @Test
   public void testSortShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
     manager.registerExecutor("app0", "exec0",
       dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
 
@@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite {
 
   @Test
   public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
     manager.registerExecutor("app0", "exec0",
       dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
 
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index c8ece3bc53ac385ccac044537634dba73623258c..254e3a7a32b983fe5ccf8265209e5761052291f4 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
-
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
 public class ExternalShuffleCleanupSuite {
 
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
 
   @Test
   public void noCleanupAndCleanup() throws IOException {
     TestShuffleDataContext dataContext = createSomeData();
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
     manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
     manager.applicationRemoved("app", false /* cleanup */);
 
@@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite {
       @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
     };
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor);
 
     manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
     manager.applicationRemoved("app", true);
@@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite {
     TestShuffleDataContext dataContext0 = createSomeData();
     TestShuffleDataContext dataContext1 = createSomeData();
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
 
     manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
     manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
@@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite {
     TestShuffleDataContext dataContext0 = createSomeData();
     TestShuffleDataContext dataContext1 = createSomeData();
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
 
     manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
     manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 687bde59fdae49f42d202acbc0a30858166c760d..02c10bcb7b2612237a1727188061646b91c1c1ce 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite {
     dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
 
     conf = new TransportConf(new SystemPropertyConfigProvider());
-    handler = new ExternalShuffleBlockHandler();
+    handler = new ExternalShuffleBlockHandler(conf);
     TransportContext transportContext = new TransportContext(conf, handler);
     server = transportContext.createServer();
   }
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 8afceab1d585af9896d2bff9a00b7c0b30c9123e..759a12910c94d42c2ab62d5bc84a37d1fc35397d 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite {
 
   @Before
   public void beforeEach() {
-    RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+    RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf),
       new TestSecretKeyHolder("my-app-id", "secret"));
     TransportContext context = new TransportContext(conf, handler);
     this.server = context.createServer();
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index bb0b8f7e6cba6e353c1cdc9830333299ea57b651..a34aabe9e78a64af0086aa470205ea5007b4fb1c 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -95,10 +95,11 @@ public class YarnShuffleService extends AuxiliaryService {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
     boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
-    RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+    RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf);
     if (authEnabled) {
       secretManager = new ShuffleSecretManager();
       rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
@@ -106,7 +107,6 @@ public class YarnShuffleService extends AuxiliaryService {
 
     int port = conf.getInt(
       SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
-    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
     TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
     shuffleServer = transportContext.createServer(port);
     String authEnabledString = authEnabled ? "enabled" : "not enabled";
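
Building the TransportConf before the RPC handler means the same keys can also be supplied through the NodeManager's Hadoop configuration. A hedged sketch, assuming HadoopConfigProvider lives in the package YarnShuffleService imports it from:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.network.util.TransportConf
    import org.apache.spark.network.yarn.util.HadoopConfigProvider

    val hadoopConf = new Configuration()
    hadoopConf.setBoolean("spark.shuffle.io.lazyFD", false)  // illustrative override for the YARN shuffle service
    val transportConf = new TransportConf(new HadoopConfigProvider(hadoopConf))
    assert(!transportConf.lazyFileDescriptor())
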