diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 9bbd635ab9bb002ce3934ffffec1b156eacfa45f..481026eaa2106e2e4310067906e9adf18c65f479 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
@@ -110,6 +111,12 @@ private[spark] class Client(
       }
     }
 
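+    // Returns true if the given remote address matches the host and port of one of this
+    // client's configured master URLs, i.e. the remote endpoint could be a Spark master.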
+    private def isPossibleMaster(remoteUrl: Address): Boolean = {
+      masterUrls.map(s => Master.toAkkaUrl(s))
+        .map(u => AddressFromURIString(u).hostPort)
+        .contains(remoteUrl.hostPort)
+    }
+
     override def receive = {
       case RegisteredApplication(appId_, masterUrl) =>
         appId = appId_
@@ -145,6 +152,9 @@ private[spark] class Client(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
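+      // Report association errors with addresses that may belong to a master as concise
+      // warnings, so that failed connection attempts show up in the client's own log.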
+      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+        logWarning(s"Could not connect to $address: $cause")
+
       case StopClient =>
         markDead()
         sender ! true
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5e3e462a1c27687ee55ee0c26358cd3e5a..7df7e3d8e561cb97f8a0708c35f7cb97faaba073 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,6 +21,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
+import org.apache.log4j.{Level, Logger}
+
 import org.apache.spark.SparkConf
 
 /**
@@ -47,8 +49,15 @@ private[spark] object AkkaUtils {
     val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
 
     val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
-    val lifecycleEvents =
-      if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+    val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+    if (!akkaLogLifecycleEvents) {
+      // As a workaround for Akka issue #3787, we silence the "EndpointWriter" logger by
+      // raising its log level to FATAL. See: https://www.assembla.com/spaces/akka/tickets/3787#/
+      Option(Logger.getLogger("akka.remote.EndpointWriter")).foreach(_.setLevel(Level.FATAL))
+    }
+
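+    // akka.log-config-on-start makes Akka dump its complete resolved configuration at startup.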
+    val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
 
     val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
     val akkaFailureDetector =
@@ -73,7 +82,10 @@ private[spark] object AkkaUtils {
       |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+      |akka.log-config-on-start = $logAkkaConfig
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+      |akka.log-dead-letters = $lifecycleEvents
+      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
       """.stripMargin)
 
     val actorSystem = if (indestructible) {