diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index df4b085d2251e468bad195883cdfd1546a20d4bc..243b71c980864f0d9d1841921f7410ac1ddf2385 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -83,9 +83,21 @@ private[nio] class ConnectionManager(
 
   private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
 
+  // Get the thread counts from the Spark Configuration.
+  // 
+  // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
+  // we only query for the minimum value because we are using LinkedBlockingDeque.
+  // 
+  // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is 
+  // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
+  // parameter is necessary.
+  private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
+  private val ioThreadCount = conf.getInt("spark.core.connection.io.threads.min", 4)
+  private val connectThreadCount = conf.getInt("spark.core.connection.connect.threads.min", 1)
+
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.handler.threads.min", 20),
-    conf.getInt("spark.core.connection.handler.threads.max", 60),
+    handlerThreadCount,
+    handlerThreadCount,
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-message-executor")) {
@@ -96,12 +108,11 @@ private[nio] class ConnectionManager(
         logError("Error in handleMessageExecutor is not handled properly", t)
       }
     }
-
   }
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.io.threads.min", 4),
-    conf.getInt("spark.core.connection.io.threads.max", 32),
+    ioThreadCount,
+    ioThreadCount,
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-read-write-executor")) {
@@ -112,14 +123,13 @@ private[nio] class ConnectionManager(
         logError("Error in handleReadWriteExecutor is not handled properly", t)
       }
     }
-
   }
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
   // which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.connect.threads.min", 1),
-    conf.getInt("spark.core.connection.connect.threads.max", 8),
+    connectThreadCount,
+    connectThreadCount,
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
     Utils.namedThreadFactory("handle-connect-executor")) {
@@ -130,7 +140,6 @@ private[nio] class ConnectionManager(
         logError("Error in handleConnectExecutor is not handled properly", t)
       }
     }
-
   }
 
   private val serverChannel = ServerSocketChannel.open()