diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index a039d543c35e71da6ae6c6ea17834587adb7d6da..e8a1e35c3fc4834f97f48818ad6e64b54f38ee50 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -45,7 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
+  private val transportConf =
+    SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext =
     new TransportContext(transportConf, blockHandler, true)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 70a42f9045e6bd2395e22641bf7770079cc5ee78..b0694e3c6c8af2c75f11bf653d53cdf8f73bc7ba 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -41,7 +41,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
 
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index cef203006d685b72e01e099c7f0cf8fca21a4336..84833f59d7afecadf2334ca3b7d45432dc8cb35b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -40,23 +40,23 @@ object SparkTransportConf {
 
   /**
    * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param _conf the [[SparkConf]]
+   * @param module the module name
    * @param numUsableCores if nonzero, this will restrict the server and client threads to only
    *                       use the given number of cores, rather than all of the machine's cores.
    *                       This restriction will only occur if these properties are not already set.
    */
-  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
     val conf = _conf.clone
 
     // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
     // assuming we have all the machine's cores).
     // NB: Only set if serverThreads/clientThreads not already set.
     val numThreads = defaultNumThreads(numUsableCores)
-    conf.set("spark.shuffle.io.serverThreads",
-      conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
-    conf.set("spark.shuffle.io.clientThreads",
-      conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
+    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
+    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
 
-    new TransportConf(new ConfigProvider {
+    new TransportConf(module, new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
   }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 09093819bb22c232aa31a367819eb7f7c541969a..3e0c4979695023c3876dd198fa06768de3bffd91 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -22,16 +22,13 @@ import java.net.{InetSocketAddress, URI}
 import java.nio.ByteBuffer
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy
+import javax.annotation.Nullable
 
-import scala.collection.mutable
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
 import scala.util.{DynamicVariable, Failure, Success}
 import scala.util.control.NonFatal
 
-import com.google.common.base.Preconditions
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.client._
@@ -49,7 +46,8 @@ private[netty] class NettyRpcEnv(
     securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
 
   private val transportConf = SparkTransportConf.fromSparkConf(
-    conf.clone.set("spark.shuffle.io.numConnectionsPerPeer", "1"),
+    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
+    "rpc",
     conf.getInt("spark.rpc.io.threads", 0))
 
   private val dispatcher: Dispatcher = new Dispatcher(this)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 2de9b6a6516925fd3fe6ec2b4d8ed7a0bcef64c6..7d08eae0b4871f69a138df099f8f3d2139cbfb92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,7 +109,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
       Some(new MesosExternalShuffleClient(
-        SparkTransportConf.fromSparkConf(conf),
+        SparkTransportConf.fromSparkConf(conf, "shuffle"),
         securityManager,
         securityManager.isAuthenticationEnabled(),
         securityManager.isSaslEncryptionEnabled()))
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 39fadd87835180e160268db45f733d95a42a51fc..cc5f933393adf3b1a8a6444c02e5baaf5ee06eb2 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -46,7 +46,7 @@ private[spark] trait ShuffleWriterGroup {
 private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   extends ShuffleBlockResolver with Logging {
 
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 05b1eed7f3bef9ead03de6ee43ef83d683c7cff1..fadb8fe7ed0abd9bd33fa7d501058bb1162b26c3 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -47,7 +47,7 @@ private[spark] class IndexShuffleBlockResolver(
 
   private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
 
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
   def getDataFile(shuffleId: Int, mapId: Int): File = {
     blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 661c706af32b1a9dd7c7e29ba9b8dd7e70b9658f..ab0007fb7899383e37b82fa12c858a738534f12c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -122,7 +122,7 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
     new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
       securityManager.isSaslEncryptionEnabled())
   } else {
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 231f4631e0a47a48779eb9eb7b9443ae06ce0cc2..1c775bcb3d9c1df062ce049cf6f2c49c318b6373 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -35,7 +35,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
   var rpcHandler: ExternalShuffleBlockHandler = _
 
   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 3b2eff377955ab3d31b0c84bff92762ebd8f3bbd..115135d44adbd935647d613ea2880ea3b5f45afb 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -23,18 +23,53 @@ import com.google.common.primitives.Ints;
  * A central location that tracks all the settings we expose to users.
  */
 public class TransportConf {
+
+  private final String SPARK_NETWORK_IO_MODE_KEY;
+  private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
+  private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
+  private final String SPARK_NETWORK_IO_BACKLOG_KEY;
+  private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
+  private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
+  private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
+  private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
+  private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
+  private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
+  private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
+  private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
+  private final String SPARK_NETWORK_IO_LAZYFD_KEY;
+
   private final ConfigProvider conf;
 
-  public TransportConf(ConfigProvider conf) {
+  private final String module;
+
+  public TransportConf(String module, ConfigProvider conf) {
+    this.module = module;
     this.conf = conf;
+    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
+    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
+    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
+    SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
+    SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY =  getConfKey("io.numConnectionsPerPeer");
+    SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
+    SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
+    SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
+    SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
+    SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
+    SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
+    SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
+    SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
+  }
+
+  private String getConfKey(String suffix) {
+    return "spark." + module + "." + suffix;
   }
 
   /** IO mode: nio or epoll */
-  public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); }
+  public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }
 
   /** If true, we will prefer allocating off-heap byte buffers within Netty. */
   public boolean preferDirectBufs() {
-    return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
+    return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
   }
 
   /** Connect timeout in milliseconds. Default 120 secs. */
@@ -42,23 +77,23 @@ public class TransportConf {
     long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
       conf.get("spark.network.timeout", "120s"));
     long defaultTimeoutMs = JavaUtils.timeStringAsSec(
-      conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
     return (int) defaultTimeoutMs;
   }
 
   /** Number of concurrent connections between two nodes for fetching data. */
   public int numConnectionsPerPeer() {
-    return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
+    return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
   }
 
   /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
-  public int backLog() { return conf.getInt("spark.shuffle.io.backLog", -1); }
+  public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }
 
   /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
-  public int serverThreads() { return conf.getInt("spark.shuffle.io.serverThreads", 0); }
+  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
 
   /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
-  public int clientThreads() { return conf.getInt("spark.shuffle.io.clientThreads", 0); }
+  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }
 
   /**
    * Receive buffer size (SO_RCVBUF).
@@ -67,28 +102,28 @@ public class TransportConf {
    * Assuming latency = 1ms, network_bandwidth = 10Gbps
    *  buffer size should be ~ 1.25MB
    */
-  public int receiveBuf() { return conf.getInt("spark.shuffle.io.receiveBuffer", -1); }
+  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
 
   /** Send buffer size (SO_SNDBUF). */
-  public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
+  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
 
   /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
   public int saslRTTimeoutMs() {
-    return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
+    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s")) * 1000;
   }
 
   /**
    * Max number of times we will try IO exceptions (such as connection timeouts) per request.
    * If set to 0, we will not do any retries.
    */
-  public int maxIORetries() { return conf.getInt("spark.shuffle.io.maxRetries", 3); }
+  public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }
 
   /**
    * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
    * Only relevant if maxIORetries > 0.
    */
   public int ioRetryWaitTimeMs() {
-    return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
+    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
   }
 
   /**
@@ -101,11 +136,11 @@ public class TransportConf {
   }
 
   /**
-   * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
+   * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
    * created only when data is going to be transferred. This can reduce the number of open files.
    */
   public boolean lazyFileDescriptor() {
-    return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+    return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
   }
 
   /**
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index dfb7740344ed0a9301a3840bd2153cc898e1de66..dc5fa1cee69bcecac94da39f60e186948b44a307 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -83,7 +83,7 @@ public class ChunkFetchIntegrationSuite {
     fp.write(fileContent);
     fp.close();
 
-    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
 
     streamManager = new StreamManager() {
diff --git a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index 84ebb337e6d54165d393ff19fa9bd09105f57666..42955ef69235a926ef205bb491ba53719f3590b4 100644
--- a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -60,7 +60,7 @@ public class RequestTimeoutIntegrationSuite {
   public void setUp() throws Exception {
     Map<String, String> configMap = Maps.newHashMap();
     configMap.put("spark.shuffle.io.connectionTimeout", "2s");
-    conf = new TransportConf(new MapConfigProvider(configMap));
+    conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
 
     defaultManager = new StreamManager() {
       @Override
diff --git a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 64b457b4b3f016320ba4b2a0aa0ef88ae3528076..8eb56bdd9846f9501e80caef0f0c38246f3c475e 100644
--- a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -49,7 +49,7 @@ public class RpcIntegrationSuite {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     rpcHandler = new RpcHandler() {
       @Override
       public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
diff --git a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
index 6dcec831dec71b56bb436b000a9a02a5d333bdce..00158fd0816262806936c8fa034b9821aed995e4 100644
--- a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -89,7 +89,7 @@ public class StreamSuite {
       fp.close();
     }
 
-    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     final StreamManager streamManager = new StreamManager() {
       @Override
       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index f4471374193060d05464a47c33a4be5445a2505d..dac7d4a5b0a09e4eb211cd18eca9d09b415a2602 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -52,7 +52,7 @@ public class TransportClientFactorySuite {
 
   @Before
   public void setUp() {
-    conf = new TransportConf(new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     RpcHandler rpcHandler = new NoOpRpcHandler();
     context = new TransportContext(conf, rpcHandler);
     server1 = context.createServer();
@@ -76,7 +76,7 @@ public class TransportClientFactorySuite {
 
     Map<String, String> configMap = Maps.newHashMap();
     configMap.put("spark.shuffle.io.numConnectionsPerPeer", Integer.toString(maxConnections));
-    TransportConf conf = new TransportConf(new MapConfigProvider(configMap));
+    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
 
     RpcHandler rpcHandler = new NoOpRpcHandler();
     TransportContext context = new TransportContext(conf, rpcHandler);
@@ -182,7 +182,7 @@ public class TransportClientFactorySuite {
 
   @Test
   public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException {
-    TransportConf conf = new TransportConf(new ConfigProvider() {
+    TransportConf conf = new TransportConf("shuffle", new ConfigProvider() {
 
       @Override
       public String get(String name) {
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 3469e84e7f4dab11deecd2b503422b5a89b24987..b1468996701807d488ae64247f84f7cd40bf77d4 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -207,7 +207,7 @@ public class SparkSaslSuite {
   public void testEncryptedMessageChunking() throws Exception {
     File file = File.createTempFile("sasltest", ".txt");
     try {
-      TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+      TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
       byte[] data = new byte[8 * 1024];
       new Random().nextBytes(data);
@@ -242,7 +242,7 @@ public class SparkSaslSuite {
     final File file = File.createTempFile("sasltest", ".txt");
     SaslTestCtx ctx = null;
     try {
-      final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+      final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
       StreamManager sm = mock(StreamManager.class);
       when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
           @Override
@@ -368,7 +368,7 @@ public class SparkSaslSuite {
         boolean disableClientEncryption)
       throws Exception {
 
-      TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+      TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
       SecretKeyHolder keyHolder = mock(SecretKeyHolder.class);
       when(keyHolder.getSaslUser(anyString())).thenReturn("user");
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index c393a5e1e68101f8cc2d85916470b0975e9f4e84..1c2fa4d0d462cf71ca041df2efc87705033a0a49 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -70,7 +70,7 @@ public class SaslIntegrationSuite {
 
   @BeforeClass
   public static void beforeAll() throws IOException {
-    conf = new TransportConf(new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     context = new TransportContext(conf, new TestRpcHandler());
 
     secretKeyHolder = mock(SecretKeyHolder.class);
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 3c6cb367dea46c678216ea199cd9fd00b8f75904..a9958232a1d2826d6ef59ee5d09e39d92a9ae8cb 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleBlockResolverSuite {
 
   static TestShuffleDataContext dataContext;
 
-  static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
   @BeforeClass
   public static void beforeAll() throws IOException {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 2f4f1d0df478b30b38e2725fe9490fec5de41866..532d7ab8d01bd60e608f4d0a658d157e68a76034 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -35,7 +35,7 @@ public class ExternalShuffleCleanupSuite {
 
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
-  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
   @Test
   public void noCleanupAndCleanup() throws IOException {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index a3f9a38b1aeb94d04c909195cf9d92628dbca60c..2095f41d79c16fc0fc2db79d19dbcc94690502e3 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -91,7 +91,7 @@ public class ExternalShuffleIntegrationSuite {
     dataContext1.create();
     dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
 
-    conf = new TransportConf(new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     handler = new ExternalShuffleBlockHandler(conf, null);
     TransportContext transportContext = new TransportContext(conf, handler);
     server = transportContext.createServer();
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index aa99efda9494893dda46712ecc51aad810edecb9..08ddb3755bd08fd412d7a5e8cd5bdd5af7d641aa 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -39,7 +39,7 @@ import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleSecuritySuite {
 
-  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
   TransportServer server;
 
   @Before
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 06e46f92410949cd68368476a2fc860497498093..3a6ef0d3f84763ddcd779b77af6934e22bef234e 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -254,7 +254,7 @@ public class RetryingBlockFetcherSuite {
                                           BlockFetchingListener listener)
     throws IOException {
 
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
 
     Stubber stub = null;
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 11ea7f3fd3cfeadf78cb00e03484e3a1bffd04a8..ba6d30a74c673fee7ee3141a53b8a90f24ed78a8 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -120,7 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
     registeredExecutorFile =
       findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
 
-    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+    TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
     boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);