From 9f678e97549b19d6d979b22fa4079094ce9fb2c0 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Sat, 16 Apr 2016 14:56:23 +0100
Subject: [PATCH] [MINOR] Remove inappropriate type notation and extra
 anonymous closure within functional transformations

## What changes were proposed in this pull request?

This PR removes

- Inappropriate type notations
    For example, from
    ```scala
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
    ...
    ```
    to
    ```scala
    words.foreachRDD { (rdd, time) =>
    ...
    ```

- Extra anonymous closures within functional transformations.
    For example,
    ```scala
    .map(item => {
      ...
    })
    ```

    which can simply be written as below:

    ```scala
    .map { item =>
      ...
    }
    ```

and corrects some obvious style nits.

## How was this patch tested?

This was tested by temporarily adding the rules below to `scalastyle-config.xml`, although they did not catch every case perfectly.

The rules applied were as follows; an illustrative snippet of the code shapes they flag appears after the rules:

- For the extra anonymous closure correction (`NoExtraClosure`),

```xml
<check customId="NoExtraClosure" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">(?m)\.[a-zA-Z_][a-zA-Z0-9]*\(\s*[^,]+\s*=>\s*\{[^\}]+\}\s*\)</parameter></parameters>
</check>
```

```xml
<check customId="NoExtraClosure" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]([^\n>,]+=>)?\s*\{([^()]|(?R))*\}^[,]</parameter></parameters>
</check>
```

- For the type notation correction (`TypeNotation`),
```xml
<check customId="TypeNotation" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]\s*\([^):]*:[^)]*\)\s*=>\s*\{([^()]|(?R))*\}^[,]</parameter></parameters>
</check>
```
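
For illustration only, here is a hypothetical, self-contained Scala sketch (not code from this patch; the object name, host, and port are made up) of the closure shapes these rules are meant to flag, alongside the preferred forms after this change:

```scala
// Hypothetical illustration only; not part of this patch.
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext

object StyleRuleExamples {

  def closureExample(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Seq(" a ", " b "))

    // Shape targeted by the NoExtraClosure rules: a brace-wrapped body inside parentheses.
    rdd.map(item => {
      item.trim
    }).collect()

    // Preferred after this patch: braces directly around the closure, or a placeholder.
    rdd.map(_.trim).collect()
  }

  def typeNotationExample(ssc: StreamingContext): Unit = {
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Shape targeted by the TypeNotation rule: parameter types the compiler already infers.
    words.foreachRDD { (rdd: org.apache.spark.rdd.RDD[String], time: org.apache.spark.streaming.Time) =>
      println(s"$time: ${rdd.count()}")
    }

    // Preferred after this patch: let the parameter types be inferred.
    words.foreachRDD { (rdd, time) =>
      println(s"$time: ${rdd.count()}")
    }
  }
}
```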

**Note that these rules were not actually added as part of this patch.**

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12413 from HyukjinKwon/SPARK-style.
---
 .../scala/org/apache/spark/TaskEndReason.scala   |  4 +---
 .../org/apache/spark/api/java/JavaRDDLike.scala  | 10 +++++-----
 .../spark/deploy/master/ui/ApplicationPage.scala |  5 ++---
 .../spark/deploy/worker/DriverRunner.scala       |  7 +++++--
 .../org/apache/spark/rdd/CoalescedRDD.scala      |  8 ++++----
 .../scala/org/apache/spark/rdd/JdbcRDD.scala     |  4 ++--
 .../apache/spark/rdd/ParallelCollectionRDD.scala |  9 ++++-----
 .../examples/mllib/StreamingTestExample.scala    |  4 ++--
 .../streaming/RecoverableNetworkWordCount.scala  |  8 +++-----
 .../examples/streaming/SqlNetworkWordCount.scala |  2 +-
 .../mllib/api/python/Word2VecModelWrapper.scala  |  4 +++-
 .../evaluation/BinaryClassificationMetrics.scala |  3 +--
 .../spark/sql/catalyst/analysis/Analyzer.scala   |  6 +++---
 .../sql/catalyst/expressions/Projection.scala    |  4 +---
 .../spark/sql/catalyst/optimizer/Optimizer.scala | 10 ++++------
 .../sql/execution/joins/ShuffledHashJoin.scala   |  4 +---
 .../spark/sql/execution/stat/StatFunctions.scala |  2 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala     |  8 +++-----
 .../apache/spark/streaming/dstream/DStream.scala | 16 ++++++++--------
 19 files changed, 54 insertions(+), 64 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 83af226bfd..7487cfe9c5 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -149,9 +149,7 @@ case class ExceptionFailure(
     this(e, accumUpdates, preserveCause = true)
   }
 
-  def exception: Option[Throwable] = exceptionWrapper.flatMap {
-    (w: ThrowableSerializationWrapper) => Option(w.exception)
-  }
+  def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))
 
   override def toErrorString: String =
     if (fullStackTrace == null) {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 4212027122..6f3b8faf03 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -105,7 +105,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
-    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+    new JavaDoubleRDD(rdd.map(f.call(_).doubleValue()))
   }
 
   /**
@@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
-    new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
   }
 
   /**
@@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
-    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
   }
 
   /**
@@ -196,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
-      .map(x => x.doubleValue()))
+      .map(_.doubleValue()))
   }
 
   /**
@@ -215,7 +215,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
-    rdd.foreachPartition((x => f.call(x.asJava)))
+    rdd.foreachPartition(x => f.call(x.asJava))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 1b18cf0ded..96274958d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -35,9 +35,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
-    val app = state.activeApps.find(_.id == appId).getOrElse({
-      state.completedApps.find(_.id == appId).getOrElse(null)
-    })
+    val app = state.activeApps.find(_.id == appId)
+      .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
       val msg = <div class="row-fluid">No running application with ID {appId}</div>
       return UIUtils.basicSparkPage(msg, "Not Found")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index aad2e91b25..f4376dedea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -68,7 +68,10 @@ private[deploy] class DriverRunner(
 
   private var clock: Clock = new SystemClock()
   private var sleeper = new Sleeper {
-    def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
+    def sleep(seconds: Int): Unit = (0 until seconds).takeWhile { _ =>
+      Thread.sleep(1000)
+      !killed
+    }
   }
 
   /** Starts a thread to run and manage the driver. */
@@ -116,7 +119,7 @@ private[deploy] class DriverRunner(
   /** Terminate this driver (or prevent it from ever starting if not yet started) */
   private[worker] def kill() {
     synchronized {
-      process.foreach(p => p.destroy())
+      process.foreach(_.destroy())
       killed = true
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 90d9735cb3..35665ab7c0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -190,11 +190,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
 
     // initializes/resets to start iterating from the beginning
     def resetIterator(): Iterator[(String, Partition)] = {
-      val iterators = (0 to 2).map( x =>
-        prev.partitions.iterator.flatMap(p => {
+      val iterators = (0 to 2).map { x =>
+        prev.partitions.iterator.flatMap { p =>
           if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
-        } )
-      )
+        }
+      }
       iterators.reduceLeft((x, y) => x ++ y)
     }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 526138093d..5426bf80ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -65,11 +65,11 @@ class JdbcRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
     val length = BigInt(1) + upperBound - lowerBound
-    (0 until numPartitions).map(i => {
+    (0 until numPartitions).map { i =>
       val start = lowerBound + ((i * length) / numPartitions)
       val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
       new JdbcPartition(i, start.toLong, end.toLong)
-    }).toArray
+    }.toArray
   }
 
   override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index bb84e4af15..34a1c112cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -129,7 +129,7 @@ private object ParallelCollectionRDD {
     }
     seq match {
       case r: Range =>
-        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
           // If the range is inclusive, use inclusive range for the last slice
           if (r.isInclusive && index == numSlices - 1) {
             new Range.Inclusive(r.start + start * r.step, r.end, r.step)
@@ -137,7 +137,7 @@ private object ParallelCollectionRDD {
           else {
             new Range(r.start + start * r.step, r.start + end * r.step, r.step)
           }
-        }).toSeq.asInstanceOf[Seq[Seq[T]]]
+        }.toSeq.asInstanceOf[Seq[Seq[T]]]
       case nr: NumericRange[_] =>
         // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
@@ -150,10 +150,9 @@ private object ParallelCollectionRDD {
         slices
       case _ =>
         val array = seq.toArray // To prevent O(n^2) operations for List etc
-        positions(array.length, numSlices).map({
-          case (start, end) =>
+        positions(array.length, numSlices).map { case (start, end) =>
             array.slice(start, end).toSeq
-        }).toSeq
+        }.toSeq
     }
   }
 }
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
index 49f5df3944..ae4dee24c6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
@@ -59,10 +59,10 @@ object StreamingTestExample {
 
     val conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample")
     val ssc = new StreamingContext(conf, batchDuration)
-    ssc.checkpoint({
+    ssc.checkpoint {
       val dir = Utils.createTempDir()
       dir.toString
-    })
+    }
 
     // $example on$
     val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index bb2af9cd72..aa762b27dc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -115,8 +115,8 @@ object RecoverableNetworkWordCount {
     // words in input stream of \n delimited text (eg. generated by 'nc')
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
+    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
+    wordCounts.foreachRDD { (rdd, time) =>
       // Get or register the blacklist Broadcast
       val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
       // Get or register the droppedWordsCounter Accumulator
@@ -158,9 +158,7 @@ object RecoverableNetworkWordCount {
     }
     val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
     val ssc = StreamingContext.getOrCreate(checkpointDirectory,
-      () => {
-        createContext(ip, port, outputPath, checkpointDirectory)
-      })
+      () => createContext(ip, port, outputPath, checkpointDirectory))
     ssc.start()
     ssc.awaitTermination()
   }
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
index 918e124065..ad6a89e320 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
     val words = lines.flatMap(_.split(" "))
 
     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD { (rdd: RDD[String], time: Time) =>
+    words.foreachRDD { (rdd, time) =>
       // Get the singleton instance of SQLContext
       val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
       import sqlContext.implicits._
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
index 05273c3434..4b4ed2291d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
@@ -56,7 +56,9 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) {
   }
 
   def getVectors: JMap[String, JList[Float]] = {
-    model.getVectors.map({case (k, v) => (k, v.toList.asJava)}).asJava
+    model.getVectors.map { case (k, v) =>
+      (k, v.toList.asJava)
+    }.asJava
   }
 
   def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 0a7a45b4f4..92cd7f22dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -189,8 +189,7 @@ class BinaryClassificationMetrics @Since("1.3.0") (
       Iterator(agg)
     }.collect()
     val partitionwiseCumulativeCounts =
-      agg.scanLeft(new BinaryLabelCounter())(
-        (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
+      agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
     val totalCount = partitionwiseCumulativeCounts.last
     logInfo(s"Total counts: $totalCount")
     val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 37ff6ab6f6..6591559426 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -172,8 +172,8 @@ class Analyzer(
     private def assignAliases(exprs: Seq[NamedExpression]) = {
       exprs.zipWithIndex.map {
         case (expr, i) =>
-          expr transformUp {
-            case u @ UnresolvedAlias(child, optionalAliasName) => child match {
+          expr.transformUp { case u @ UnresolvedAlias(child, optionalAliasName) =>
+            child match {
               case ne: NamedExpression => ne
               case e if !e.resolved => u
               case g: Generator => MultiAlias(g, Nil)
@@ -215,7 +215,7 @@ class Analyzer(
      *  represented as the bit masks.
      */
     def bitmasks(r: Rollup): Seq[Int] = {
-      Seq.tabulate(r.groupByExprs.length + 1)(idx => {(1 << idx) - 1})
+      Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
     }
 
     /*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 354311c5e7..27ad8e4cf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -168,9 +168,7 @@ object FromUnsafeProjection {
    * Returns an UnsafeProjection for given Array of DataTypes.
    */
   def apply(fields: Seq[DataType]): Projection = {
-    create(fields.zipWithIndex.map(x => {
-      new BoundReference(x._2, x._1, true)
-    }))
+    create(fields.zipWithIndex.map(x => new BoundReference(x._2, x._1, true)))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6c8f8f40dd..c46bdfb2b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -314,11 +314,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       assert(children.nonEmpty)
       val (deterministic, nondeterministic) = partitionByDeterministic(condition)
       val newFirstChild = Filter(deterministic, children.head)
-      val newOtherChildren = children.tail.map {
-        child => {
-          val rewrites = buildRewrites(children.head, child)
-          Filter(pushToRight(deterministic, rewrites), child)
-        }
+      val newOtherChildren = children.tail.map { child =>
+        val rewrites = buildRewrites(children.head, child)
+        Filter(pushToRight(deterministic, rewrites), child)
       }
       Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))
 
@@ -360,7 +358,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
       val newOutput = e.output.filter(a.references.contains(_))
       val newProjects = e.projections.map { proj =>
-        proj.zip(e.output).filter { case (e, a) =>
+        proj.zip(e.output).filter { case (_, a) =>
           newOutput.contains(a)
         }.unzip._1
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index 0c3e3c3fc1..f021f3758c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -60,9 +60,7 @@ case class ShuffledHashJoin(
     val context = TaskContext.get()
     val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
     // This relation is usually used until the end of task.
-    context.addTaskCompletionListener((t: TaskContext) =>
-      relation.close()
-    )
+    context.addTaskCompletionListener(_ => relation.close())
     relation
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index d603f63a08..9afbd0e994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -431,7 +431,7 @@ private[sql] object StatFunctions extends Logging {
       s"exceed 1e4. Currently $columnSize")
     val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
       val countsRow = new GenericMutableRow(columnSize + 1)
-      rows.foreach { (row: Row) =>
+      rows.foreach { row =>
         // row.get(0) is column 1
         // row.get(1) is column 2
         // row.get(2) is the frequency
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 784b018353..5aab4132bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -82,7 +82,7 @@ private[hive] case class HiveSimpleUDF(
 
   // TODO: Finish input output types.
   override def eval(input: InternalRow): Any = {
-    val inputs = wrap(children.map(c => c.eval(input)), arguments, cached, inputDataTypes)
+    val inputs = wrap(children.map(_.eval(input)), arguments, cached, inputDataTypes)
     val ret = FunctionRegistry.invoke(
       method,
       function,
@@ -152,10 +152,8 @@ private[hive] case class HiveGenericUDF(
     var i = 0
     while (i < children.length) {
       val idx = i
-      deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set(
-        () => {
-          children(idx).eval(input)
-        })
+      deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
+        .set(() => children(idx).eval(input))
       i += 1
     }
     unwrap(function.evaluate(deferredObjects), returnInspector)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 58842f9c2f..583f5a48d1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] (
    * of this DStream.
    */
   def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
-    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+    this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
   }
 
   /**
@@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] (
    */
   def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
       : DStream[(T, Long)] = ssc.withScope {
-    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+    this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
   }
 
   /**
@@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] (
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
-    foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
+    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
   }
 
   /**
@@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] (
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
-    transform((r: RDD[T], t: Time) => cleanedF(r))
+    transform((r: RDD[T], _: Time) => cleanedF(r))
   }
 
   /**
@@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] (
       windowDuration: Duration,
       slideDuration: Duration
     ): DStream[T] = ssc.withScope {
-      this.map(x => (1, x))
+      this.map((1, _))
           .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
           .map(_._2)
   }
@@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] (
       numPartitions: Int = ssc.sc.defaultParallelism)
       (implicit ord: Ordering[T] = null)
       : DStream[(T, Long)] = ssc.withScope {
-    this.map(x => (x, 1L)).reduceByKeyAndWindow(
+    this.map((_, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,
       windowDuration,
@@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] (
     logInfo(s"Slicing from $fromTime to $toTime" +
       s" (aligned to $alignedFromTime and $alignedToTime)")
 
-    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+    alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
       if (time >= zeroTime) getOrCompute(time) else None
-    })
+    }
   }
 
   /**
-- 
GitLab