Commit f5f2d273 authored by Aaron Davidson, committed by Patrick Wendell

[SPARK-4516] Cap default number of Netty threads at 8

In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes at a premium.

Thus, this value should still retain maximum throughput and reduce wasted off-heap memory allocation. It can be overridden by setting the number of serverThreads and clientThreads manually in Spark's configuration.
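
The memory argument above can be made concrete with a little arithmetic. The following is a minimal sketch, not code from this commit: the object name, the helper `approxPoolOverheadMb`, and the 32-core machine are hypothetical, and the 32 MB figure is only the rough per-thread estimate quoted in the message.

```scala
object NettyThreadOverheadSketch {
  // Both constants restate figures from the commit message.
  // The 32 MB value is a rough estimate, not a measured guarantee.
  val MaxDefaultNettyThreads = 8
  val ApproxOffHeapMbPerThread = 32

  /** Rough initial off-heap cost, in MB, of one thread pool with `threads` threads. */
  def approxPoolOverheadMb(threads: Int): Int =
    threads * ApproxOffHeapMbPerThread

  def main(args: Array[String]): Unit = {
    val cores = 32 // hypothetical machine size, chosen only for illustration
    val uncapped = approxPoolOverheadMb(cores)
    val capped = approxPoolOverheadMb(math.min(cores, MaxDefaultNettyThreads))
    println(s"uncapped: $uncapped MB per pool, capped: $capped MB per pool")
  }
}
```

Under these assumptions, a 32-core JVM would start with about 1024 MB per pool without the cap and about 256 MB with it.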

Author: Aaron Davidson <aaron@databricks.com>

Closes #3469 from aarondav/fewer-pools2 and squashes the following commits:

087c59f [Aaron Davidson] [SPARK-4516] Cap default number of Netty threads at 8
parent b5fb1410
@@ -20,7 +20,24 @@ package org.apache.spark.network.netty
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{TransportConf, ConfigProvider}
+/**
+* Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
+* Driver, or a standalone shuffle service) into a TransportConf with details on our environment
+* like the number of cores that are allocated to this JVM.
+*/
object SparkTransportConf {
+/**
+* Specifies an upper bound on the number of Netty threads that Spark requires by default.
+* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
+* that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
+* at a premium.
+*
+* Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
+* allocation. It can be overridden by setting the number of serverThreads and clientThreads
+* manually in Spark's configuration.
+*/
+private val MAX_DEFAULT_NETTY_THREADS = 8
/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
@@ -29,15 +46,28 @@ object SparkTransportConf {
*/
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
-if (numUsableCores > 0) {
-// Only set if serverThreads/clientThreads not already set.
-conf.set("spark.shuffle.io.serverThreads",
-conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
-conf.set("spark.shuffle.io.clientThreads",
-conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
-}
+// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
+// assuming we have all the machine's cores).
+// NB: Only set if serverThreads/clientThreads not already set.
+val numThreads = defaultNumThreads(numUsableCores)
+conf.set("spark.shuffle.io.serverThreads",
+conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
+conf.set("spark.shuffle.io.clientThreads",
+conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
new TransportConf(new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
}
+/**
+* Returns the default number of threads for both the Netty client and server thread pools.
+* If numUsableCores is 0, we will use Runtime to get an approximate number of available cores.
+*/
+private def defaultNumThreads(numUsableCores: Int): Int = {
+val availableCores =
+if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
+math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
+}
}
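
The "only set if not already set" behaviour in the diff above can be tried out without a Spark dependency. The sketch below is an illustration under stated assumptions, not the committed code: a plain immutable `Map` stands in for `SparkConf`, the object name and `withThreadDefaults` are hypothetical, and `detectedCores` is passed in explicitly (where the real method calls `Runtime.getRuntime.availableProcessors()`) so the result is deterministic.

```scala
object ThreadDefaultsSketch {
  // Mirrors MAX_DEFAULT_NETTY_THREADS from the diff.
  val MaxDefaultNettyThreads = 8

  /** Same capping rule as defaultNumThreads, with the detected core count injected. */
  def defaultNumThreads(numUsableCores: Int, detectedCores: Int): Int = {
    val availableCores = if (numUsableCores > 0) numUsableCores else detectedCores
    math.min(availableCores, MaxDefaultNettyThreads)
  }

  /** Fills in each thread setting only where the caller has not already provided one. */
  def withThreadDefaults(
      settings: Map[String, String],
      numUsableCores: Int,
      detectedCores: Int): Map[String, String] = {
    val default = defaultNumThreads(numUsableCores, detectedCores).toString
    val keys = Seq("spark.shuffle.io.serverThreads", "spark.shuffle.io.clientThreads")
    keys.foldLeft(settings) { (acc, key) =>
      if (acc.contains(key)) acc else acc.updated(key, default)
    }
  }
}
```

For example, `ThreadDefaultsSketch.withThreadDefaults(Map("spark.shuffle.io.serverThreads" -> "16"), 0, 32)` keeps the explicit server value of 16 and fills in a client value of 8. The cap applies only to the default, so an explicit setting may exceed it.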