From 77f836799639ea939a1773cef2f4828b381f5ca2 Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Wed, 16 Apr 2014 09:34:59 -0700
Subject: [PATCH] SPARK-1497. Fix scalastyle warnings in YARN, Hive code

(I wasn't sure how to automatically set `SPARK_YARN=true` and `SPARK_HIVE=true` when running scalastyle, but these are the errors that turn up.)
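
For reference, the two extra checks can be run by hand with the same commands this patch adds to dev/scalastyle (assuming the usual sbt/sbt wrapper from the repo root; 2.2.0 is just the Hadoop version used here to trigger the stable YARN build):

    SPARK_YARN=true sbt/sbt yarn/scalastyle
    SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle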

Author: Sean Owen <sowen@cloudera.com>

Closes #413 from srowen/SPARK-1497 and squashes the following commits:

f0c9318 [Sean Owen] Fix more scalastyle warnings in yarn
80bf4c3 [Sean Owen] Add YARN alpha / YARN profile to scalastyle check
026319c [Sean Owen] Fix scalastyle warnings in YARN, Hive code
---
 dev/scalastyle                                |  4 ++++
 .../spark/deploy/yarn/ExecutorLauncher.scala  | 21 ++++++++++++-------
 .../deploy/yarn/YarnAllocationHandler.scala   | 11 +++++-----
 .../spark/deploy/yarn/ApplicationMaster.scala |  3 ++-
 .../spark/deploy/yarn/ExecutorLauncher.scala  |  8 ++++---
 .../deploy/yarn/YarnAllocationHandler.scala   |  7 ++++---
 6 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/dev/scalastyle b/dev/scalastyle
index 19955b9aaa..7b572f6a89 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -18,6 +18,10 @@
 #
 
 echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt
+# Check style with YARN alpha built too
+SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
+# Check style with stable YARN built too
+SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
 ERRORS=$(cat scalastyle.txt | grep -e "\<error\>")
 if test ! -z "$ERRORS"; then
     echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7b0e020263..21f14576ef 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
 class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      // Send a hello message so that the connection is actually established and we
+      // can monitor lifecycle events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread; otherwise the app will be killed after the
+    // expiration timeout (default: 10 minutes). Ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
     // we want to be reasonably responsive without causing too many requests to RM.
@@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
+    // Set this to the master host and port so that the ApplicationReport at the client
+    // has some sensible info. Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
@@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
-      yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      yarnAllocator.allocateContainers(
+        math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       Thread.sleep(100)
     }
 
@@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
         while (!driverClosed) {
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
-            logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+            logInfo("Allocating " + missingExecutorCount +
+              " containers to make up for (potentially ?) lost containers")
             yarnAllocator.allocateContainers(missingExecutorCount)
           }
           else sendProgress()
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 2056667af5..d6d46a5f6c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler(
         val executorHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        assert(
-          container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        assert(container.getResource.getMemory >=
+          (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
 
         if (numExecutorsRunningNow > maxExecutors) {
           logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler(
 
       // default.
     if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
-      resourceRequests = List(
-        createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+      logDebug("numExecutors: " + numExecutors + ", host preferences: " +
+        preferredHostToCount.isEmpty)
+      resourceRequests = List(createResourceRequest(
+        AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
     }
     else {
       // request for all hosts in preferred nodes and for numExecutors - 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 61af0f9ac5..581cfe43b6 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
 
     val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+    System.setProperty(
+      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
   }
 
   /** Get the Yarn approved local directories. */
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b697f10391..67ed591c78 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      // Send a hello message so that the connection is actually established and
+      // we can monitor lifecycle events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread; otherwise the app will be killed after the
+    // expiration timeout (default: 10 minutes). Ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
     // we want to be reasonably responsive without causing too many requests to RM.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e31c4060e8..4fafae1aff 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler(
               allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
             }
           }
-          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(driverUrl, executorHostname))
+          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
+            driverUrl, executorHostname))
           val executorRunnable = new ExecutorRunnable(
             container,
             conf,
@@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler(
           // `pendingReleaseContainers`.
           pendingReleaseContainers.remove(containerId)
         } else {
-          // Decrement the number of executors running. The next iteration of the ApplicationMaster's
-          // reporting thread will take care of allocating.
+          // Decrement the number of executors running. The next iteration of
+          // the ApplicationMaster's reporting thread will take care of allocating.
           numExecutorsRunning.decrementAndGet()
           logInfo("Completed container %s (state: %s, exit status: %s)".format(
             containerId,
-- 
GitLab