From b3018811e106e6414816380a35c07a8564945d37 Mon Sep 17 00:00:00 2001
From: Prashant Sharma <prashant.s@imaginea.com>
Date: Mon, 6 Jan 2014 15:47:40 +0530
Subject: [PATCH] Allow users to set arbitrary akka configurations via spark
 conf.

---
 core/src/main/scala/org/apache/spark/SparkConf.scala      | 7 +++++++
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 8 +++++---
 docs/configuration.md                                     | 8 ++++++++
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f27033b5..2d437f1b21 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
           .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
+  /** Get all akka conf variables set on this SparkConf */
+  def getAkkaConf: Seq[(String, String)] = {
+    getAll.filter {
+      case (k, v) => k.startsWith("akka.")
+    }
+  }
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
 
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e5..2ee37815de 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.util
 
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
 
+import org.apache.log4j.{Level, Logger}
 import org.apache.spark.SparkConf
 
 /**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
       conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
     val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
 
-    val akkaConf = ConfigFactory.parseString(
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+      ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       |akka.log-dead-letters = $lifecycleEvents
      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin)
+      """.stripMargin))
 
     val actorSystem = if (indestructible) {
       IndestructibleActorSystem(name, akkaConf)
diff --git a/docs/configuration.md b/docs/configuration.md
index 09342fedfc..8a8857bb3b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -360,6 +360,14 @@ Apart from these, the following properties are also available, and may be useful
   Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
+<tr>
+  <td>akka.x.y....</td>
+  <td>value</td>
+  <td>
+    Arbitrary Akka configuration options can be set directly on the SparkConf; they are applied to every ActorSystem created for that SparkContext, including those on its executors.
+  </td>
+</tr>
+
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
   <td>false</td>
-- 
GitLab
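
For anyone trying the patch out, here is a minimal usage sketch of the new code path. The Akka option name below is only an illustrative remote-transport setting, and the master/app name are hypothetical:

    import org.apache.spark.SparkConf

    // Any key starting with "akka." is returned by SparkConf.getAkkaConf.
    // AkkaUtils layers that map over Spark's generated defaults via
    // withFallback, so user-supplied akka.* values take precedence.
    val conf = new SparkConf()
      .setMaster("local[2]")                                     // hypothetical
      .setAppName("AkkaConfExample")                             // hypothetical
      .set("akka.remote.netty.tcp.connection-timeout", "30 s")   // illustrative Akka option

    // Prints: Seq((akka.remote.netty.tcp.connection-timeout, 30 s))
    println(conf.getAkkaConf)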