diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ac553b71115df3c321f2e0f411ba1458bc3681d3..7e2cf956c7253d19aeedb3c90d45237199fadffe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
     executorLimit = Integer.MAX_VALUE
+    appUIUrlAtHistoryServer = None
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1afc1ff59f2f9f74ee1652cf8ec72044dbb4f83a..f41efb097b4be6954fbf368c4ed06d2e49535dea 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
     val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
-      args.memory, args.masters, args.workDir)
+      args.memory, args.masters, args.workDir, conf = conf)
     rpcEnv.awaitTermination()
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 68c5f44145b0d128349e45d070c964af54a4dc5c..f82fd4eb5756d1456f9f01404141a8c16ac8accd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler(
   // A variable to track whether we should dispatch the RemoteProcessConnected message.
   private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
 
+  // A variable to track the remote RpcEnv addresses of all clients
+  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
+
   override def receive(
       client: TransportClient,
       message: ByteBuffer,
@@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler(
       // Create a new message with the socket address of the client as the sender.
       RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
     } else {
+      // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for
+      // the listening address
+      val remoteEnvAddress = requestMessage.senderAddress
+      if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
+        dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
+      }
       requestMessage
     }
   }
@@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler(
     if (addr != null) {
       val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
       dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
+      // If the remove RpcEnv listens to some address, we should also fire a
+      // RemoteProcessConnectionError for the remote RpcEnv listening address
+      val remoteEnvAddress = remoteAddresses.get(clientAddr)
+      if (remoteEnvAddress != null) {
+        dispatcher.postToAll(RemoteProcessConnectionError(cause, remoteEnvAddress))
+      }
     } else {
       // If the channel is closed before connecting, its remoteAddress will be null.
       // See java.net.Socket.getRemoteSocketAddress
@@ -606,6 +621,12 @@ private[netty] class NettyRpcHandler(
       val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
       nettyEnv.removeOutbox(clientAddr)
       dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
+      val remoteEnvAddress = remoteAddresses.remove(clientAddr)
+      // If the remove RpcEnv listens to some address, we should also  fire a
+      // RemoteProcessDisconnected for the remote RpcEnv listening address
+      if (remoteEnvAddress != null) {
+        dispatcher.postToAll(RemoteProcessDisconnected(remoteEnvAddress))
+      }
     } else {
       // If the channel is closed before connecting, its remoteAddress will be null. In this case,
       // we can ignore it since we don't fire "Associated".
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index a61d0479aacdbcdf19a450f103f9fc77ddb28ec3..6d153eb04e04f3fecb86b97da98789a0f7ec494e 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -545,6 +545,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("network events between non-client-mode RpcEnvs") {
+    val events = new mutable.ArrayBuffer[(Any, Any)] with mutable.SynchronizedBuffer[(Any, Any)]
+    env.setupEndpoint("network-events-non-client", new ThreadSafeRpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case "hello" =>
+        case m => events += "receive" -> m
+      }
+
+      override def onConnected(remoteAddress: RpcAddress): Unit = {
+        events += "onConnected" -> remoteAddress
+      }
+
+      override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+        events += "onDisconnected" -> remoteAddress
+      }
+
+      override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+        events += "onNetworkError" -> remoteAddress
+      }
+
+    })
+
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = false)
+    // Use anotherEnv to find out the RpcEndpointRef
+    val rpcEndpointRef = anotherEnv.setupEndpointRef(
+      "local", env.address, "network-events-non-client")
+    val remoteAddress = anotherEnv.address
+    rpcEndpointRef.send("hello")
+    eventually(timeout(5 seconds), interval(5 millis)) {
+      assert(events.contains(("onConnected", remoteAddress)))
+    }
+
+    anotherEnv.shutdown()
+    anotherEnv.awaitTermination()
+    eventually(timeout(5 seconds), interval(5 millis)) {
+      assert(events.contains(("onConnected", remoteAddress)))
+      assert(events.contains(("onDisconnected", remoteAddress)))
+    }
+  }
+
   test("sendWithReply: unserializable error") {
     env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint {
       override val rpcEnv = env