Skip to content
Snippets Groups Projects
Commit a2e7e049 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Merge pull request #333 from pwendell/logging-silence

Quiet ERROR-level Akka Logs

This fixes an issue I've seen where akka logs a bunch of things at ERROR level when connecting to a standalone cluster, even in the normal case. I noticed that even when lifecycle logging was disabled, the netty code inside of akka still logged away via akka's EndpointWriter class. There are also some other log streams that I think are new in akka 2.2.1 that I've disabled.

Finally, I added some better logging to the standalone client. This makes it more clear when a connection failure occurs what is going on. Previously it never explicitly said if a connection attempt had failed.

The commit messages here have some more detail.
parents 5b0986a1 675d7eb4
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,8 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
......@@ -110,6 +111,12 @@ private[spark] class Client(
}
}
private def isPossibleMaster(remoteUrl: Address) = {
masterUrls.map(s => Master.toAkkaUrl(s))
.map(u => AddressFromURIString(u).hostPort)
.contains(remoteUrl.hostPort)
}
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
......@@ -145,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
logWarning(s"Could not connect to $address: $cause")
case StopClient =>
markDead()
sender ! true
......
......@@ -21,6 +21,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
......@@ -47,8 +49,15 @@ private[spark] object AkkaUtils {
val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
val lifecycleEvents =
if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
if (!akkaLogLifecycleEvents) {
// As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
// See: https://www.assembla.com/spaces/akka/tickets/3787#/
Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
}
val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
......@@ -73,7 +82,10 @@ private[spark] object AkkaUtils {
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
|akka.log-config-on-start = $logAkkaConfig
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin)
val actorSystem = if (indestructible) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment