diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties index c99a61f63ea2b50a043d9d99dff728583f155c5f..89eec7d4b7f615f98348b805093f945e208ce757 100644 --- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties +++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties @@ -10,4 +10,3 @@ log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO -log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4c4ee04cc515e4c492191709b4e48a3616ed9e09..3c61c10820ba948e476763cff1b5e78aca1dfc2f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1986,7 +1986,7 @@ object SparkContext extends Logging { case "yarn-client" => val scheduler = try { val clazz = - Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 8ae4f243ec1ae2b2d14c01b5f86080cdb130f610..bbed8ddc6bafc004e8d3d6380d6b719231db73cf 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -149,7 +149,7 @@ class SparkContextSchedulerCreationSuite } test("yarn-client") { - testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler") } def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d00f29665a58f22c49c65ac3e1c526b2cf8868fa..3849586c6111e8c65bab0a86fd2dce8050e72b96 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -60,6 +62,11 @@ private[yarn] class YarnAllocator( import YarnAllocator._ + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } + // Visible for testing. val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]] diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4bff8461236196800ee3e24ef9d262c740528632..4e39c1d58011b07573c254ab36ccaad8a02ec004 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,12 +17,9 @@ package org.apache.spark.deploy.yarn -import java.lang.{Boolean => JBoolean} import java.io.File -import java.util.{Collections, Set => JSet} import java.util.regex.Matcher import java.util.regex.Pattern -import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.HashMap @@ -32,7 +29,6 @@ import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType} -import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration import org.apache.spark.{SecurityManager, SparkConf} diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index be55d26f1cf61a4cebe83005c561a50e2cd64d83..72ec4d6b34af6498834cc41583b0d6e67862e618 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -17,33 +17,17 @@ package org.apache.spark.scheduler.cluster -import org.apache.hadoop.yarn.util.RackResolver - import org.apache.spark._ import org.apache.spark.deploy.yarn.ApplicationMaster -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils /** * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of * ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { logInfo("Created YarnClusterScheduler") - // Nothing else for now ... initialize application master : which needs a SparkContext to - // determine how to allocate. - // Note that only the first creation of a SparkContext influences (and ideally, there must be - // only one SparkContext, right ?). Subsequent creations are ignored since executors are already - // allocated by then. - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation) - } - override def postStartHook() { ApplicationMaster.sparkContextInitialized(sc) super.postStartHook() diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala similarity index 77% rename from yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala rename to yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala index 2fa24cc43325e6ef3ef08ef81d79262789b62275..4ebf3af12b3812bc4f1315d28a6c6ffa3e2a0a04 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala @@ -19,14 +19,18 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + import org.apache.spark._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils -/** - * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. - */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { +private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { + + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = {