From c8cc27611002f4310bc386df858059700a3fec9d Mon Sep 17 00:00:00 2001
From: "Y.CORP.YAHOO.COM\\tgraves" <tgraves@thatenemy-lm.champ.corp.yahoo.com>
Date: Tue, 3 Sep 2013 10:50:21 -0500
Subject: [PATCH] Review comment changes and update to org.apache packaging

---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../scala/org/apache/spark/SparkEnv.scala     |  2 +-
 .../cluster/SparkDeploySchedulerBackend.scala |  2 +-
 .../scala/org/apache/spark/ui/SparkUI.scala   |  1 -
 .../scala/org/apache/spark/ui/UIUtils.scala   | 43 +++++++------------
 .../scala/org/apache/spark/util/Utils.scala   |  2 +-
 docs/running-on-yarn.md                       |  6 +--
 .../spark/deploy/yarn/ApplicationMaster.scala | 17 ++++----
 .../yarn/ApplicationMasterArguments.scala     |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala |  5 ++-
 .../spark/deploy/yarn/ClientArguments.scala   |  2 +-
 .../spark/deploy/yarn/WorkerRunnable.scala    |  3 +-
 .../deploy/yarn/YarnAllocationHandler.scala   |  3 +-
 .../cluster/YarnClusterScheduler.scala        |  1 +
 14 files changed, 41 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f2641851cb..89318712a5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -196,7 +196,7 @@ class SparkContext(
       case "yarn-standalone" =>
         val scheduler = try {
-          val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler")
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
           cons.newInstance(this).asInstanceOf[ClusterScheduler]
         } catch {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 478e5a0aaf..29968c273c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -62,7 +62,7 @@ class SparkEnv (
     val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
     if(yarnMode) {
       try {
-        Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
       } catch {
         case th: Throwable => throw new SparkException("Unable to load YARN support", th)
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 3f8a96fc47..9a2cf20de7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
       "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
     val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
-      sc.ui.appHttpUIAddress)
+      "http://" + sc.ui.appUIAddress)
 
     client = new Client(sc.env.actorSystem, master, appDesc, this)
     client.start()
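
For reference, the SparkContext and SparkEnv hunks above update the same reflective-loading pattern: core cannot depend on the separately built YARN module, so it looks YARN classes up by fully qualified name, which now carries the org.apache prefix. A minimal sketch — the wrapper object is hypothetical; the class and constructor shapes come from the hunks:

    import org.apache.spark.SparkContext

    object ReflectiveLoadSketch {
      def loadYarnScheduler(sc: SparkContext): AnyRef = {
        // Look the scheduler up by name so core compiles without the yarn module.
        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
        // The class is expected to expose a single-argument SparkContext constructor.
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(sc).asInstanceOf[AnyRef]
      }
    }

If the yarn module is missing from the classpath, Class.forName throws ClassNotFoundException, which SparkContext and SparkEnv wrap in a SparkException as shown above.
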
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 5d54420e72..48eb096063 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -79,7 +79,6 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     server.foreach(_.stop())
   }
 
-  private[spark] def appHttpUIAddress = "http://" + appUIAddress
   private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
 
 }
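
With appHttpUIAddress removed, SparkUI exposes only the scheme-less host:port form, and callers such as SparkDeploySchedulerBackend (above) prepend the scheme themselves. A minimal sketch of the resulting plumbing, with hypothetical host and port values standing in for SparkUI's fields:

    object UIAddressSketch {
      val host = "example-host"               // stand-in for the UI's bound host
      val boundPort: Option[Int] = Some(4040) // stand-in for the Jetty bound port

      // The one remaining accessor: host:port, "-1" when the server is not bound.
      def appUIAddress: String = host + ":" + boundPort.getOrElse("-1")

      // Caller side, mirroring the SparkDeploySchedulerBackend hunk.
      def appDescUrl: String = "http://" + appUIAddress
    }
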
href="/" class="brand"><img src={sparkLogoHdStr} /></a> + <a href={"%s/".format(UIUtils.addBaseUri())} class="brand"><img src={"%s/static/spark-logo-77x50px-hd.png".format(UIUtils.addBaseUri())} /></a> <ul class="nav"> {jobs} {storage} @@ -105,9 +94,9 @@ private[spark] object UIUtils { <html> <head> <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> - <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" /> - <link rel="stylesheet" href="/static/webui.css" type="text/css" /> - <script src={sortTableStr}></script> + <link rel="stylesheet" href={"%s/static/bootstrap.min.css".format(UIUtils.addBaseUri())} type="text/css" /> + <link rel="stylesheet" href={"%s/static/webui.css".format(UIUtils.addBaseUri())} type="text/css" /> + <script src={"%s/static/sorttable.js".format(UIUtils.addBaseUri())} ></script> <title>{title}</title> </head> <body> @@ -115,7 +104,7 @@ private[spark] object UIUtils { <div class="row-fluid"> <div class="span12"> <h3 style="vertical-align: middle; display: inline-block;"> - <img src="/static/spark-logo-77x50px-hd.png" style="margin-right: 15px;" /> + <img src={"%s/static/spark-logo-77x50px-hd.png".format(UIUtils.addBaseUri())} style="margin-right: 15px;" /> {title} </h3> </div> diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bb47fc0a2c..468800b2bd 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -613,7 +613,7 @@ private[spark] object Utils extends Logging { * A regular expression to match classes of the "core" Spark API that we want to skip when * finding the call site of a method. */ - private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r + private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, val firstUserLine: Int, val firstUserClass: String) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index fded896185..93421efcbc 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -42,7 +42,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t The command to launch the YARN Client is as follows: - SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class spark.deploy.yarn.Client \ + SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \ --jar <YOUR_APP_JAR_FILE> \ --class <APP_MAIN_CLASS> \ --args <APP_MAIN_ARGUMENTS> \ @@ -54,9 +54,9 @@ The command to launch the YARN Client is as follows: For example: - SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class spark.deploy.yarn.Client \ + SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class org.apache.spark.deploy.yarn.Client \ --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \ - --class spark.examples.SparkPi \ + --class org.apache.spark.examples.SparkPi \ --args yarn-standalone \ --num-workers 3 \ --master-memory 4g \ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index bbeca245a8..858b58d338 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -29,7 +29,8 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index bbeca245a8..858b58d338 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import scala.collection.JavaConversions._
-import org.apache.spark.{SparkContext, Logging, Utils}
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.Utils
 import org.apache.hadoop.security.UserGroupInformation
 import java.security.PrivilegedExceptionAction
 
@@ -56,10 +57,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
 
-    // TODO: Uncomment when hadoop is on a version which has this fixed.
+    // Workaround until hadoop moves to something which has
+    // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+    // ignore result
+    // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
+    // Hence args.workerCores = numCore disabled above. Any better option ?
+
     // Compute number of threads for akka
     //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
     //if (minimumMemory > 0) {
     //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     //  val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
@@ -70,12 +75,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     //    args.workerCores = numCore
     //  }
     //}
-
-    // Workaround until hadoop moves to something which has
-    // https://issues.apache.org/jira/browse/HADOOP-8406
-    // ignore result
-    // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
-    // Hence args.workerCores = numCore disabled above. Any better option ?
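
The two hunks above only reposition the HADOOP-8406 workaround comments so they sit right after registration, ahead of the commented-out core computation. The mechanism the comments describe is to touch CompressionCodecFactory once, early and in a single thread, so its class/codec initialization cannot race later (the call itself appears commented out in the surrounding context). A minimal sketch — the wrapper object is hypothetical, while getCodecClasses is the Hadoop API the comment cites:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.compress.CompressionCodecFactory

    object CodecInitSketch {
      def warmUpCodecs(conf: Configuration): Unit = {
        // Result intentionally ignored: only the initialization side effect
        // matters, and per the patch comments it alleviates, but does not
        // fully eliminate, the race.
        CompressionCodecFactory.getCodecClasses(conf)
      }
    }
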
     // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
 
     ApplicationMaster.register(this)
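
Besides the usage-string updates below, the Client hunk renames the main class embedded in the ApplicationMaster launch command. A sketch of that command assembly; the helper body and parameters here are hypothetical stand-ins, and only the flag layout and the fully qualified class name come from the hunk:

    object AmCommandSketch {
      // Hypothetical stand-in for the userArgsToString helper the hunk references.
      def userArgsToString(userArgs: Seq[String]): String =
        userArgs.map(arg => " --args " + arg).mkString

      def amCommand(javaCommand: String, javaOpts: String, userClass: String,
          userJar: String, userArgs: Seq[String]): List[String] = {
        List(javaCommand +
          " -server " +
          javaOpts +
          " org.apache.spark.deploy.yarn.ApplicationMaster" +  // was spark.deploy.yarn.ApplicationMaster
          " --class " + userClass +
          " --jar " + userJar +
          userArgsToString(userArgs))
      }
    }
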
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f47e23b63f..f76a5ddd39 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -80,7 +80,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
       System.err.println("Unknown/unsupported param " + unknownParam)
     }
     System.err.println(
-      "Usage: spark.deploy.yarn.ApplicationMaster [options] \n" +
+      "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
       "  --class CLASS_NAME   Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 48e737ed79..844c707834 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -254,7 +255,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val commands = List[String](javaCommand +
       " -server " +
       JAVA_OPTS +
-      " spark.deploy.yarn.ApplicationMaster" +
+      " org.apache.spark.deploy.yarn.ApplicationMaster" +
       " --class " + args.userClass +
       " --jar " + args.userJar +
       userArgsToString(args) +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 6cbfadc23b..cd651904d2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -98,7 +98,7 @@ class ClientArguments(val args: Array[String]) {
       System.err.println("Unknown/unsupported param " + unknownParam)
     }
     System.err.println(
-      "Usage: spark.deploy.yarn.Client [options] \n" +
+      "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
       "  --class CLASS_NAME   Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 72dcf7178e..6229167cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
     slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 0a3b3abc74..6d6ef149cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.SplitInfo
 import scala.collection
 import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index bbc96cfef7..29b3f22e13 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.util.Utils
 import org.apache.hadoop.conf.Configuration
 
 /**
-- 
GitLab