From 254eaa4d350dafe19f1715e80eb816856a126c21 Mon Sep 17 00:00:00 2001
From: Sandy Ryza <sandy@cloudera.com>
Date: Fri, 30 Jan 2015 11:31:54 -0600
Subject: [PATCH] SPARK-5393. Flood of util.RackResolver log messages after
 SPARK-1714

Previously I had tried to solve this by adding a line to Spark's log4j-defaults.properties.

The issue with that line in log4j-defaults.properties was that the log4j.properties packaged inside Hadoop was getting picked up instead. While it would be ideal to fix that as well, we still want to quiet these messages when a user supplies their own custom log4j properties.
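
The fix is the guarded override below, added in both YarnAllocator and the new YarnScheduler. The WARN level is applied only when no level was configured explicitly, so a user-supplied log4j configuration still takes precedence:

    import org.apache.hadoop.yarn.util.RackResolver
    import org.apache.log4j.{Level, Logger}

    // RackResolver logs an INFO message every time it resolves a rack, which
    // is far too often. Only quiet it if the user hasn't set a level already.
    if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
      Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
    }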

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4192 from sryza/sandy-spark-5393 and squashes the following commits:

4d5dedc [Sandy Ryza] Only set log level if unset
46e07c5 [Sandy Ryza] SPARK-5393. Flood of util.RackResolver log messages after SPARK-1714
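
The guard works because of Log4j 1.x level-inheritance semantics: Logger#getLevel returns only a level that was assigned explicitly (via a properties file or setLevel), and returns null when the logger merely inherits its effective level from an ancestor. A minimal standalone sketch of that behavior (the demo object is illustrative, not part of this patch):

    import org.apache.log4j.{Level, Logger}

    // Illustrative demo, not part of this patch.
    object RackResolverLevelDemo extends App {
      val logger = Logger.getLogger("org.apache.hadoop.yarn.util.RackResolver")
      println(logger.getLevel)          // null: nothing assigned explicitly
      println(logger.getEffectiveLevel) // inherited, e.g. INFO from the root logger
      logger.setLevel(Level.WARN)
      println(logger.getLevel)          // WARN: now explicitly assigned
    }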
---
 .../org/apache/spark/log4j-defaults.properties |  1 -
 .../scala/org/apache/spark/SparkContext.scala  |  2 +-
 .../SparkContextSchedulerCreationSuite.scala   |  2 +-
 .../spark/deploy/yarn/YarnAllocator.scala      |  7 +++++++
 .../deploy/yarn/YarnSparkHadoopUtil.scala      |  4 ----
 .../cluster/YarnClusterScheduler.scala         | 18 +-----------------
 ...sterScheduler.scala => YarnScheduler.scala} | 12 ++++++++----
 7 files changed, 18 insertions(+), 28 deletions(-)
 rename yarn/src/main/scala/org/apache/spark/scheduler/cluster/{YarnClientClusterScheduler.scala => YarnScheduler.scala} (77%)

diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index c99a61f63e..89eec7d4b7 100644
--- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
@@ -10,4 +10,3 @@ log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
-log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4c4ee04cc5..3c61c10820 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1986,7 +1986,7 @@ object SparkContext extends Logging {
       case "yarn-client" =>
         val scheduler = try {
           val clazz =
-            Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+            Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 8ae4f243ec..bbed8ddc6b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -149,7 +149,7 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("yarn-client") {
-    testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+    testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler")
   }
 
   def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d00f29665a..3849586c61 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.util.RackResolver
 
+import org.apache.log4j.{Level, Logger}
+
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -60,6 +62,11 @@ private[yarn] class YarnAllocator(
 
   import YarnAllocator._
 
+  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
+  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
+    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
+  }
+
   // Visible for testing.
   val allocatedHostToContainersMap =
     new HashMap[String, collection.mutable.Set[ContainerId]]
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4bff846123..4e39c1d580 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,12 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.lang.{Boolean => JBoolean}
 import java.io.File
-import java.util.{Collections, Set => JSet}
 import java.util.regex.Matcher
 import java.util.regex.Pattern
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.HashMap
 
@@ -32,7 +29,6 @@ import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
-import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SecurityManager, SparkConf}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index be55d26f1c..72ec4d6b34 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -17,33 +17,17 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.hadoop.yarn.util.RackResolver
-
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.ApplicationMaster
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
 
 /**
  * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
  * ApplicationMaster, etc is done
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
 
   logInfo("Created YarnClusterScheduler")
 
-  // Nothing else for now ... initialize application master : which needs a SparkContext to
-  // determine how to allocate.
-  // Note that only the first creation of a SparkContext influences (and ideally, there must be
-  // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
-  // allocated by then.
-
-  // By default, rack is unknown
-  override def getRackForHost(hostPort: String): Option[String] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
-  }
-
   override def postStartHook() {
     ApplicationMaster.sparkContextInitialized(sc)
     super.postStartHook()
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
similarity index 77%
rename from yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
rename to yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
index 2fa24cc433..4ebf3af12b 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
@@ -19,14 +19,18 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.util.RackResolver
 
+import org.apache.log4j.{Level, Logger}
+
 import org.apache.spark._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
-/**
- * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
- */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
+  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
+    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
+  }
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
-- 
GitLab