Commit c3cf0475 authored by Patrick Wendell's avatar Patrick Wendell
Merge pull request #339 from ScrapCodes/conf-improvements

Conf improvements

This adds two new features:

1. Allow users to set arbitrary Akka configuration properties via SparkConf.

2. Allow the configuration to be printed in the logs for diagnosis.
parents a862cafa c729fa7c
...
@@ -172,6 +172,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
       .map{case (k, v) => (k.substring(prefix.length), v)}
   }

+  /** Get all akka conf variables set on this SparkConf */
+  def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
...
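The new `getAkkaConf` helper is just a prefix filter over the flattened settings. A minimal standalone sketch of the same pattern (plain Scala, no Spark classes; the `settings` map here is a stand-in for SparkConf's internal store):

```scala
// Standalone sketch of the prefix-filter pattern behind getAkkaConf:
// keep only entries whose key starts with "akka.".
object AkkaConfFilter {
  def getAkkaConf(settings: Map[String, String]): Seq[(String, String)] =
    settings.toSeq.filter { case (k, _) => k.startsWith("akka.") }

  def main(args: Array[String]): Unit = {
    val settings = Map(
      "spark.app.name" -> "demo",
      "akka.loglevel"  -> "DEBUG",
      "akka.daemonic"  -> "on")
    val akkaOnly = getAkkaConf(settings)
    assert(akkaOnly.size == 2)
    assert(akkaOnly.forall(_._1.startsWith("akka.")))
    println(akkaOnly.toMap)
  }
}
```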
...
@@ -116,6 +116,10 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }

+  if (conf.get("spark.log-conf", "false").toBoolean) {
+    logInfo("Spark configuration:\n" + conf.toDebugString)
+  }
+
   // Set Spark driver host and port system properties
   conf.setIfMissing("spark.driver.host", Utils.localHostName())
   conf.setIfMissing("spark.driver.port", "0")
...
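The guard added above follows SparkConf's get-with-default idiom: read the key, fall back to a string default, then parse the result. A self-contained sketch of that gate (a plain Map stands in for a real SparkConf):

```scala
// Sketch of the "spark.log-conf" gate: an optional boolean setting read
// with a string default, as the SparkContext patch does.
object LogConfGate {
  def shouldLogConf(settings: Map[String, String]): Boolean =
    settings.getOrElse("spark.log-conf", "false").toBoolean

  def main(args: Array[String]): Unit = {
    assert(!shouldLogConf(Map.empty))                       // absent key: off by default
    assert(shouldLogConf(Map("spark.log-conf" -> "true")))  // explicitly enabled
    println("spark.log-conf gate behaves as expected")
  }
}
```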
...
@@ -17,12 +17,13 @@
 package org.apache.spark.util

+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}

 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}

 import org.apache.spark.SparkConf

 /**
...
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
     conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
   val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt

-  val akkaConf = ConfigFactory.parseString(
+  val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+    ConfigFactory.parseString(
     s"""
     |akka.daemonic = on
     |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
...
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
     |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
     |akka.log-dead-letters = $lifecycleEvents
     |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-    """.stripMargin)
+    """.stripMargin))

   val actorSystem = if (indestructible) {
     IndestructibleActorSystem(name, akkaConf)
...
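The `parseMap(...).withFallback(parseString(...))` change means user-supplied `akka.*` keys take precedence and the hard-coded block serves only as defaults. That precedence can be modeled with plain Maps (a sketch, not Typesafe Config itself): `withFallback` on Config behaves like `defaults ++ user` on Maps, where keys in `user` shadow the same keys in `defaults`.

```scala
// Sketch of withFallback precedence using plain Maps: entries from the
// user-supplied akka.* conf override the hard-coded defaults.
object AkkaConfPrecedence {
  def merge(user: Map[String, String], defaults: Map[String, String]): Map[String, String] =
    defaults ++ user // keys in `user` shadow the same keys in `defaults`

  def main(args: Array[String]): Unit = {
    val defaults = Map("akka.daemonic" -> "on", "akka.loglevel" -> "ERROR")
    val user     = Map("akka.loglevel" -> "DEBUG")
    val merged   = merge(user, defaults)
    assert(merged("akka.loglevel") == "DEBUG") // user value wins
    assert(merged("akka.daemonic") == "on")    // untouched default survives
    println(merged)
  }
}
```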
...
@@ -360,6 +360,14 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
+<tr>
+  <td>akka.x.y....</td>
+  <td>value</td>
+  <td>
+    An arbitrary Akka configuration property can be set directly on the SparkConf. It is applied to every ActorSystem created Spark-wide for that SparkContext, including those on its executors.
+  </td>
+</tr>
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
   <td>false</td>
...
@@ -395,6 +403,13 @@ Apart from these, the following properties are also available, and may be useful
     How many times slower a task is than the median to be considered for speculation.
   </td>
 </tr>
+<tr>
+  <td>spark.log-conf</td>
+  <td>false</td>
+  <td>
+    Whether to log the supplied SparkConf at INFO level when the SparkContext is started.
+  </td>
+</tr>
 </table>

## Viewing Spark Properties
...
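Taken together, the two documented properties might be used like this (a usage sketch assuming Spark on the classpath; the master and app name are placeholder values):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: forward an Akka setting to every ActorSystem and ask Spark to
// log the final configuration at startup. "local" / "conf-demo" are
// placeholders for a real master URL and application name.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("conf-demo")
  .set("akka.loglevel", "DEBUG")   // picked up by getAkkaConf, applied to all ActorSystems
  .set("spark.log-conf", "true")   // SparkContext logs conf.toDebugString at INFO
val sc = new SparkContext(conf)
```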