Patrick Wendell authored
Bump this to being enabled for 0.9.0.
layout: global
title: Spark Configuration
Spark provides three locations to configure the system:
- Spark properties control most application parameters and can be set by passing a SparkConf object to SparkContext, or through Java system properties.
- Environment variables can be used to set per-machine settings, such as the IP address, through the conf/spark-env.sh script on each node.
- Logging can be configured through log4j.properties.
Spark Properties
Spark properties control most application settings and are configured separately for each application.
The preferred way to set them is by passing a SparkConf object to your SparkContext constructor.
Alternatively, Spark will also load them from Java system properties (for compatibility with old versions
of Spark) and from a spark.conf file on your classpath.
SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
application name), as well as arbitrary key-value pairs through the set()
method. For example, we could
initialize an application as follows:
{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
             .setMaster("local")
             .setAppName("My application")
             .set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
{% endhighlight %}
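The same property could instead be supplied as a Java system property, which is the compatibility path mentioned above. The following is a minimal sketch rather than a recommended pattern; it assumes the property is set before the SparkConf is constructed, and the object name `SystemPropertyExample` is hypothetical.

{% highlight scala %}
import org.apache.spark.SparkConf

object SystemPropertyExample {
  def main(args: Array[String]): Unit = {
    // Assumption: the system property must be in place before SparkConf is created,
    // because a default-constructed SparkConf reads spark.* system properties at that point.
    System.setProperty("spark.executor.memory", "1g")

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My application")

    // Print the value that SparkConf picked up for this key.
    println(conf.get("spark.executor.memory"))
  }
}
{% endhighlight %}

Values set explicitly with set() are usually easier to trace than system properties, which is one reason SparkConf is the preferred route.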
Most of the properties control internal settings that have reasonable default values. However, there are at least five properties that you will commonly want to control:
Property Name | Default | Meaning |
---|---|---|
spark.executor.memory | 512m | Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g). |
spark.serializer | org.apache.spark.serializer.JavaSerializer | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of org.apache.spark.serializer.Serializer. |
spark.kryo.registrator | (none) | If you use Kryo serialization, set this class to register your custom classes with Kryo. It should be set to a class that extends KryoRegistrator. See the tuning guide for more details. |
spark.local.dir | /tmp | Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. |
spark.cores.max | (not set) | When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum number of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos. |
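As an illustration of how the serializer properties in the table fit together, here is a minimal sketch of a Kryo setup. The names `Point`, `MyRegistrator`, and `KryoConfExample` are hypothetical, and the core limit of 8 is an arbitrary example value, not a recommendation.

{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class that is serialized frequently.
case class Point(x: Double, y: Double)

// Hypothetical registrator: registers the custom classes with Kryo.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point])
  }
}

object KryoConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Kryo example")
      // Switch from the default Java serializer to Kryo.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Use the fully qualified class name of the registrator.
      .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
      // Example value only: cap the cores requested across the cluster.
      .set("spark.cores.max", "8")

    println(conf.get("spark.serializer"))
  }
}
{% endhighlight %}

Taking the registrator name from `classOf[...]` avoids a mistyped class name in the property string.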
Apart from these, the following properties are also available, and may be useful in some situations:
Property Name | Default | Meaning |
---|---|---|
spark.default.parallelism | 8 | Default number of tasks to use for distributed shuffle operations (groupByKey, reduceByKey, etc.) when not set by user. |
spark.storage.memoryFraction | 0.66 | Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase it if you configure your own old generation size. |
spark.mesos.coarse | false | If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job. |
spark.ui.port | 4040 | Port for your application's dashboard, which shows memory and workload data. |
spark.ui.retainedStages | 1000 | How many stages the Spark UI remembers before garbage collecting. |
spark.shuffle.compress | true | Whether to compress map output files. Generally a good idea. |
spark.broadcast.compress | true | Whether to compress broadcast variables before sending them. Generally a good idea. |
spark.rdd.compress | false | Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER). Can save substantial space at the cost of some extra CPU time. |
spark.io.compression.codec | org.apache.spark.io.LZFCompressionCodec | The codec used to compress internal data such as RDD partitions and shuffle outputs. By default, Spark provides two codecs: org.apache.spark.io.LZFCompressionCodec and org.apache.spark.io.SnappyCompressionCodec. |
spark.io.compression.snappy.block.size | 32768 | Block size (in bytes) used in Snappy compression. Only applies when the Snappy compression codec is used. |
spark.scheduler.mode | FIFO | The scheduling mode between jobs submitted to the same SparkContext. Can be set to FAIR to use fair sharing instead of queueing jobs one after another. Useful for multi-user services. |
spark.reducer.maxMbInFlight | 48 | Maximum total size (in megabytes) of map outputs that each reduce task fetches simultaneously. Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory. |
spark.closure.serializer | org.apache.spark.serializer.JavaSerializer | Serializer class to use for closures. Generally Java is fine unless your distributed functions (e.g. map functions) reference large objects in the driver program. |
spark.kryo.referenceTracking | true | Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case. |
spark.kryoserializer.buffer.mb | 2 | Maximum object size to allow within Kryo (the library needs to create a buffer at least as large as the largest single object you'll serialize). Increase this if you get a "buffer limit exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker. |
spark.broadcast.factory | org.apache.spark.broadcast.HttpBroadcastFactory | Which broadcast implementation to use. |
spark.locality.wait | 3000 | Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well. |
spark.locality.wait.process | spark.locality.wait | Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process. |
spark.locality.wait.node | spark.locality.wait | Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information). |
spark.locality.wait.rack | spark.locality.wait | Customize the locality wait for rack locality. |
spark.worker.timeout | 60 | Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats. |
spark.akka.frameSize | 10 | Maximum message size to allow in "control plane" communication (for serialized tasks and task results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using collect() on a large dataset). |
spark.akka.threads | 4 | Number of actor threads to use for communication. Can be useful to increase on large clusters when the driver has a lot of CPU cores. |
spark.akka.timeout | 100 | Communication timeout between Spark nodes, in seconds. |
spark.akka.heartbeat.pauses | 600 | Acceptable heartbeat pause for Akka, in seconds. The default is deliberately large so that Akka's built-in failure detector is effectively disabled. The detector can be enabled again if you plan to use this feature (not recommended), in which case this setting controls sensitivity to GC pauses. Tune it in combination with `spark.akka.heartbeat.interval` and `spark.akka.failure-detector.threshold` if you need to. |
spark.akka.failure-detector.threshold | 300.0 | Maps to Akka's `akka.remote.transport-failure-detector.threshold`. The default is deliberately large so that Akka's built-in failure detector is effectively disabled. The detector can be enabled again if you plan to use this feature (not recommended). Tune this in combination with `spark.akka.heartbeat.pauses` and `spark.akka.heartbeat.interval` if you need to. |
spark.akka.heartbeat.interval | 1000 | Heartbeat interval for Akka, in seconds. The default is deliberately large so that Akka's built-in failure detector is effectively disabled. The detector can be enabled again if you plan to use this feature (not recommended). A larger interval reduces network overhead, while a smaller value (about 1 s) might be more informative for Akka's failure detector. Tune this in combination with `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. The only real benefit of the failure detector is that a sensitive detector can help evict rogue executors quickly. This is rarely worthwhile, however, because GC pauses and network lags are expected in a real Spark cluster, and enabling the detector also floods the network with heartbeat exchanges between nodes. |
spark.driver.host | (local hostname) | Hostname or IP address for the driver to listen on. |
spark.driver.port | (random) | Port for the driver to listen on. |
spark.cleaner.ttl | (infinite) | Duration (in seconds) for which Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration is forgotten. This is useful for running Spark for many hours or days (for example, running 24/7 in the case of Spark Streaming applications). Note that any RDD that persists in memory for longer than this duration will be cleared as well. |
spark.streaming.blockInterval | 200 | Duration (milliseconds) of how long to batch new objects coming from network receivers. |
spark.task.maxFailures | 4 | Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1. |
spark.broadcast.blockSize | 4096 | Size of each piece of a block in kilobytes for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit. |
akka.x.y.... | value | Arbitrary Akka configuration options can be set directly on the SparkConf. They are applied to all ActorSystems created for that SparkContext, including those of its assigned executors. |
spark.shuffle.consolidateFiles | true | If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations. |
spark.speculation | false | If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. |
spark.speculation.interval | 100 | How often Spark will check for tasks to speculate, in milliseconds. |
spark.speculation.quantile | 0.75 | Fraction of tasks which must be complete before speculation is enabled for a particular stage. |
spark.speculation.multiplier | 1.5 | How many times slower a task is than the median to be considered for speculation. |
spark.logConf | false | Whether to log the supplied SparkConf at INFO level when the SparkContext starts. |
spark.deploy.spreadOut | true | Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. Note: this setting needs to be configured in the standalone cluster master, not in individual applications; you can set it through SPARK_JAVA_OPTS in spark-env.sh. |
spark.deploy.defaultCores | (infinite) | Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default. Note: this setting needs to be configured in the standalone cluster master, not in individual applications; you can set it through SPARK_JAVA_OPTS in spark-env.sh. |
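As a rough illustration, several of the per-application properties above could be combined on one SparkConf. This is only a sketch: the values are arbitrary examples rather than tuning advice, and the application and object names are hypothetical. The spark.deploy.* properties are left out because they are configured on the standalone master, not in the application.

{% highlight scala %}
import org.apache.spark.SparkConf

object TuningExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Tuning example")            // hypothetical application name
      .set("spark.scheduler.mode", "FAIR")     // fair sharing between jobs in this context
      .set("spark.speculation", "true")        // re-launch slow tasks speculatively
      .set("spark.speculation.multiplier", "2")
      .set("spark.locality.wait", "5000")      // milliseconds, example value
      .set("spark.task.maxFailures", "8")      // example value

    // Per the table above: number of allowed retries = spark.task.maxFailures - 1.
    val allowedRetries = conf.get("spark.task.maxFailures").toInt - 1
    println("Allowed retries per task: " + allowedRetries)
  }
}
{% endhighlight %}

All values are passed as strings, including numbers and booleans, since set() stores plain key-value pairs.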
Viewing Spark Properties
The application web UI at http://<driver>:4040
lists Spark properties in the "Environment" tab.
This is a useful place to check to make sure that your properties have been set correctly.
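The properties can also be inspected from code before the application starts. The sketch below assumes you still hold the SparkConf that will be passed to SparkContext; the object name `PrintConfExample` is hypothetical. It only lists properties that are present in that SparkConf, not built-in defaults.

{% highlight scala %}
import org.apache.spark.SparkConf

object PrintConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My application")
      .set("spark.executor.memory", "1g")

    // getAll returns the configured properties as (key, value) pairs;
    // sort them by key so the output is easy to scan.
    conf.getAll.sortBy(_._1).foreach { case (key, value) =>
      println(key + " = " + value)
    }
  }
}
{% endhighlight %}

Setting spark.logConf to "true" gives a similar listing in the logs when the SparkContext starts.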
Configuration Files
You can also configure Spark properties through a spark.conf
file on your Java classpath.
Because these properties are usually application-specific, we recommend putting this file only on your
application's classpath, and not in a global Spark classpath.
The spark.conf
file uses Typesafe Config's HOCON format,
which is a superset of Java properties files and JSON. For example, the following is a simple config file:
{% highlight awk %}
# Comments are allowed
spark.executor.memory = 512m
spark.serializer = org.apache.spark.serializer.KryoSerializer
{% endhighlight %}
The format also allows hierarchical nesting, as follows: