Skip to content
Snippets Groups Projects
Commit b3018811 authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Allow users to set arbitrary akka configurations via spark conf.

parent b97ef218
No related branches found
No related tags found
No related merge requests found
......@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
.map{case (k, v) => (k.substring(prefix.length), v)}
}
/** Get all akka conf variables set on this SparkConf */
def getAkkaConf: Seq[(String, String)] = {
getAll.filter {
case (k, v) => k.startsWith("akka.")
}
}
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
......
......@@ -17,12 +17,13 @@
package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
......@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
val akkaConf = ConfigFactory.parseString(
val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
......@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin)
""".stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)
......
......@@ -360,6 +360,14 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
<tr>
<td>akka.x.y....</td>
<td>value</td>
<td>
An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that spark context and its assigned executors as well.
</td>
</tr>
<tr>
<td>spark.shuffle.consolidateFiles</td>
<td>false</td>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment