diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f27033b5579453d6f952d58920472c9ac15deb..2d437f1b213639acef489239e4d86478701fde74 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
           .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
+  /** Get all Akka configuration variables (keys starting with "akka.") set on this SparkConf. */
+  def getAkkaConf: Seq[(String, String)] = {
+    getAll.filter {
+      case (k, _) => k.startsWith("akka.")
+    }
+  }
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
 
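For context, a minimal sketch of how `getAkkaConf` behaves (a standalone illustration, not part of the patch; the property names are arbitrary examples):

```scala
import org.apache.spark.SparkConf

// Only keys prefixed with "akka." are returned; note that "spark.akka.*"
// options do not match the filter and are left to Spark's own handling.
val conf = new SparkConf()
  .set("akka.remote.log-sent-messages", "on")
  .set("spark.akka.timeout", "100")

conf.getAkkaConf.foreach { case (k, v) => println(s"$k -> $v") }
// akka.remote.log-sent-messages -> on
```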
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e561cb97f8a0708c35f7cb97faaba073..2ee37815dee218b1eb2345e4064d4bb2442efd2c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.util
 
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
 
+import org.apache.log4j.{Level, Logger}
 import org.apache.spark.SparkConf
 
 /**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
       conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
     val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
 
-    val akkaConf = ConfigFactory.parseString(
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+      ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       |akka.log-dead-letters = $lifecycleEvents
       |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin)
+      """.stripMargin))
 
     val actorSystem = if (indestructible) {
       IndestructibleActorSystem(name, akkaConf)
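The `withFallback` ordering above is what lets user-supplied settings win: keys present in the parsed map shadow the string template, while everything else falls through to Spark's defaults. A small Typesafe Config sketch of that precedence (values chosen for illustration only):

```scala
import scala.collection.JavaConversions.mapAsJavaMap
import com.typesafe.config.ConfigFactory

val user = ConfigFactory.parseMap(Map("akka.daemonic" -> "off"))
val defaults = ConfigFactory.parseString("""
  |akka.daemonic = on
  |akka.stdout-loglevel = "ERROR"
  """.stripMargin)

val merged = user.withFallback(defaults)
assert(merged.getString("akka.daemonic") == "off")          // user override wins
assert(merged.getString("akka.stdout-loglevel") == "ERROR") // default preserved
```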
diff --git a/docs/configuration.md b/docs/configuration.md
index 09342fedfc1a0224c80ec95fc18dd67d9e2a8c6a..8a8857bb3b84ec6be3d5648f4d2278b120dbcd03 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -360,6 +360,13 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
+<tr>
+  <td>akka.x.y....</td>
+  <td>value</td>
+  <td>
+    Arbitrary Akka configuration options (any property whose key starts with <code>akka.</code>) can be set directly on the SparkConf; they are applied to every ActorSystem that Spark creates for that SparkContext, including the ones on its executors.
+  </td>
+</tr>
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
   <td>false</td>
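To complement the new docs entry, a hedged end-to-end example of passing an Akka option through SparkConf (the app name, master, and option below are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative: the akka.* key set here reaches every ActorSystem Spark
// creates for this context, on the driver and its executors alike.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("AkkaConfExample")
  .set("akka.remote.log-sent-messages", "on")
val sc = new SparkContext(conf)
```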