diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 23245043e24656665e2fbb25932d9ab4c9d40457..9d1f1d59dbce1ca93a0ec089858b1aeafa76bca9 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -24,16 +24,17 @@ package org.apache.spark
  * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
  * accumulators of numeric value types, and programmers can add support for new types.
  *
- * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * An accumulator is created from an initial value `v` by calling
+ * [[SparkContext#accumulator SparkContext.accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+= +=]] operator.
  * However, they cannot read its value. Only the driver program can read the accumulator's value,
- * using its value method.
+ * using its [[#value]] method.
  *
  * The interpreter session below shows an accumulator being used to add up the elements of an array:
  *
  * {{{
  * scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
+ * accum: org.apache.spark.Accumulator[Int] = 0
  *
  * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  * ...
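
For reference, the same flow as a minimal driver-side sketch (assuming an existing SparkContext named `sc`, as in the interpreter session above): create the accumulator, let tasks add to it, and read the result only on the driver.

    val accum = sc.accumulator(0)                               // created from initial value 0
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)  // tasks can only add to it
    println(accum.value)                                        // driver-side read: prints 10
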
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index a79e71ec7c9bf09601fe91f1b142ca16340dc1d6..5987cfea2e9b284d389e30012a0efae91e1b971c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -26,16 +26,14 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
 /**
- * An Schedulable entity that represent collection of Pools or TaskSetManagers
+ * A Schedulable entity that represents a collection of Pools or TaskSetManagers
  */
-
 private[spark] class Pool(
     val poolName: String,
     val schedulingMode: SchedulingMode,
     initMinShare: Int,
     initWeight: Int)
-  extends Schedulable
-  with Logging {
+  extends Schedulable with Logging {
 
   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
@@ -56,7 +54,8 @@ private[spark] class Pool(
       case SchedulingMode.FIFO =>
         new FIFOSchedulingAlgorithm()
       case _ =>
-        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
+        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
+        throw new IllegalArgumentException(msg)
     }
   }
 
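
One note on the new error message: in Scala, `$`-placeholders are substituted only inside an `s`-interpolated string; a plain literal keeps `$schedulingMode` verbatim, which is why the `s` prefix above matters. A minimal sketch:

    val schedulingMode = "NONE"
    val plain  = "Unsupported scheduling mode: $schedulingMode"   // literal text, no substitution
    val interp = s"Unsupported scheduling mode: $schedulingMode"  // "Unsupported scheduling mode: NONE"
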
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 864941d468af91f0415274d69da72b36791de9d0..18ebbbe78a5b4738401c6f159bd78916516c8229 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -36,11 +36,7 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
       val stageId2 = s2.stageId
       res = math.signum(stageId1 - stageId2)
     }
-    if (res < 0) {
-      true
-    } else {
-      false
-    }
+    res < 0
   }
 }
 
@@ -52,12 +48,12 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     val runningTasks2 = s2.runningTasks
     val s1Needy = runningTasks1 < minShare1
     val s2Needy = runningTasks2 < minShare2
-    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
-    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
+    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
+    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
-    var compare: Int = 0
 
+    var compare = 0
     if (s1Needy && !s2Needy) {
       return true
     } else if (!s1Needy && s2Needy) {
@@ -67,7 +63,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     } else {
       compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
     }
-
     if (compare < 0) {
       true
     } else if (compare > 0) {
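
The fair comparator above ranks needy entities (those running fewer tasks than their minimum share) ahead of non-needy ones, then compares minShare ratios or task-to-weight ratios. A self-contained toy sketch of the same ordering, using a hypothetical Entry case class instead of Spark's Schedulable and omitting the final name tie-break:

    case class Entry(runningTasks: Int, minShare: Int, weight: Int)  // hypothetical stand-in for Schedulable

    def fairLessThan(s1: Entry, s2: Entry): Boolean = {
      val s1Needy = s1.runningTasks < s1.minShare
      val s2Needy = s2.runningTasks < s2.minShare
      if (s1Needy && !s2Needy) {
        true                                   // a needy entity always comes first
      } else if (!s1Needy && s2Needy) {
        false
      } else if (s1Needy && s2Needy) {
        // both under their minimum share: the one furthest below it (smallest ratio) wins
        s1.runningTasks.toDouble / math.max(s1.minShare, 1.0) <
          s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
      } else {
        // both satisfied: fewest running tasks per unit of weight wins
        s1.runningTasks.toDouble / s1.weight < s2.runningTasks.toDouble / s2.weight
      }
    }

    // e.g. Seq(Entry(5, 2, 1), Entry(1, 2, 1)).sortWith(fairLessThan) puts Entry(1, 2, 1) first
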
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index bd26bfd848ff1837183b2711d36cf8cf1720cdf9..93ac67e5db0d7ab77a53935d583e6a50ce241330 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -170,9 +170,7 @@ private [util] class SparkShutdownHookManager {
   @volatile private var shuttingDown = false
 
   /**
-   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
-   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
-   * the best.
+   * Install a hook to run at shutdown and run all registered hooks in order.
    */
   def install(): Unit = {
     val hookTask = new Runnable() {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index 898ac2cc8941bb50da2149a374504c5409052b86..35bc46a5f3435e686d14d1c7c1d13adc31b4c2a6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -104,9 +104,9 @@ final class Binarizer(override val uid: String)
       case DoubleType =>
         BinaryAttribute.defaultAttr.withName(outputColName).toStructField()
       case _: VectorUDT =>
-        new StructField(outputColName, new VectorUDT, true)
-      case other =>
-        throw new IllegalArgumentException(s"Data type $other is not supported.")
+        StructField(outputColName, new VectorUDT)
+      case _ =>
+        throw new IllegalArgumentException(s"Data type $inputType is not supported.")
     }
 
     if (schema.fieldNames.contains(outputColName)) {
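
The simplification above relies on two properties of `StructField`: it is a case class, so `new` is optional, and its `nullable` parameter defaults to `true`, so the resulting schema field is unchanged. A quick sketch using a plain `DoubleType` (chosen only to keep the example self-contained):

    import org.apache.spark.sql.types.{DoubleType, StructField}

    val explicitField  = new StructField("out", DoubleType, true)  // nullable spelled out
    val defaultedField = StructField("out", DoubleType)            // case-class apply; nullable defaults to true
    assert(explicitField == defaultedField)
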
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 774170ff401e96d02702910c9a25fa6f76ea8102..86ce9705a3125487e9b601ac85617bbe2fe441c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -104,7 +104,7 @@ object MLUtils {
     val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
       val indexAndValue = item.split(':')
       val index = indexAndValue(0).toInt - 1 // Convert 1-based indices to 0-based.
-    val value = indexAndValue(1).toDouble
+      val value = indexAndValue(1).toDouble
       (index, value)
     }.unzip
 
@@ -119,8 +119,7 @@ object MLUtils {
       previous = current
       i += 1
     }
-
-    (label, indices.toArray, values.toArray)
+    (label, indices, values)
   }
 
   /**
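
For context, the block above parses a libSVM record of the form `label index1:value1 index2:value2 ...`, where indices are 1-based and must be converted to 0-based. A hypothetical standalone sketch of that parsing step (the surrounding index-ordering check is omitted):

    // Hypothetical helper mirroring the parsing above; not the MLUtils method itself.
    def parseLibSVMLine(line: String): (Double, Array[Int], Array[Double]) = {
      val items = line.trim.split(' ')
      val label = items.head.toDouble
      val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
        val indexAndValue = item.split(':')
        val index = indexAndValue(0).toInt - 1  // convert 1-based indices to 0-based
        val value = indexAndValue(1).toDouble
        (index, value)
      }.unzip
      (label, indices, values)
    }

    // parseLibSVMLine("1.0 3:2.5 7:0.5") yields (1.0, Array(2, 6), Array(2.5, 0.5))
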
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 516b41cb138b457c836df2a4828e10379d8e30c8..8b1a34f79c42a423a7da1fe6b4f96311138656b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
- * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * Given a [[LogicalPlan]], returns a list of `PhysicalPlan`s that can
  * be used for execution. If this strategy does not apply to the give logical operation then an
  * empty list should be returned.
  */
@@ -31,9 +31,10 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L
 }
 
 /**
- * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
- * Child classes are responsible for specifying a list of [[Strategy]] objects that each of which
- * can return a list of possible physical plan options.  If a given strategy is unable to plan all
+ * Abstract class for transforming [[LogicalPlan]]s into physical plans.
+ * Child classes are responsible for specifying a list of [[GenericStrategy]] objects,
+ * each of which can return a list of possible physical plan options.
+ * If a given strategy is unable to plan all
  * of the remaining operators in the tree, it can call [[planLater]], which returns a placeholder
  * object that will be filled in using other available strategies.
  *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index c201822d4479a445823583c3e1f2a452ddde76b1..1be41ffc072c141d30dde0584a2f586d988a5dc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartit
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
- * Apply the all of the GroupExpressions to every input row, hence we will get
+ * Apply all of the GroupExpressions to every input row, hence we will get
  * multiple output rows for a input row.
  * @param projections The group of expressions, all of the group expressions should
  *                    output the same schema specified bye the parameter `output`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 0bbe970420707ca1cbaab9d16c05842561b40b21..b94b84d77a50272ac1f8aebe7d08fff16beaf213 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -106,16 +106,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
   /**
-   * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute after
-   * preparations. Concrete implementations of SparkPlan should override doExecute.
+   * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
+   * preparations.
+   *
+   * Concrete implementations of SparkPlan should override `doExecute`.
    */
   final def execute(): RDD[InternalRow] = executeQuery {
     doExecute()
   }
 
   /**
-   * Returns the result of this query as a broadcast variable by delegating to doBroadcast after
-   * preparations. Concrete implementations of SparkPlan should override doBroadcast.
+   * Returns the result of this query as a broadcast variable by delegating to `doExecuteBroadcast`
+   * after preparations.
+   *
+   * Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
     doExecuteBroadcast()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 15b4abe806678ea95cc03cb90d7300531914ce17..d6f7b6ed35dbda3cce25027a48c99dc44dd5d2de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -105,7 +105,7 @@ trait CodegenSupport extends SparkPlan {
   protected def doProduce(ctx: CodegenContext): String
 
   /**
-   * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
+   * Consume the generated columns or row from this SparkPlan and call its parent's `doConsume()`.
    */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
     val inputVars =
@@ -212,8 +212,8 @@ trait CodegenSupport extends SparkPlan {
 /**
  * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
  *
- * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
- * an RDD iterator of InternalRow.
+ * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
+ * that consumes an RDD iterator of InternalRow.
  */
 case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 56a39069511d737b910ff03e65713338a9175a7d..bafbbdf65724db781a6b9702b0f15998cb727a03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -158,7 +158,7 @@ case class MapPartitionsExec(
  * Applies the given function to each input object.
  * The output of its child must be a single-field row containing the input object.
  *
- * This operator is kind of a safe version of [[ProjectExec]], as it's output is custom object,
+ * This operator is kind of a safe version of [[ProjectExec]]: as its output is a custom object,
  * we need to use safe row to contain it.
  */
 case class MapElementsExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 8e66538575b0c677abbaccaec0ad1dceb77f6833..7b4c035bf3c1e545f0c99007f9a2d228621f7462 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -95,7 +95,7 @@ class FileStreamSource(
     val endId = end.asInstanceOf[LongOffset].offset
 
     assert(startId <= endId)
-    val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
+    val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
     logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logDebug(s"Streaming ${files.mkString(", ")}")
     dataFrameBuilder(files)
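
The change above is purely a simplification: for a collection of (batchId, files) pairs, `flatMap(_._2)` yields the same flattened result as `map(_._2).flatten`, in one pass and without the intermediate collection. A small sketch:

    val batches = Seq(1L -> Seq("a.parquet", "b.parquet"), 2L -> Seq("c.parquet"))
    val viaMapFlatten = batches.map(_._2).flatten  // Seq("a.parquet", "b.parquet", "c.parquet")
    val viaFlatMap    = batches.flatMap(_._2)      // same result
    assert(viaMapFlatten == viaFlatMap)
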
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index b89144d727514b601318b848dcc3991a4378c93c..e9052a309595a824531570a6f273742bc1e3178c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -47,7 +47,7 @@ class IncrementalExecution(
 
   /**
    * Records the current id for a given stateful operator in the query plan as the `state`
-   * preperation walks the query plan.
+   * preparation walks the query plan.
    */
   private var operatorId = 0
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7d8b8679c59440d2c19cd212894f824afd375691..6ececb1062ae124d4c26767a4a8f8971e106580f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -84,7 +84,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
     assert(framework != null, "Checkpoint.framework is null")
     assert(graph != null, "Checkpoint.graph is null")
     assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
-    logInfo("Checkpoint for time " + checkpointTime + " validated")
+    logInfo(s"Checkpoint for time $checkpointTime validated")
   }
 }
 
@@ -103,7 +103,10 @@ object Checkpoint extends Logging {
     new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
   }
 
-  /** Get checkpoint files present in the give directory, ordered by oldest-first */
+  /**
+   * @param checkpointDir checkpoint directory to read checkpoint files from
+   * @return checkpoint files in `checkpointDir`, ordered oldest-first
+   */
   def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = {
 
     def sortFunc(path1: Path, path2: Path): Boolean = {
@@ -121,11 +124,11 @@ object Checkpoint extends Logging {
         val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
         filtered.sortWith(sortFunc)
       } else {
-        logWarning("Listing " + path + " returned null")
+        logWarning(s"Listing $path returned null")
         Seq.empty
       }
     } else {
-      logInfo("Checkpoint directory " + path + " does not exist")
+      logWarning(s"Checkpoint directory $path does not exist")
       Seq.empty
     }
   }
@@ -205,7 +208,7 @@ class CheckpointWriter(
       // time of a batch is greater than the batch interval, checkpointing for completing an old
       // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
       // batch actually has the latest information, so we want to recovery from it. Therefore, we
-      // also use the latest checkpoint time as the file name, so that we can recovery from the
+      // also use the latest checkpoint time as the file name, so that we can recover from the
       // latest checkpoint file.
       //
       // Note: there is only one thread writing the checkpoint files, so we don't need to worry
@@ -216,8 +219,7 @@ class CheckpointWriter(
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
-          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
-            + "'")
+          logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
 
           // Write checkpoint to temp file
           if (fs.exists(tempFile)) {
@@ -237,39 +239,38 @@ class CheckpointWriter(
               fs.delete(backupFile, true) // just in case it exists
             }
             if (!fs.rename(checkpointFile, backupFile)) {
-              logWarning("Could not rename " + checkpointFile + " to " + backupFile)
+              logWarning(s"Could not rename $checkpointFile to $backupFile")
             }
           }
 
           // Rename temp file to the final checkpoint file
           if (!fs.rename(tempFile, checkpointFile)) {
-            logWarning("Could not rename " + tempFile + " to " + checkpointFile)
+            logWarning(s"Could not rename $tempFile to $checkpointFile")
           }
 
           // Delete old checkpoint files
           val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
           if (allCheckpointFiles.size > 10) {
             allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
-              logInfo("Deleting " + file)
+              logInfo(s"Deleting $file")
               fs.delete(file, true)
             }
           }
 
           // All done, print success
           val finishTime = System.currentTimeMillis()
-          logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
-            "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
+          logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" +
+            s", took ${bytes.length} bytes and ${finishTime - startTime} ms")
           jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
           return
         } catch {
           case ioe: IOException =>
-            logWarning("Error in attempt " + attempts + " of writing checkpoint to "
-              + checkpointFile, ioe)
+            val msg = s"Error in attempt $attempts of writing checkpoint to '$checkpointFile'"
+            logWarning(msg, ioe)
             fs = null
         }
       }
-      logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
-        + checkpointFile + "'")
+      logWarning(s"Could not write checkpoint for time $checkpointTime to file '$checkpointFile'")
     }
   }
 
@@ -278,7 +279,7 @@ class CheckpointWriter(
       val bytes = Checkpoint.serialize(checkpoint, conf)
       executor.execute(new CheckpointWriteHandler(
         checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
-      logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+      logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to writer queue")
     } catch {
       case rej: RejectedExecutionException =>
         logError("Could not submit checkpoint task to the thread pool executor", rej)
@@ -295,8 +296,8 @@ class CheckpointWriter(
       executor.shutdownNow()
     }
     val endTime = System.currentTimeMillis()
-    logInfo("CheckpointWriter executor terminated ? " + terminated +
-      ", waited for " + (endTime - startTime) + " ms.")
+    logInfo(s"CheckpointWriter executor terminated? $terminated," +
+      s" waited for ${endTime - startTime} ms.")
     stopped = true
   }
 }
@@ -336,20 +337,20 @@ object CheckpointReader extends Logging {
     }
 
     // Try to read the checkpoint files in the order
-    logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
+    logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
     var readError: Exception = null
     checkpointFiles.foreach { file =>
-      logInfo("Attempting to load checkpoint from file " + file)
+      logInfo(s"Attempting to load checkpoint from file $file")
       try {
         val fis = fs.open(file)
         val cp = Checkpoint.deserialize(fis, conf)
-        logInfo("Checkpoint successfully loaded from file " + file)
-        logInfo("Checkpoint was generated at time " + cp.checkpointTime)
+        logInfo(s"Checkpoint successfully loaded from file $file")
+        logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
         return Some(cp)
       } catch {
         case e: Exception =>
           readError = e
-          logWarning("Error reading checkpoint from file " + file, e)
+          logWarning(s"Error reading checkpoint from file $file", e)
       }
     }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b494ef0dd96658e9234585edf5b8c952a96f4fc4..7ea58afb53dc9125de6f000830c68fc371505875 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -158,7 +158,7 @@ private[spark] class Client(
       val newAppResponse = newApp.getNewApplicationResponse()
       appId = newAppResponse.getApplicationId()
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
-      launcherBackend.setAppId(appId.toString())
+      launcherBackend.setAppId(appId.toString)
 
       // Verify whether the cluster has enough resources for our AM
       verifyClusterResources(newAppResponse)
@@ -168,7 +168,7 @@ private[spark] class Client(
       val appContext = createApplicationSubmissionContext(newApp, containerContext)
 
       // Finally, submit and monitor the application
-      logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+      logInfo(s"Submitting application $appId to ResourceManager")
       yarnClient.submitApplication(appContext)
       appId
     } catch {