diff --git a/core/pom.xml b/core/pom.xml
index 2071a58de92e41b07791123b742858b84f54cf65..0ab170e028ab4d2afdaf9d442d743093e89cfb98 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -185,19 +185,6 @@
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
     </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-remote_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -224,6 +211,10 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4628093b91cb889ef2748a0b36382055dbf12a7a..5a42299a0bf831b446493cb360c264a2b6c3db3c 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
-   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
-   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
+   * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
    * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
    * longer in scope.
    */
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
    * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
    * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
-   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
    * resolved.
    */
   private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
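
The two cleaner flags discussed in this hunk are ordinary SparkConf booleans. A minimal sketch of setting them explicitly, using the defaults described in the comments above (block on RDD/broadcast cleanup, do not block on shuffle cleanup per the SPARK-3139 workaround):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking.blocking", "true")          // RDD/broadcast cleanup blocks
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "false") // shuffle cleanup does not
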
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
deleted file mode 100644
index 46f9f9e9af7da6a11d036e276ec7a49d19ca551b..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import com.google.common.io.Files
-
-import org.apache.spark.util.Utils
-
-private[spark] class HttpFileServer(
-    conf: SparkConf,
-    securityManager: SecurityManager,
-    requestedPort: Int = 0)
-  extends Logging {
-
-  var baseDir : File = null
-  var fileDir : File = null
-  var jarDir : File = null
-  var httpServer : HttpServer = null
-  var serverUri : String = null
-
-  def initialize() {
-    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
-    fileDir = new File(baseDir, "files")
-    jarDir = new File(baseDir, "jars")
-    fileDir.mkdir()
-    jarDir.mkdir()
-    logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
-    httpServer.start()
-    serverUri = httpServer.uri
-    logDebug("HTTP file server started at: " + serverUri)
-  }
-
-  def stop() {
-    httpServer.stop()
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs
-    try {
-      Utils.deleteRecursively(baseDir)
-    } catch {
-      case e: Exception =>
-        logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
-    }
-  }
-
-  def addFile(file: File) : String = {
-    addFileToDir(file, fileDir)
-    serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
-  }
-
-  def addJar(file: File) : String = {
-    addFileToDir(file, jarDir)
-    serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
-  }
-
-  def addDirectory(path: String, resourceBase: String): String = {
-    httpServer.addDirectory(path, resourceBase)
-    serverUri + path
-  }
-
-  def addFileToDir(file: File, dir: File) : String = {
-    // Check whether the file is a directory. If it is, throw a more meaningful exception.
-    // If we don't catch this, Guava throws a very confusing error message:
-    //   java.io.FileNotFoundException: [file] (No such file or directory)
-    // even though the directory ([file]) exists.
-    if (file.isDirectory) {
-      throw new IllegalArgumentException(s"$file cannot be a directory.")
-    }
-    Files.copy(file, new File(dir, file.getName))
-    dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
-  }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b59beb8d6efdade14cffc1d959d8444e083e631..eb2fdecc83011ba3e578c564069385c51870c48c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,7 +40,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 private[spark] class MapOutputTrackerMasterEndpoint(
     override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends RpcEndpoint with Logging {
-  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
@@ -48,9 +48,9 @@ private[spark] class MapOutputTrackerMasterEndpoint(
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
       val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
       val serializedSize = mapOutputStatuses.length
-      if (serializedSize > maxAkkaFrameSize) {
+      if (serializedSize > maxRpcMessageSize) {
         val msg = s"Map output statuses were $serializedSize bytes which " +
-          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+          s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
 
         /* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
          * A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 261265f0b4c550b3508e4b8990ec10dc4ed12c66..d755f07965e6c9fe94c7ee5ffb483a123005c1bc 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,9 +21,6 @@ import java.io.File
 import java.security.NoSuchAlgorithmException
 import javax.net.ssl.SSLContext
 
-import scala.collection.JavaConverters._
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.eclipse.jetty.util.ssl.SslContextFactory
 
 /**
@@ -31,8 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
  * generate specific objects to configure SSL for different communication protocols.
  *
  * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
- * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
- * authentication with SSL, SSLOptions cannot support it either.
+ * by the protocol for which it can generate the configuration.
  *
  * @param enabled             enables or disables SSL; if it is set to false, the rest of the
  *                            settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
     }
   }
 
-  /**
-   * Creates an Akka configuration object which contains all the SSL settings represented by this
-   * object. It can be used then to compose the ultimate Akka configuration.
-   */
-  def createAkkaConfig: Option[Config] = {
-    if (enabled) {
-      if (keyStoreType.isDefined) {
-        logWarning("Akka configuration does not support key store type.");
-      }
-      if (trustStoreType.isDefined) {
-        logWarning("Akka configuration does not support trust store type.");
-      }
-
-      Some(ConfigFactory.empty()
-        .withValue("akka.remote.netty.tcp.security.key-store",
-          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.key-store-password",
-          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.trust-store",
-          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.trust-store-password",
-          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.key-password",
-          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.random-number-generator",
-          ConfigValueFactory.fromAnyRef(""))
-        .withValue("akka.remote.netty.tcp.security.protocol",
-          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
-          ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
-        .withValue("akka.remote.netty.tcp.enable-ssl",
-          ConfigValueFactory.fromAnyRef(true)))
-    } else {
-      None
-    }
-  }
-
   /*
    * The supportedAlgorithms set is a subset of the enabledAlgorithms that
    * are supported by the current Java security provider for this protocol.
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c5aec05c03fced6c8f7ad6e54259d75e39419837..0675957e1668094c1598117744e4ba5323b11abb 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
  * At this point spark has multiple communication protocols that need to be secured and
  * different underlying mechanisms are used depending on the protocol:
  *
- *  - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- *            Akka remoting allows you to specify a secure cookie that will be exchanged
- *            and ensured to be identical in the connection handshake between the client
- *            and the server. If they are not identical then the client will be refused
- *            to connect to the server. There is no control of the underlying
- *            authentication mechanism so its not clear if the password is passed in
- *            plaintext or uses DIGEST-MD5 or some other mechanism.
- *
- *            Akka also has an option to turn on SSL, this option is currently supported (see
- *            the details below).
- *
  *  - HTTP for broadcast and file server (via HttpServer) ->  Spark currently uses Jetty
  *            for the HttpServer. Jetty supports multiple authentication mechanisms -
  *            Basic, Digest, Form, Spengo, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
  *  denote the global configuration for all the supported protocols. In order to override the global
  *  configuration for the particular protocol, the properties must be overwritten in the
  *  protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
- *  configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
- *  Akka based connections or `fs` for broadcast and file server.
+ *  configuration for a particular protocol denoted by `yyy`. Currently `yyy` can only be `fs` for
+ *  broadcast and file server.
  *
  *  Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
  *  options that can be specified.
  *
  *  SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
  *  object parses Spark configuration at a given namespace and builds the common representation
- *  of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
- *  TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ *  of SSL settings. SSLOptions is then used to provide a protocol-specific SSLContextFactory for
+ *  Jetty.
  *
  *  SSL must be configured on each node and configured for each component involved in
  *  communication using the particular protocol. In YARN clusters, the key-store can be prepared on
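
As the updated doc comment explains, SSL settings layer by namespace: global spark.ssl.* keys apply to every protocol, and spark.ssl.yyy.* overrides them for a particular protocol, now only `fs`. A hedged sketch of that layering (the paths and password are placeholders, not values from this patch):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore")         // global setting for all protocols
  .set("spark.ssl.keyStorePassword", "password")
  .set("spark.ssl.fs.keyStore", "/path/to/fs-keystore")   // file-server-specific override
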
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 340e1f7824d1e3b5e5899e2ff6bcd74ffca45a4c..36e240e618490ca48747cf76452486f7a8677a4f 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
           .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
-  /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] =
-    /* This is currently undocumented. If we want to make this public we should consider
-     * nesting options under the spark namespace to avoid conflicts with user akka options.
-     * Otherwise users configuring their own akka code via system properties could mess up
-     * spark's akka options.
-     *
-     *   E.g. spark.akka.option.x.y.x = "value"
-     */
-    getAll.filter { case (k, _) => isAkkaConf(k) }
-
   /**
    * Returns the Spark application id, valid in the Driver after TaskScheduler registration and
    * from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.max.executor.failures" -> Seq(
       AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
     "spark.memory.offHeap.enabled" -> Seq(
-      AlternateConfig("spark.unsafe.offHeap", "1.6"))
+      AlternateConfig("spark.unsafe.offHeap", "1.6")),
+    "spark.rpc.message.maxSize" -> Seq(
+      AlternateConfig("spark.akka.frameSize", "1.6"))
     )
 
   /**
@@ -615,21 +606,13 @@ private[spark] object SparkConf extends Logging {
     }.toMap
   }
 
-  /**
-   * Return whether the given config is an akka config (e.g. akka.actor.provider).
-   * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
-   */
-  def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
-
   /**
    * Return whether the given config should be passed to an executor on start-up.
    *
-   * Certain akka and authentication configs are required from the executor when it connects to
+   * Certain authentication configs are required from the executor when it connects to
    * the scheduler, while the rest of the spark configs can be inherited from the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) ||
-    name.startsWith("spark.akka") ||
     (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.ssl") ||
     name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
       logWarning(
         s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
         s"may be removed in the future. ${cfg.deprecationMessage}")
+      return
     }
 
     allAlternatives.get(key).foreach { case (newKey, cfg) =>
       logWarning(
         s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
         s"and may be removed in the future. Please use the new key '$newKey' instead.")
+      return
+    }
+    if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
+      logWarning(
+        s"The configuration key $key is not supported any more " +
+          s"because Spark doesn't use Akka since 2.0")
     }
   }
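
A small illustration of the AlternateConfig entry added above, assuming SparkConf's existing deprecated-key fallback: setting the old Akka key logs a deprecation warning, and its value is still honoured when the new key is read.

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.akka.frameSize", "256")                       // warns that the key is deprecated
assert(conf.getInt("spark.rpc.message.maxSize", 128) == 256)  // old value is picked up via the new key
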
 
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ec43be0e2f3a47712cb1564f0d45b59511f55c03..9461afdc54124a22191cee30e1ae90c96183a423 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -23,7 +23,6 @@ import java.net.Socket
 import scala.collection.mutable
 import scala.util.Properties
 
-import akka.actor.ActorSystem
 import com.google.common.collect.MapMaker
 
 import org.apache.spark.annotation.DeveloperApi
@@ -39,12 +38,12 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
+ * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
  * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
  * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
  *
@@ -55,7 +54,6 @@ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
 class SparkEnv (
     val executorId: String,
     private[spark] val rpcEnv: RpcEnv,
-    _actorSystem: ActorSystem, // TODO Remove actorSystem
     val serializer: Serializer,
     val closureSerializer: Serializer,
     val cacheManager: CacheManager,
@@ -71,10 +69,6 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
-  // TODO Remove actorSystem
-  @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
-  val actorSystem: ActorSystem = _actorSystem
-
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -96,13 +90,8 @@ class SparkEnv (
       blockManager.master.stop()
       metricsSystem.stop()
       outputCommitCoordinator.stop()
-      actorSystem.shutdown()
       rpcEnv.shutdown()
-
-      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-      // down, but let's call it anyway in case it gets fixed in a later release
-      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-      // actorSystem.awaitTermination()
+      rpcEnv.awaitTermination()
 
       // Note that blockTransferService is stopped by BlockManager since it is started by it.
 
@@ -152,8 +141,8 @@ class SparkEnv (
 object SparkEnv extends Logging {
   @volatile private var env: SparkEnv = _
 
-  private[spark] val driverActorSystemName = "sparkDriver"
-  private[spark] val executorActorSystemName = "sparkExecutor"
+  private[spark] val driverSystemName = "sparkDriver"
+  private[spark] val executorSystemName = "sparkExecutor"
 
   def set(e: SparkEnv) {
     env = e
@@ -202,7 +191,7 @@ object SparkEnv extends Logging {
 
   /**
    * Create a SparkEnv for an executor.
-   * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+   * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
    */
   private[spark] def createExecutorEnv(
       conf: SparkConf,
@@ -245,28 +234,11 @@ object SparkEnv extends Logging {
 
     val securityManager = new SecurityManager(conf)
 
-    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    // Create the ActorSystem for Akka and get the port it binds to.
-    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+    val systemName = if (isDriver) driverSystemName else executorSystemName
+    val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
       clientMode = !isDriver)
-    val actorSystem: ActorSystem = {
-        val actorSystemPort =
-          if (port == 0 || rpcEnv.address == null) {
-            port
-          } else {
-            rpcEnv.address.port + 1
-          }
-        // Create a ActorSystem for legacy codes
-        AkkaUtils.createActorSystem(
-          actorSystemName + "ActorSystem",
-          hostname,
-          actorSystemPort,
-          conf,
-          securityManager
-        )._1
-      }
 
-    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
     // In the non-driver case, the RPC env's address may be null since it may not be listening
     // for incoming connections.
     if (isDriver) {
@@ -325,7 +297,7 @@ object SparkEnv extends Logging {
       new MapOutputTrackerWorker(conf)
     }
 
-    // Have to assign trackerActor after initialization as MapOutputTrackerActor
+    // Have to assign trackerEndpoint after initialization as MapOutputTrackerMasterEndpoint
     // requires the MapOutputTracker itself
     mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
       new MapOutputTrackerMasterEndpoint(
@@ -398,7 +370,6 @@ object SparkEnv extends Logging {
     val envInstance = new SparkEnv(
       executorId,
       rpcEnv,
-      actorSystem,
       serializer,
       closureSerializer,
       cacheManager,
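
For orientation, a rough sketch of the RpcEnv wiring that replaces the legacy ActorSystem above. RpcEnv and SecurityManager are private[spark], so this only compiles inside the org.apache.spark package; the object name is illustrative, the system name mirrors driverSystemName, and port 0 means "pick a free port".

package org.apache.spark

import org.apache.spark.rpc.RpcEnv

object RpcEnvSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    // Same call shape as in the hunk above.
    val rpcEnv = RpcEnv.create("sparkDriver", "localhost", 0, conf, securityManager,
      clientMode = false)
    // Endpoints (e.g. MapOutputTrackerMasterEndpoint) are registered by name via
    // rpcEnv.setupEndpoint(name, endpoint) and looked up remotely by their URI.
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}
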
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 63a20ab41a0f7829b55a161e008577149da550a0..dcef03ef3e3eda66a866a1582341bbe5ad01f0a1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -219,11 +219,7 @@ object Client {
     val conf = new SparkConf()
     val driverArgs = new ClientArguments(args)
 
-    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
-      conf.set("spark.akka.logLifecycleEvents", "true")
-    }
     conf.set("spark.rpc.askTimeout", "10")
-    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
     val rpcEnv =
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a1e8da1ec8f5d5a27fe8fce834ace0737094295a..a6749f7e38802dbde9b7e9057ef58d422166c47e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -183,7 +183,7 @@ object SparkSubmit {
     }
 
      // In standalone cluster mode, there are two submission gateways:
-     //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+     //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
      //   (2) The new REST-based gateway introduced in Spark 1.3
      // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
      // to use the legacy gateway if the master endpoint turns out to be not a REST server.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 58bd9ca3d12c326cb7906bdf4910b82e38e0d97f..e3a6c4c07a75e4d25fe9bf07a0bbc58de29ac7fa 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -37,7 +37,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
-    hostPort: String,
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv)
@@ -55,8 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisterExecutorResponse](
-        RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
+      ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
@@ -184,14 +182,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
-      // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
-      // connections (e.g., if it's using akka). Otherwise, the executor is running in
-      // client mode only, and does not accept incoming connections.
-      val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
-          hostname + ":" + port
-        }.orNull
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
+        env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
       workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 75d7e34d60eb2c9368a586ad3405d8cb81b20a88..030ae41db4a625348bab60264fdfbf51dd67558d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util._
  * Spark executor, backed by a threadpool to run tasks.
  *
  * This can be used with Mesos, YARN, and the standalone scheduler.
- * An internal RPC interface (at the moment Akka) is used for communication with the driver,
+ * An internal RPC interface is used for communication with the driver,
  * except in the case of Mesos fine-grained mode.
  */
 private[spark] class Executor(
@@ -97,9 +97,9 @@ private[spark] class Executor(
   // Set the classloader for serializer
   env.serializer.setDefaultClassLoader(replClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the block manager
+  // Max RPC message size. If a task result is larger than this, we use the block manager
   // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
   // Limit of bytes for total size of results (default is 1GB)
   private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -263,7 +263,7 @@ private[spark] class Executor(
               s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
               s"dropping it.")
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
-          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+          } else if (resultSize >= maxRpcMessageSize) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
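
To make the two thresholds above concrete, a hypothetical helper (name and signature are illustrative, not from the patch) summarizing how a serialized task result is routed; the defaults are 1 GB for spark.driver.maxResultSize and 128 MB for spark.rpc.message.maxSize:

def resultPath(resultSize: Long, maxResultSize: Long, maxRpcMessageSize: Int): String = {
  if (resultSize > maxResultSize) {
    "dropped; only an IndirectTaskResult carrying the size is returned"
  } else if (resultSize >= maxRpcMessageSize) {
    "bytes stored in the block manager; the driver fetches them via the returned block id"
  } else {
    "serialized result sent directly in the RPC reply"
  }
}
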
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d8547677226dc285a4ada2542232f376530c..29e469c3f5983ee010564fc3950ee3ce71731c75 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,7 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RegisterExecutor(
       executorId: String,
       executorRef: RpcEndpointRef,
-      hostPort: String,
       cores: Int,
       logUrls: Map[String, String])
     extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993aa6cd31ab67097ed61be889c7df3bc5fc..f69a3d371e5dda6ec946f5420b42cfc74f32d819 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer, ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse-grained executors to connect.
@@ -46,7 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Total number of executors that are currently registered
   var totalRegisteredExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio =
@@ -134,7 +134,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-      case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
+      case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
         if (executorDataMap.contains(executorId)) {
           context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         } else {
@@ -224,14 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
         val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+        if (serializedTask.limit >= maxRpcMessageSize) {
           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
-                "spark.akka.frameSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
-                AkkaUtils.reservedSizeBytes)
+                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+                "spark.rpc.message.maxSize or using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
               case e: Exception => logError("Exception in error callback", e)
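
The reworded error above points at the usual remedy when a task closure drags along a large object: broadcast it instead of capturing it. A minimal sketch (sc and rdd are assumed to already exist; they are not part of this patch):

// Without a broadcast, `bigLookup` is serialized into every task and counts towards
// the spark.rpc.message.maxSize limit checked above.
val bigLookup: Map[String, Int] = Map("a" -> 1, "b" -> 2)  // stand-in for a large table
val bc = sc.broadcast(bigLookup)                           // shipped to executors once
val tagged = rdd.map(key => bc.value.getOrElse(key, -1))   // rdd: an existing RDD[String]
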
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0a6f2c01c18dfdf5dbbf0c8a0dd8bd9e9b842409..a298cf5ef9a6dbccf977bf12c3373900310330f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SimrSchedulerBackend(
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: "  + driverFilePath)
-    logInfo("Writing Akka address: "  + driverUrl)
+    logInfo("Writing Driver address: "  + driverUrl)
     logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
deleted file mode 100644
index 3f4ac9b2f18cdc445f960196f27b305557462d32..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils extends Logging {
-
-  /**
-   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
-   * ActorSystem itself and its port (which is hard to get from Akka).
-   *
-   * Note: the `name` parameter is important, as even if a client sends a message to right
-   * host + port, if the system name is incorrect, Akka will drop the message.
-   *
-   * If indestructible is set to true, the Actor System will continue running in the event
-   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
-   */
-  def createActorSystem(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): (ActorSystem, Int) = {
-    val startService: Int => (ActorSystem, Int) = { actualPort =>
-      doCreateActorSystem(name, host, actualPort, conf, securityManager)
-    }
-    Utils.startServiceOnPort(port, startService, conf, name)
-  }
-
-  private def doCreateActorSystem(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): (ActorSystem, Int) = {
-
-    val akkaThreads = conf.getInt("spark.akka.threads", 4)
-    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-    val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
-      conf.get("spark.network.timeout", "120s"))
-    val akkaFrameSize = maxFrameSizeBytes(conf)
-    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
-    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
-    if (!akkaLogLifecycleEvents) {
-      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
-      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
-      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
-    }
-
-    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
-
-    val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
-    val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
-
-    val secretKey = securityManager.getSecretKey()
-    val isAuthOn = securityManager.isAuthenticationEnabled()
-    if (isAuthOn && secretKey == null) {
-      throw new Exception("Secret key is null with authentication on")
-    }
-    val requireCookie = if (isAuthOn) "on" else "off"
-    val secureCookie = if (isAuthOn) secretKey else ""
-    logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
-
-    val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
-        .getOrElse(ConfigFactory.empty())
-
-    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
-      .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
-      s"""
-      |akka.daemonic = on
-      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
-      |akka.stdout-loglevel = "ERROR"
-      |akka.jvm-exit-on-fatal-error = off
-      |akka.remote.require-cookie = "$requireCookie"
-      |akka.remote.secure-cookie = "$secureCookie"
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
-      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      |akka.remote.netty.tcp.hostname = "$host"
-      |akka.remote.netty.tcp.port = $port
-      |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
-      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
-      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
-      |akka.log-config-on-start = $logAkkaConfig
-      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
-      |akka.log-dead-letters = $lifecycleEvents
-      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin))
-
-    val actorSystem = ActorSystem(name, akkaConf)
-    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
-    val boundPort = provider.getDefaultAddress.port.get
-    (actorSystem, boundPort)
-  }
-
-  private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
-
-  /** Returns the configured max frame size for Akka messages in bytes. */
-  def maxFrameSizeBytes(conf: SparkConf): Int = {
-    val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
-    if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
-      throw new IllegalArgumentException(
-        s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
-    }
-    frameSizeInMB * 1024 * 1024
-  }
-
-  /** Space reserved for extra data in an Akka message besides serialized task or task result. */
-  val reservedSizeBytes = 200 * 1024
-
-}
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index b68936f5c9f0a42f0b8294bb7474b8adf6a31faf..2bb8de568e803e6e554d060da6ba3fffd16ed983 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -53,4 +53,16 @@ private[spark] object RpcUtils {
   def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
     RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
   }
+
+  private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
+
+  /** Returns the configured max size for RPC messages, in bytes. */
+  def maxMessageSizeBytes(conf: SparkConf): Int = {
+    val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
+    if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
+      throw new IllegalArgumentException(
+        s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
+    }
+    maxSizeInMB * 1024 * 1024
+  }
 }
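
A worked example of the conversion the new helper performs (RpcUtils is private[spark], so this assumes code running inside Spark itself, e.g. a test under org.apache.spark): the key is read in MB and turned into bytes, capped so the result fits in an Int.

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

val conf = new SparkConf().set("spark.rpc.message.maxSize", "16")
assert(RpcUtils.maxMessageSizeBytes(conf) == 16 * 1024 * 1024)  // 16 MB -> 16777216 bytes
// Anything above Int.MaxValue / 1024 / 1024 (= 2047 MB) throws IllegalArgumentException.
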
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
deleted file mode 100644
index bc7059b77fec56f1589b3b03d2268b965ba98b75..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.URI
-import java.util.jar.{JarEntry, JarOutputStream}
-import javax.net.ssl.SSLException
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.commons.lang3.RandomUtils
-
-import org.apache.spark.util.Utils
-
-class FileServerSuite extends SparkFunSuite with LocalSparkContext {
-
-  import SSLSampleConfigs._
-
-  @transient var tmpDir: File = _
-  @transient var tmpFile: File = _
-  @transient var tmpJarUrl: String = _
-
-  def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
-
-  override def beforeEach() {
-    super.beforeEach()
-    resetSparkContext()
-  }
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    tmpDir = Utils.createTempDir()
-    val testTempDir = new File(tmpDir, "test")
-    testTempDir.mkdir()
-
-    val textFile = new File(testTempDir, "FileServerSuite.txt")
-    val pw = new PrintWriter(textFile)
-    // scalastyle:off println
-    pw.println("100")
-    // scalastyle:on println
-    pw.close()
-
-    val jarFile = new File(testTempDir, "test.jar")
-    val jarStream = new FileOutputStream(jarFile)
-    val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-
-    val jarEntry = new JarEntry(textFile.getName)
-    jar.putNextEntry(jarEntry)
-
-    val in = new FileInputStream(textFile)
-    ByteStreams.copy(in, jar)
-
-    in.close()
-    jar.close()
-    jarStream.close()
-
-    tmpFile = textFile
-    tmpJarUrl = jarFile.toURI.toURL.toString
-  }
-
-  override def afterAll() {
-    try {
-      Utils.deleteRecursively(tmpDir)
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("Distributing files locally") {
-    sc = new SparkContext("local[4]", "test", newConf)
-    sc.addFile(tmpFile.toString)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test("Distributing files locally security On") {
-    val sparkConf = new SparkConf(false)
-    sparkConf.set("spark.authenticate", "true")
-    sparkConf.set("spark.authenticate.secret", "good")
-    sc = new SparkContext("local[4]", "test", sparkConf)
-
-    sc.addFile(tmpFile.toString)
-    assert(sc.env.securityManager.isAuthenticationEnabled() === true)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test("Distributing files locally using URL as input") {
-    // addFile("file:///....")
-    sc = new SparkContext("local[4]", "test", newConf)
-    sc.addFile(new File(tmpFile.toString).toURI.toString)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test ("Dynamically adding JARS locally") {
-    sc = new SparkContext("local[4]", "test", newConf)
-    sc.addJar(tmpJarUrl)
-    val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { x =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
-        throw new SparkException("jar not added")
-      }
-    }
-  }
-
-  test("Distributing files on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
-    sc.addFile(tmpFile.toString)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test ("Dynamically adding JARS on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
-    sc.addJar(tmpJarUrl)
-    val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { x =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
-        throw new SparkException("jar not added")
-      }
-    }
-  }
-
-  test ("Dynamically adding JARS on a standalone cluster using local: URL") {
-    sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
-    sc.addJar(tmpJarUrl.replace("file", "local"))
-    val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { x =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
-        throw new SparkException("jar not added")
-      }
-    }
-  }
-
-  test ("HttpFileServer should work with SSL") {
-    val sparkConf = sparkSSLConfig()
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      fileTransferTest(server, sm)
-    } finally {
-      server.stop()
-    }
-  }
-
-  test ("HttpFileServer should work with SSL and good credentials") {
-    val sparkConf = sparkSSLConfig()
-    sparkConf.set("spark.authenticate", "true")
-    sparkConf.set("spark.authenticate.secret", "good")
-
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      fileTransferTest(server, sm)
-    } finally {
-      server.stop()
-    }
-  }
-
-  test ("HttpFileServer should not work with valid SSL and bad credentials") {
-    val sparkConf = sparkSSLConfig()
-    sparkConf.set("spark.authenticate", "true")
-    sparkConf.set("spark.authenticate.secret", "bad")
-
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      intercept[IOException] {
-        fileTransferTest(server)
-      }
-    } finally {
-      server.stop()
-    }
-  }
-
-  test ("HttpFileServer should not work with SSL when the server is untrusted") {
-    val sparkConf = sparkSSLConfigUntrusted()
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      intercept[SSLException] {
-        fileTransferTest(server)
-      }
-    } finally {
-      server.stop()
-    }
-  }
-
-  def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
-    val randomContent = RandomUtils.nextBytes(100)
-    val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
-    Files.write(randomContent, file)
-    server.addFile(file)
-
-    val uri = new URI(server.serverUri + "/files/" + file.getName)
-
-    val connection = if (sm != null && sm.isAuthenticationEnabled()) {
-      Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
-    } else {
-      uri.toURL.openConnection()
-    }
-
-    if (sm != null) {
-      Utils.setupSecureURLConnection(connection, sm)
-    }
-
-    val buf = ByteStreams.toByteArray(connection.getInputStream)
-    assert(buf === randomContent)
-  }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18e53508406dc4091593915407d86d7b2447ea52..c7f629a14ba24f34ea332b3c78eaab2abd9a0cb6 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -175,9 +175,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
     fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
-      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
     fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
-      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index e1a0bf7c933b92d8cbaee8b267ad789e492bf13c..24ec99c7e5e60fe24e416ab901137fa4fdd3dd73 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -52,7 +52,7 @@ object LocalSparkContext {
     if (sc != null) {
       sc.stop()
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
   }
 
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 3819c0a8f31dc8d2038b6a4dd6bde724c66039a9..6546def5966e1b81f0cb657d6134c96d4cf49887 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     slaveRpcEnv.shutdown()
   }
 
-  test("remote fetch below akka frame size") {
+  test("remote fetch below max RPC message size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
     rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
 
-    // Frame size should be ~123B, and no exception should be thrown
+    // Message size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
@@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     rpcEnv.shutdown()
   }
 
-  test("remote fetch exceeds akka frame size") {
+  test("remote fetch exceeds max RPC message size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
     rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
 
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
+    // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
     // Note that the size is hand-selected here because map output statuses are compressed before
     // being sent.
     masterTracker.registerShuffle(20, 100)
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 7603cef7736912c00f9357f75db02db82d51f498..8bdb237c28f66724a364b0045a7c4bf3c1bfb1f0 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
 
     val securityManager = new SecurityManager(conf)
-    val akkaSSLOptions = securityManager.getSSLOptions("akka")
 
     assert(securityManager.fileServerSSLOptions.enabled === true)
-    assert(akkaSSLOptions.enabled === true)
 
     assert(securityManager.sslSocketFactory.isDefined === true)
     assert(securityManager.hostnameVerifier.isDefined === true)
@@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
     assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
     assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
-
-    assert(akkaSSLOptions.trustStore.isDefined === true)
-    assert(akkaSSLOptions.trustStore.get.getName === "truststore")
-    assert(akkaSSLOptions.keyStore.isDefined === true)
-    assert(akkaSSLOptions.keyStore.get.getName === "keystore")
-    assert(akkaSSLOptions.trustStorePassword === Some("password"))
-    assert(akkaSSLOptions.keyStorePassword === Some("password"))
-    assert(akkaSSLOptions.keyPassword === Some("password"))
-    assert(akkaSSLOptions.protocol === Some("TLSv1.2"))
-    assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
   }
 
   test("ssl off setup") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 190e4dd7285b3a913b50111d62e2e74ddfc5aff8..9c13c15281a423b21038d3d57714ec27fc12b8b9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils {
       "publicAddress",
       new File("sparkHome"),
       new File("workDir"),
-      "akka://worker",
+      "spark://worker",
       new SparkConf,
       Seq("localDir"),
       ExecutorState.RUNNING)
@@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils {
       new File("sparkHome"),
       createDriverDesc(),
       null,
-      "akka://worker",
+      "spark://worker",
       new SecurityManager(conf))
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index ab3d4cafebefa6c30a657104d37941ef5f6c8479..fdada0777f9a92aeae4ce87b599949fd4b76a6e8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
-      val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+      val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index bd8b0655f4bbfed2d90939d038ee5ff3ddd32450..2a1696be3660a8943686153cf29670b10c96fccd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite {
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     val conf = new SparkConf()
     new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
-      driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+      driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 64e486d791cde54d9a0537f942ad2e578b7eeea7..6f4eda8b47ddeabdcb4bdfc47cbad20fb7283bd8 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       val e = intercept[Exception] {
         Await.result(f, 1 seconds)
       }
-      assert(e.isInstanceOf[TimeoutException] || // For Akka
-        e.isInstanceOf[NotSerializableException] // For Netty
-      )
+      assert(e.isInstanceOf[NotSerializableException])
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70f40fb26c2f6d752408c77bff89660f18f704fa..04cccc67e328e84209911d15a905373e07f901f7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,16 +18,16 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
 
-  test("serialized task larger than akka frame size") {
+  test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
-    conf.set("spark.akka.frameSize", "1")
+    conf.set("spark.rpc.message.maxSize", "1")
     conf.set("spark.default.parallelism", "1")
     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
-    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
     val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
     val larger = sc.parallelize(Seq(buffer))
     val thrown = intercept[SparkException] {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d89f3fb8bf06e57a0a6dae401f9267c753..58d217ffef566561b7d2bd920704acfbfc518ff7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
   with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
-    val conf = new SparkConf().set("spark.akka.frameSize", "1")
+    val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
     sc = new SparkContext("local", "SparkListenerSuite", conf)
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
-    // Make a task whose result is larger than the akka frame size
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    assert(akkaFrameSize === 1024 * 1024)
+    // Make a task whose result is larger than the RPC message size
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    assert(maxRpcMessageSize === 1024 * 1024)
     val result = sc.parallelize(Seq(1), 1)
-      .map { x => 1.to(akkaFrameSize).toArray }
+      .map { x => 1.to(maxRpcMessageSize).toArray }
       .reduce { case (x, y) => x }
-    assert(result === 1.to(akkaFrameSize).toArray)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
-    // Make a task whose result is larger than the akka frame size
+    // Make a task whose result is larger than the RPC message size
     val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
     assert(result === 2)
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index bc72c3685e8c1d43a9f7c502a639d8b3afeb8144..cc2557c2f1df29c463eb14a6101186ee453ec174 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.storage.TaskResultBlockId
 import org.apache.spark.TestUtils.JavaSourceFromString
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
 
 /**
  * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
  */
 class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
 
-  // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+  // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
   // as we can make it) so the tests don't take too long.
-  def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
+  def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
 
-  test("handling results smaller than Akka frame size") {
+  test("handling results smaller than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
-  test("handling results larger than Akka frame size") {
+  test("handling results larger than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
-    assert(result === 1.to(akkaFrameSize).toArray)
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    val result =
+      sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     val RESULT_BLOCK_ID = TaskResultBlockId(0)
     assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
@@ -114,11 +114,11 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     }
     val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
     scheduler.taskResultGetter = resultGetter
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    val result =
+      sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
     assert(resultGetter.removeBlockSuccessfully)
-    assert(result === 1.to(akkaFrameSize).toArray)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     // Make sure two tasks were run (one failed one, and a second retried one).
     assert(scheduler.nextTaskId.get() === 2)
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 0760529b578eec2f7c95c0cdc3c010682716ab49..4d9937c5cbc3400f3df9cf9faad8e327d5bc01cd 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -44,7 +41,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.4.0.jar
 curator-framework-2.4.0.jar
@@ -179,7 +175,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 191f2a0e4e86f9c11decde31c175cbd5401b47c9..fd659ee20df1a7b280faf5c9418ebf740a2dd6a8 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -45,7 +42,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.4.0.jar
 curator-framework-2.4.0.jar
@@ -170,7 +166,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 9134e997c845714a7bfc2fb6efe442957cf7ad94..afae3deb9ada2af3796ae7eb1197316580086b98 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -45,7 +42,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.4.0.jar
 curator-framework-2.4.0.jar
@@ -171,7 +167,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 8c45832873848881d2c56c0e7ef024fb09e99a40..5a6460136a3a0e7a9ced1f2bebc517e3928ede87 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -49,7 +46,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.6.0.jar
 curator-framework-2.6.0.jar
@@ -177,7 +173,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 1d34854819a6962a61c5bc2be946f46adc41b29d..70083e7f3d16aa3237fd0ce85954455492c8955b 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -49,7 +46,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.6.0.jar
 curator-framework-2.6.0.jar
@@ -178,7 +174,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 2810112f5294ec7714eaf4363699936956c80baa..814e4406cf4357c1e92f5b8e32219184cb4a39ea 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -35,7 +35,7 @@ There are several useful things to note about this architecture:
    processes, and these communicate with each other, it is relatively easy to run it even on a
    cluster manager that also supports other applications (e.g. Mesos/YARN).
 3. The driver program must listen for and accept incoming connections from its executors throughout
-   its lifetime (e.g., see [spark.driver.port and spark.fileserver.port in the network config
+   its lifetime (e.g., see [spark.driver.port in the network config
    section](configuration.html#networking)). As such, the driver program must be network
    addressable from the worker nodes.
 4. Because the driver schedules tasks on the cluster, it should be run close to the worker
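
Point 3 now references only `spark.driver.port`, since `spark.fileserver.port` is removed elsewhere in this patch. A hedged illustration of making the driver network-addressable from the workers; the host, port, and master URL are placeholders, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged example: pin the driver's advertised host and listening port so that
// executors behind a firewall can connect back to it.
val conf = new SparkConf()
  .setAppName("reachable-driver")
  .setMaster("spark://master.example.com:7077")
  .set("spark.driver.host", "driver.example.com")
  .set("spark.driver.port", "7001")
val sc = new SparkContext(conf)
```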
diff --git a/docs/configuration.md b/docs/configuration.md
index acaeb830081e2d814bc22eab62afb6906ffb799f..d2a2f1052405d872b4facff4b1b5bd3c1b8e18af 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -944,52 +944,12 @@ Apart from these, the following properties are also available, and may be useful
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
-  <td><code>spark.akka.frameSize</code></td>
+  <td><code>spark.rpc.message.maxSize</code></td>
   <td>128</td>
   <td>
     Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map
     output size information sent between executors and the driver. Increase this if you are running
-    jobs with many thousands of map and reduce tasks and see messages about the frame size.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.heartbeat.interval</code></td>
-  <td>1000s</td>
-  <td>
-    This is set to a larger value to disable the transport failure detector that comes built in to
-    Akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger
-    interval value reduces network overhead and a smaller value ( ~ 1 s) might be more
-    informative for Akka's failure detector. Tune this in combination of <code>spark.akka.heartbeat.pauses</code>
-    if you need to. A likely positive use case for using failure detector would be: a sensistive
-    failure detector can help evict rogue executors quickly. However this is usually not the case
-    as GC pauses and network lags are expected in a real Spark cluster. Apart from that enabling
-    this leads to a lot of exchanges of heart beats between nodes leading to flooding the network
-    with those.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.heartbeat.pauses</code></td>
-  <td>6000s</td>
-  <td>
-     This is set to a larger value to disable the transport failure detector that comes built in to Akka.
-     It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart
-     beat pause for Akka. This can be used to control sensitivity to GC pauses. Tune
-     this along with <code>spark.akka.heartbeat.interval</code> if you need to.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.threads</code></td>
-  <td>4</td>
-  <td>
-    Number of actor threads to use for communication. Can be useful to increase on large clusters
-    when the driver has a lot of CPU cores.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.timeout</code></td>
-  <td>100s</td>
-  <td>
-    Communication timeout between Spark nodes.
+    jobs with many thousands of map and reduce tasks and see messages about the RPC message size.
   </td>
 </tr>
 <tr>
@@ -1015,28 +975,12 @@ Apart from these, the following properties are also available, and may be useful
     This is used for communicating with the executors and the standalone Master.
   </td>
 </tr>
-<tr>
-  <td><code>spark.executor.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the executor to listen on. This is used for communicating with the driver.
-    This is only relevant when using the Akka RPC backend.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.fileserver.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the driver's HTTP file server to listen on.
-    This is only relevant when using the Akka RPC backend.
-  </td>
-</tr>
 <tr>
   <td><code>spark.network.timeout</code></td>
   <td>120s</td>
   <td>
     Default timeout for all network interactions. This config will be used in place of
-    <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+    <code>spark.core.connection.ack.wait.timeout</code>,
     <code>spark.storage.blockManagerSlaveTimeoutMs</code>,
     <code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or
     <code>spark.rpc.lookupTimeout</code> if they are not configured.
@@ -1418,8 +1362,7 @@ Apart from these, the following properties are also available, and may be useful
 
             <p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
             particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
-            either <code>akka</code> for Akka based connections or <code>fs</code> for file
-            server.</p>
+            only <code>fs</code> for the file server.</p>
         </td>
     </tr>
     <tr>
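
Taken together, this table change leaves `spark.rpc.message.maxSize` (in MB) plus the existing `spark.rpc.*` timeouts and `spark.network.timeout` covering what the `spark.akka.*` family used to. A hedged example of tuning the surviving settings for a job with very many map and reduce tasks; the values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Hedged example: raise the control-plane message cap and lean on the unified
// network timeout rather than the removed spark.akka.timeout.
val conf = new SparkConf()
  .setAppName("many-tasks-job")
  .set("spark.rpc.message.maxSize", "256") // MB; the documented default is 128
  .set("spark.rpc.askTimeout", "30s")
  .set("spark.network.timeout", "120s")
```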
diff --git a/docs/security.md b/docs/security.md
index a4cc0f42b24826cad5b2e2887d8238c778ef8683..32c33d285747a9371e6ce831a40beed5333eea59 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -27,8 +27,7 @@ If your applications are using event logging, the directory where the event logs
 
 ## Encryption
 
-Spark supports SSL for Akka and HTTP protocols. SASL encryption is supported for the block transfer
-service.
+Spark supports SSL for HTTP protocols. SASL encryption is supported for the block transfer service.
 
 Encryption is not yet supported for data stored by Spark in temporary local storage, such as shuffle
 files, cached data, and other application files. If encrypting this data is desired, a workaround is
@@ -48,10 +47,6 @@ component-specific configuration namespaces used to override the default setting
     <th>Config Namespace</th>
     <th>Component</th>
   </tr>
-  <tr>
-    <td><code>spark.ssl.akka</code></td>
-    <td>Akka communication channels</td>
-  </tr>
   <tr>
     <td><code>spark.ssl.fs</code></td>
     <td>HTTP file server and broadcast server</td>
@@ -137,7 +132,7 @@ configure those ports.
     <td>7077</td>
     <td>Submit job to cluster /<br> Join cluster</td>
     <td><code>SPARK_MASTER_PORT</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+    <td>Set to "0" to choose a port randomly. Standalone mode only.</td>
   </tr>
   <tr>
     <td>Standalone Master</td>
@@ -145,7 +140,7 @@ configure those ports.
     <td>(random)</td>
     <td>Schedule executors</td>
     <td><code>SPARK_WORKER_PORT</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+    <td>Set to "0" to choose a port randomly. Standalone mode only.</td>
   </tr>
 </table>
 
@@ -178,24 +173,7 @@ configure those ports.
     <td>(random)</td>
     <td>Connect to application /<br> Notify executor state changes</td>
     <td><code>spark.driver.port</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly.</td>
-  </tr>
-  <tr>
-    <td>Driver</td>
-    <td>Executor</td>
-    <td>(random)</td>
-    <td>Schedule tasks</td>
-    <td><code>spark.executor.port</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly. Only used if Akka RPC backend is
-    configured.</td>
-  </tr>
-  <tr>
-    <td>Executor</td>
-    <td>Driver</td>
-    <td>(random)</td>
-    <td>File server for files and jars</td>
-    <td><code>spark.fileserver.port</code></td>
-    <td>Jetty-based. Only used if Akka RPC backend is configured.</td>
+    <td>Set to "0" to choose a port randomly.</td>
   </tr>
   <tr>
     <td>Executor / Driver</td>
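
With the `spark.ssl.akka` namespace dropped, `spark.ssl.fs` is the only component-specific SSL override left. A hedged sketch of the sort of configuration the SecurityManagerSuite hunk earlier in this patch asserts on; the store paths and passwords are placeholders:

```scala
import org.apache.spark.SparkConf

// Hedged sketch: global spark.ssl.* settings plus the file-server namespace
// override. Key names mirror the fileServerSSLOptions fields checked above.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore")
  .set("spark.ssl.keyStorePassword", "password")
  .set("spark.ssl.keyPassword", "password")
  .set("spark.ssl.trustStore", "/path/to/truststore")
  .set("spark.ssl.trustStorePassword", "password")
  .set("spark.ssl.protocol", "TLSv1.2")
  .set("spark.ssl.fs.enabled", "true") // applies to the HTTP file server only
```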
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
index 5778fd1d092542884d6c2e9d147142f97cb7cbb8..ca7385128d79aa7e580d57c179c24c3ef13d1cb5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
@@ -47,7 +47,7 @@ trait VectorTransformer extends Serializable {
    */
   @Since("1.1.0")
   def transform(data: RDD[Vector]): RDD[Vector] = {
-    // Later in #1498 , all RDD objects are sent via broadcasting instead of akka.
+    // Later in #1498, all RDD objects are sent via broadcast instead of RPC.
     // So it should be no longer necessary to explicitly broadcast `this` object.
     data.map(x => this.transform(x))
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
index 9b2d023bbf73882dec69ff0d7aad78fb9f4832af..95d874b8432ebeaa9849c29e8bebe85cb848de71 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -29,7 +29,7 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
     val conf = new SparkConf()
       .setMaster("local-cluster[2, 1, 1024]")
       .setAppName("test-cluster")
-      .set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data
+      .set("spark.rpc.message.maxSize", "1") // set to 1MB to detect direct serialization of data
     sc = new SparkContext(conf)
   }
 
diff --git a/pom.xml b/pom.xml
index 43f08efaae86db721b7bbdfe366711ad98029c09..f08642f606788ce73303dd097557513f6cf69edc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,6 +569,11 @@
         <artifactId>netty-all</artifactId>
         <version>4.0.29.Final</version>
       </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty</artifactId>
+        <version>3.8.0.Final</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.derby</groupId>
         <artifactId>derby</artifactId>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 905fb4cd90377d2f3dc759d096b98183b1b6d823..c65fae482c5ca265bb08145d5cc7d0f2a8e8244a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -162,7 +162,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"),
-        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream")
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.streaming.zeromq.ZeroMQReceiver"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver$Supervisor")
       ) ++ Seq(
         // SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 081f5a1c93e6e5939423e4c4dd7de3982e72576a..898db85190b6704a26be366e681ef9b8ebe65cb8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -41,7 +41,6 @@ class ReceivedBlockTrackerSuite
   extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
 
   val hadoopConf = new Configuration()
-  val akkaTimeout = 10 seconds
   val streamId = 1
 
   var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 31fa53e24b5075f1d47892f385902be90c8119f0..21ac04dc76c32229746ed024fff68f1f9792fac4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -166,7 +166,7 @@ class ExecutorRunnable(
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
-    // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
     // authentication settings.
     sparkConf.getAll
       .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
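
For context on the updated comment: the configs kept by this filter are the ones the executor needs before it registers and receives the full SparkConf from the driver. A hedged, illustrative sketch of what such filtering typically feeds into, assuming the surrounding class's `sparkConf` and rendering each entry as a JVM system property (the rendering step is an assumption, not part of this hunk):

```scala
// Illustrative only: startup-relevant configs become -D flags on the
// executor launch command line.
val startupJavaOpts: Seq[String] = sparkConf.getAll
  .filter { case (k, _) => SparkConf.isExecutorStartupConf(k) }
  .map { case (k, v) => s"-D$k=$v" }
  .toSeq
```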