diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index f86fd20e591909670c8c988e3f759774aa989911..477b01968c6efc5a9ce7dc91db3ab961509eca5b 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -94,21 +94,23 @@ private[spark] case class SSLOptions( * are supported by the current Java security provider for this protocol. */ private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) { - Set() + Set.empty } else { var context: SSLContext = null - try { - context = SSLContext.getInstance(protocol.orNull) - /* The set of supported algorithms does not depend upon the keys, trust, or + if (protocol.isEmpty) { + logDebug("No SSL protocol specified") + context = SSLContext.getDefault + } else { + try { + context = SSLContext.getInstance(protocol.get) + /* The set of supported algorithms does not depend upon the keys, trust, or rng, although they will influence which algorithms are eventually used. */ - context.init(null, null, null) - } catch { - case npe: NullPointerException => - logDebug("No SSL protocol specified") - context = SSLContext.getDefault - case nsa: NoSuchAlgorithmException => - logDebug(s"No support for requested SSL protocol ${protocol.get}") - context = SSLContext.getDefault + context.init(null, null, null) + } catch { + case nsa: NoSuchAlgorithmException => + logDebug(s"No support for requested SSL protocol ${protocol.get}") + context = SSLContext.getDefault + } } val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 3196c1ece15eba7ca6c1cb271095f98371515bd8..45ed9860ea8f993682cde0f797f9b68c635abfa8 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -420,7 +420,7 @@ object SparkEnv extends Logging { if (!conf.contains("spark.scheduler.mode")) { Seq(("spark.scheduler.mode", schedulingMode)) } else { - Seq[(String, String)]() + Seq.empty[(String, String)] } val sparkProperties = (conf.getAll ++ schedulerMode).sorted diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 3f912dc191515d5dff633d879a78d39279094704..a80016dd22fc504d594757501af0b894c14abebf 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -58,8 +58,8 @@ private[spark] object TestUtils { def createJarWithClasses( classNames: Seq[String], toStringValue: String = "", - classNamesWithBase: Seq[(String, String)] = Seq(), - classpathUrls: Seq[URL] = Seq()): URL = { + classNamesWithBase: Seq[(String, String)] = Seq.empty, + classpathUrls: Seq[URL] = Seq.empty): URL = { val tempDir = Utils.createTempDir() val files1 = for (name <- classNames) yield { createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls) @@ -137,7 +137,7 @@ private[spark] object TestUtils { val options = if (classpathUrls.nonEmpty) { Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator)) } else { - Seq() + Seq.empty } compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call() @@ -160,7 +160,7 @@ private[spark] object TestUtils { destDir: File, toStringValue: String = "", baseClass: String = null, - classpathUrls: Seq[URL] = Seq()): File = { 
+ classpathUrls: Seq[URL] = Seq.empty): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val sourceFile = new JavaSourceFromString(className, "public class " + className + extendsText + " implements java.io.Serializable {" + diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fb0405b1a69c6b4092a0345ca53732c6496a6587..6a817524000ee481ab2f5befa8d491c752ca1431 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -974,6 +974,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial } } } + super.finalize() } } // scalastyle:on no.finalize diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index dad928cdcfd0fd2ba6811976a77e2b37bf5009ee..537ab57f9664d7adad66ba4853399bceeda15a87 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -128,8 +128,7 @@ private[spark] object SerDe { } def readBoolean(in: DataInputStream): Boolean = { - val intVal = in.readInt() - if (intVal == 0) false else true + in.readInt() != 0 } def readDate(in: DataInputStream): Date = { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6afe58bff522926f821823c37ff112d44bb1b670..ccbabf09a832335e0930f5bf4270424a606ee516 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -337,7 +337,7 @@ class SparkHadoopUtil extends Logging { if (credentials != null) { credentials.getAllTokens.asScala.map(tokenToString) } else { - Seq() + Seq.empty } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b2a50bd0557120ff5f2d8fb0cb232d10b62d6421..687fd2d3ffe64e7c365e4dd45549de875fd8a03a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -317,7 +317,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq[FileStatus]()) + .getOrElse(Seq.empty[FileStatus]) // scan for modified applications, replay and merge them val logInfos: Seq[FileStatus] = statusList .filter { entry => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 8cfd0f682932d4b3aafeae761b9c1775d319e8f9..e42f41b97202a8d45149d28e16a73392d6ff3c74 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -55,7 +55,7 @@ class MasterWebUI( } def addProxyTargets(id: String, target: String): Unit = { - var endTarget = target.stripSuffix("/") + val endTarget = target.stripSuffix("/") val handler = createProxyHandler("/proxy/" + id, endTarget) attachHandler(handler) proxyHandlers(id) = handler diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index cba4aaffe2caaef0c229592982f99f9e9fbc12b4..12e0dae3f5e5a9b9a6ec2137e45d9994478b03b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -44,7 +44,7 @@ object CommandUtils extends Logging { memory: Int, sparkHome: String, substituteArguments: String => String, - classPaths: Seq[String] = Seq[String](), + classPaths: Seq[String] = Seq.empty, env: Map[String, String] = sys.env): ProcessBuilder = { val localCommand = buildLocalCommand( command, securityMgr, substituteArguments, classPaths, env) @@ -73,7 +73,7 @@ object CommandUtils extends Logging { command: Command, securityMgr: SecurityManager, substituteArguments: String => String, - classPath: Seq[String] = Seq[String](), + classPath: Seq[String] = Seq.empty, env: Map[String, String]): Command = { val libraryPathName = Utils.libraryPathEnvName val libraryPathEntries = command.libraryPathEntries @@ -96,7 +96,7 @@ object CommandUtils extends Logging { command.arguments.map(substituteArguments), newEnvironment, command.classPathEntries ++ classPath, - Seq[String](), // library path already captured in environment variable + Seq.empty, // library path already captured in environment variable // filter out auth secret from java options command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) } diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 2610d6f6e45a27c74cf03a319cf899dc1776a0ea..8058a4d5dbdea3e50440558ec9e8af7312b0d476 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -17,6 +17,8 @@ package org.apache +import java.util.Properties + /** * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection, @@ -40,9 +42,6 @@ package org.apache * Developer API</span> are intended for advanced users want to extend Spark through lower * level interfaces. These are subject to changes or removal in minor releases. */ - -import java.util.Properties - package object spark { private object SparkBuildInfo { @@ -57,6 +56,9 @@ package object spark { val resourceStream = Thread.currentThread().getContextClassLoader. 
getResourceAsStream("spark-version-info.properties") + if (resourceStream == null) { + throw new SparkException("Could not find spark-version-info.properties") + } try { val unknownProp = "<unknown>" @@ -71,8 +73,6 @@ package object spark { props.getProperty("date", unknownProp) ) } catch { - case npe: NullPointerException => - throw new SparkException("Error while locating file spark-version-info.properties", npe) case e: Exception => throw new SparkException("Error loading properties from spark-version-info.properties", e) } finally { diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 2cba1febe875914504ed179033308d72d0e6db55..10451a324b0f42c2594419198ca39023ee97d472 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -269,7 +269,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) tries = 0 // if we don't have enough partition groups, create duplicates while (numCreated < targetLen) { - var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) + val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) tries += 1 val pgroup = new PartitionGroup(Some(nxt_replica)) groupArr += pgroup diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 1181371ab425a940e2f32ef2ac2ea5504b92a20d..f4b0ab10155a2149e769b236b59f25c71aa5fe2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -97,7 +97,7 @@ private[spark] class Pool( } override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { - var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] + val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 366b92c5f2ada26ac303215d69e8b3386503a873..836769e1723d54e1cf7fc4e51f4f093be65ad66d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -60,7 +60,7 @@ private[spark] class DirectTaskResult[T]( val numUpdates = in.readInt if (numUpdates == 0) { - accumUpdates = Seq() + accumUpdates = Seq.empty } else { val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]] for (i <- 0 until numUpdates) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3968fb7e6356dfd7194c81b8b14f20c0446aadd1..589fe672ade71abe7739b346237d17b3568799cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -891,7 +891,7 @@ private[spark] class TaskSetManager( override def removeSchedulable(schedulable: Schedulable) {} override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { - var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]() + val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]() sortedTaskSetQueue += this sortedTaskSetQueue } @@ -948,7 +948,7 @@ private[spark] class TaskSetManager( if (tasksSuccessful >= 
minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTimeMillis() - var medianDuration = successfulTaskDurations.median + val medianDuration = successfulTaskDurations.median val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0b396b794ddcef93606919eff12994742ea3c6ff..a46824a0c6fada0428f647d12d55013e240d0da3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -23,7 +23,6 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import scala.concurrent.duration.Duration import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} import org.apache.spark.internal.Logging @@ -427,11 +426,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * be called in the yarn-client mode when AM re-registers after a failure. * */ protected def reset(): Unit = { - val executors = synchronized { + val executors: Set[String] = synchronized { requestedTotalExecutors = 0 numPendingExecutors = 0 executorsPendingToRemove.clear() - Set() ++ executorDataMap.keys + executorDataMap.keys.toSet } // Remove all the lingering executors that should be removed but not yet. The reason might be diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index adbe3cfd89ea69bf2930b4dbee6d998ad44a311b..aaacabe79ace4b46e36b014d806dee63c781d826 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1275,11 +1275,11 @@ private[spark] class BlockManager( val numPeersToReplicateTo = level.replication - 1 val startTime = System.nanoTime - var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas - var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId] + val peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas + val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId] var numFailures = 0 - val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_)) + val initialPeers = getPeers(false).filterNot(existingReplicas.contains) var peersForReplication = blockReplicationPolicy.prioritize( blockManagerId, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index cce7a7611b4206e16af8053f369d99aa985e6690..a7f2caafe04b8a602ae3bb29759c1b99f524bfb5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -241,7 +241,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { }.getOrElse(jobIdTitle) val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( // New jobs should be shown above old jobs by default. 
- if (jobSortColumn == jobIdTitle) true else false + jobSortColumn == jobIdTitle ) val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 2b0816e35747d966679e5f7bd41e298a886299f4..a30c13592947c68c2a476c4168bb56914b0c0c87 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -115,7 +115,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { if (sc.isDefined && isFairScheduler) { <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq } else { - Seq[Node]() + Seq.empty[Node] } } if (shouldShowActiveStages) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index b164f32b62e978b21130a54fc3c4070b4205ff7b..819fe57e14b2d0e7090804c17f94ae8ec364cc1b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -41,7 +41,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { val poolToActiveStages = listener.poolToActiveStages val activeStages = poolToActiveStages.get(poolName) match { case Some(s) => s.values.toSeq - case None => Seq[StageInfo]() + case None => Seq.empty[StageInfo] } val shouldShowActiveStages = activeStages.nonEmpty val activeStagesTable = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 6b3dadc3333166f48b3be8b35efb6a6e8a5bb148..8ed51746ab9d0582099489730075a67f15662914 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -565,7 +565,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) val maybeAccumulableTable: Seq[Node] = - if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq() + if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq.empty val aggMetrics = <span class="collapse-aggregated-metrics collapse-table" diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index a28daf7f90451be83900486121294dfb909ea30e..f0a12a28de06900477ad5ffc36e65f1e0da1ddf8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -60,7 +60,7 @@ private[ui] class StageTableBase( }.getOrElse("Stage Id") val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse( // New stages should be shown above old jobs by default. 
- if (stageSortColumn == "Stage Id") true else false + stageSortColumn == "Stage Id" ) val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100) val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 317e0aa5ea25c23adbab25b5839d6c12444d5d7c..e8ff08f7d88ffb1de2cb6c2c1dfd08aa994a0008 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -51,7 +51,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true) .getOrElse { // Rather than crashing, render an "RDD Not Found" page - return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) + return UIUtils.headerSparkPage("RDD Not Found", Seq.empty[Node], parent) } // Worker table diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 489688cb0880f3a652b5809fba5f1ca6bbffd779..48a1d7b84b61b63039ab57739f9197e09e5dadec 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -81,7 +81,7 @@ private[spark] object ClosureCleaner extends Logging { val stack = Stack[Class[_]](obj.getClass) while (!stack.isEmpty) { val cr = getClassReader(stack.pop()) - val set = Set[Class[_]]() + val set = Set.empty[Class[_]] cr.accept(new InnerClosureFinder(set), 0) for (cls <- set -- seen) { seen += cls @@ -180,16 +180,18 @@ private[spark] object ClosureCleaner extends Logging { val declaredFields = func.getClass.getDeclaredFields val declaredMethods = func.getClass.getDeclaredMethods - logDebug(" + declared fields: " + declaredFields.size) - declaredFields.foreach { f => logDebug(" " + f) } - logDebug(" + declared methods: " + declaredMethods.size) - declaredMethods.foreach { m => logDebug(" " + m) } - logDebug(" + inner classes: " + innerClasses.size) - innerClasses.foreach { c => logDebug(" " + c.getName) } - logDebug(" + outer classes: " + outerClasses.size) - outerClasses.foreach { c => logDebug(" " + c.getName) } - logDebug(" + outer objects: " + outerObjects.size) - outerObjects.foreach { o => logDebug(" " + o) } + if (log.isDebugEnabled) { + logDebug(" + declared fields: " + declaredFields.size) + declaredFields.foreach { f => logDebug(" " + f) } + logDebug(" + declared methods: " + declaredMethods.size) + declaredMethods.foreach { m => logDebug(" " + m) } + logDebug(" + inner classes: " + innerClasses.size) + innerClasses.foreach { c => logDebug(" " + c.getName) } + logDebug(" + outer classes: " + outerClasses.size) + outerClasses.foreach { c => logDebug(" " + c.getName) } + logDebug(" + outer objects: " + outerObjects.size) + outerObjects.foreach { o => logDebug(" " + o) } + } // Fail fast if we detect return statements in closures getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0) @@ -201,7 +203,7 @@ private[spark] object ClosureCleaner extends Logging { // Initialize accessed fields with the outer classes first // This step is needed to associate the fields to the correct classes later for (cls <- outerClasses) { - accessedFields(cls) = Set[String]() + accessedFields(cls) = Set.empty[String] } // Populate accessed fields by visiting all fields and methods accessed by this 
and // all of its inner closures. If transitive cleaning is enabled, this may recursively diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 806d14e7cc119d81c66ac9938061105872d9d418..8406826a228db968271a8af8fd27ccc3c852c041 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -696,7 +696,7 @@ private[spark] object JsonProtocol { val accumulatedValues = { Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match { case Some(values) => values.map(accumulableInfoFromJson) - case None => Seq[AccumulableInfo]() + case None => Seq.empty[AccumulableInfo] } } @@ -726,7 +726,7 @@ private[spark] object JsonProtocol { val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean]) val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match { case Some(values) => values.map(accumulableInfoFromJson) - case None => Seq[AccumulableInfo]() + case None => Seq.empty[AccumulableInfo] } val taskInfo = diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 584337a71cb43df981cc644e90f8acc3881a800c..d661293e529f9e8a210d79b893655448458652c0 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1443,7 +1443,7 @@ private[spark] object Utils extends Logging { var firstUserFile = "<unknown>" var firstUserLine = 0 var insideSpark = true - var callStack = new ArrayBuffer[String]() :+ "<unknown>" + val callStack = new ArrayBuffer[String]() :+ "<unknown>" Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement => // When running under some profilers, the current stack trace might contain some bogus @@ -2438,7 +2438,7 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } - val EMPTY_USER_GROUPS = Set[String]() + val EMPTY_USER_GROUPS = Set.empty[String] // Returns the groups to which the current user belongs. def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = { @@ -2587,7 +2587,7 @@ private[spark] object Utils extends Logging { * Unions two comma-separated lists of files and filters out empty strings. 
*/ def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = { - var allFiles = Set[String]() + var allFiles = Set.empty[String] leftList.foreach { value => allFiles ++= value.split(",") } rightList.foreach { value => allFiles ++= value.split(",") } allFiles.filter { _.nonEmpty } diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index a897cad02ffd7b7077f90ceb386115bb9da58e76..8dbb7ee4e5307973d1715fb69c3873299f40b42d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -53,16 +53,16 @@ object LocalFileLR { val fileSrc = scala.io.Source.fromFile(args(0)) val lines = fileSrc.getLines().toArray - val points = lines.map(parsePoint _) + val points = lines.map(parsePoint) val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) - var gradient = DenseVector.zeros[Double](D) + val gradient = DenseVector.zeros[Double](D) for (p <- points) { val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y gradient += p.x * scale diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index fca585c2a362bc0e3d25eb5a2177c162ee28b43a..963c9a56d6cacad53924f0c1bc3d0aca7f080a88 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -47,12 +47,11 @@ object LocalKMeans { } def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = { - var index = 0 var bestIndex = 0 var closest = Double.PositiveInfinity for (i <- 1 to centers.size) { - val vCurr = centers.get(i).get + val vCurr = centers(i) val tempDist = squaredDistance(p, vCurr) if (tempDist < closest) { closest = tempDist @@ -76,8 +75,8 @@ object LocalKMeans { showWarning() val data = generateData - var points = new HashSet[Vector[Double]] - var kPoints = new HashMap[Int, Vector[Double]] + val points = new HashSet[Vector[Double]] + val kPoints = new HashMap[Int, Vector[Double]] var tempDist = 1.0 while (points.size < K) { @@ -92,11 +91,11 @@ object LocalKMeans { println("Initial centers: " + kPoints) while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var mappings = closest.groupBy[Int] (x => x._1) + val mappings = closest.groupBy[Int] (x => x._1) - var pointStats = mappings.map { pair => + val pointStats = mappings.map { pair => pair._2.reduceLeft [(Int, (Vector[Double], Int))] { case ((id1, (p1, c1)), (id2, (p2, c2))) => (id1, (p1 + p2, c1 + c2)) } @@ -107,7 +106,7 @@ object LocalKMeans { tempDist = 0.0 for (mapping <- newPoints) { - tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2) + tempDist += squaredDistance(kPoints(mapping._1), mapping._2) } for (newP <- newPoints) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala index 13ccc2ae7c3d88b392d11cb87cf29969fb353fb9..eb5221f085937094805d10994b470abbfe6951f1 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala @@ -60,12 +60,12 @@ object LocalLR { val data = generateData // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) - var gradient = DenseVector.zeros[Double](D) + val gradient = DenseVector.zeros[Double](D) for (p <- data) { val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y gradient += p.x * scale diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 05ac6cbcb35bccd1d20f84332b63769c5614e6db..9d675bbc18f3878368c0b188ae7302fe4d561f35 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -40,8 +40,8 @@ object SparkHdfsLR { def parsePoint(line: String): DataPoint = { val tok = new java.util.StringTokenizer(line, " ") - var y = tok.nextToken.toDouble - var x = new Array[Double](D) + val y = tok.nextToken.toDouble + val x = new Array[Double](D) var i = 0 while (i < D) { x(i) = tok.nextToken.toDouble; i += 1 @@ -78,7 +78,7 @@ object SparkHdfsLR { val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala index cb2be091ffcf3de66bf454abe5e00eacf1baedfb..c18e3d31f149eff585bf9bcbcf57c83f1919afae 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala @@ -72,7 +72,7 @@ object SparkLR { val points = spark.sparkContext.parallelize(generateData, numSlices).cache() // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index b03701e4915d0682a890ae3950625e7887a4e6d9..19f2d7751bc5424c1c55011e8e2c2f859d12571a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -251,7 +251,7 @@ object DecisionTreeExample { .setMinInfoGain(params.minInfoGain) .setCacheNodeIds(params.cacheNodeIds) .setCheckpointInterval(params.checkpointInterval) - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } stages += dt val pipeline = new Pipeline().setStages(stages.toArray) @@ -278,7 +278,7 @@ object DecisionTreeExample { } else { println(treeModel) // Print model summary. } - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } // Evaluate model on training, test data. 
@@ -294,7 +294,7 @@ object DecisionTreeExample { println("Test data results:") evaluateRegressionModel(pipelineModel, test, labelColName) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala index 3bd8ff54c22381627415a57a95bcd8101d1d7447..8f3ce4b315bd3d5929fbeae451249a728cd8cc82 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala @@ -190,7 +190,7 @@ object GBTExample { .setCacheNodeIds(params.cacheNodeIds) .setCheckpointInterval(params.checkpointInterval) .setMaxIter(params.maxIter) - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } stages += dt val pipeline = new Pipeline().setStages(stages.toArray) @@ -217,7 +217,7 @@ object GBTExample { } else { println(rfModel) // Print model summary. } - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } // Evaluate model on training, test data. @@ -233,7 +233,7 @@ object GBTExample { println("Test data results:") DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala index a735c218c0d26e57553586d2675183cde6985b4d..3c127a46e1f1034276a78cd6cc49d23cd0f27bd8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala @@ -198,7 +198,7 @@ object RandomForestExample { .setCheckpointInterval(params.checkpointInterval) .setFeatureSubsetStrategy(params.featureSubsetStrategy) .setNumTrees(params.numTrees) - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } stages += dt val pipeline = new Pipeline().setStages(stages.toArray) @@ -225,7 +225,7 @@ object RandomForestExample { } else { println(rfModel) // Print model summary. } - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } // Evaluate model on training, test data. 
@@ -241,7 +241,7 @@ object RandomForestExample { println("Test data results:") DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 0ad0465a023cfd166aed93607e165534fc7cfb78..fa47e12857f0cc4bf145b0980be401fe16d3e89a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -211,7 +211,7 @@ object DecisionTreeRunner { case Regression => (origExamples, null, 0) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo $algo not supported.") } // Create training, test sets. diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index f86b8f586d2a0513f15d5803a5893e52c0748b4a..5915d9f99a93962b6138d8b2e3fc8a1202eba6bf 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -378,7 +378,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L zkUtils.getLeaderForPartition(topic, partition).isDefined && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.size >= 1 + leaderAndInSyncReplicas.isr.nonEmpty case _ => false diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 62cdf5b1134e4b31be356f6f9824780ca9ed08b8..d9fc9cc206647e0f5e445eccef22bd401b8f128f 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -156,7 +156,7 @@ private[spark] class KafkaRDD[K, V]( val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost) val execs = if (prefExecs.isEmpty) allExecs else prefExecs if (execs.isEmpty) { - Seq() + Seq.empty } else { // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index val index = Math.floorMod(tp.hashCode, execs.length) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 8273c2b49f6b5d41d7af686dc16b7c162b752584..6c7024ea4b5a5cd87b402111ef47a36a60361d94 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -257,7 +257,7 @@ private[kafka010] class KafkaTestUtils extends Logging { zkUtils.getLeaderForPartition(topic, partition).isDefined && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.size >= 1 + leaderAndInSyncReplicas.isr.nonEmpty case _ => false diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 26349f4d88a19da36ba8b9b19facfc622bb53049..0e6a340a680ba737e66a2f99b41cbf4340df76f1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -388,7 +388,7 @@ class EdgePartition[ val aggregates = new Array[A](vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length) - var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) + val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) var i = 0 while (i < size) { val localSrcId = localSrcIds(i) @@ -433,7 +433,7 @@ class EdgePartition[ val aggregates = new Array[A](vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length) - var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) + val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) index.iterator.foreach { cluster => val clusterSrcId = cluster._1 val clusterPos = cluster._2 diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 5d2a53782b55d0135d95310c7a40f35d02e37af9..34e1253ff42ab8b13a0101ad60e1d7dcaddfa72f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -74,7 +74,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def getCheckpointFiles: Seq[String] = { Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap { case Some(path) => Seq(path) - case None => Seq() + case None => Seq.empty } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 2b3e5f98c4fe5da0ff8c0cc00516ce27fd6e12c2..419731146df7faf88f01c1f2112ecab8ddd1d244 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -18,6 +18,7 @@ package org.apache.spark.graphx.util import scala.annotation.tailrec +import scala.collection.mutable import scala.reflect.ClassTag import scala.util._ @@ -133,7 +134,7 @@ object GraphGenerators extends Logging { throw new IllegalArgumentException( s"numEdges must be <= $numEdgesUpperBound but was $numEdges") } - var edges: Set[Edge[Int]] = Set() + var edges = mutable.Set.empty[Edge[Int]] while (edges.size < numEdges) { if (edges.size % 100 == 0) { logDebug(edges.size + " edges") diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index b234bc4c2df4f95abf938fa19c02434bc9820d97..65b09e571264cd6c4582e947a080a7d6ddb4a126 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -736,7 +736,7 @@ class LogisticRegression @Since("1.2.0") ( b_k' = b_k - \mean(b_k) }}} */ - val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing + val rawIntercepts = histogram.map(math.log1p) // add 1 for smoothing (log1p(x) = log(1+x)) val rawMean = rawIntercepts.sum / rawIntercepts.length rawIntercepts.indices.foreach { i => 
initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) @@ -820,7 +820,7 @@ class LogisticRegression @Since("1.2.0") ( val interceptVec = if ($(fitIntercept) || !isMultinomial) { Vectors.zeros(numCoefficientSets) } else { - Vectors.sparse(numCoefficientSets, Seq()) + Vectors.sparse(numCoefficientSets, Seq.empty) } // separate intercepts and coefficients from the combined matrix allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala index 2dd565a782719c050cdee0f9afe2657cdd895e75..32835fb3aa6d13ded09db27a9981f09125499115 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala @@ -99,7 +99,7 @@ private[ml] case class ParsedRFormula(label: ColumnRef, terms: Seq[Term]) { }).map(_.distinct) // Deduplicates feature interactions, for example, a:b is the same as b:a. - var seen = mutable.Set[Set[String]]() + val seen = mutable.Set[Set[String]]() validInteractions.flatMap { case t if seen.contains(t.toSet) => None diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 91cd229704a372133e66bcea7e2c08dce3fd03f9..ccc61feee82cfc693dd737d3f6b67fe58fb0b364 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -286,7 +286,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String s"training is not needed.") } if (handlePersistence) instances.unpersist() - val coefficients = Vectors.sparse(numFeatures, Seq()) + val coefficients = Vectors.sparse(numFeatures, Seq.empty) val intercept = yMean val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 98e50c5b45cfd94fd7931f798d82f0c88ed0d1dc..49043b5acb8078542e2e97b376abc47734a48630 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -363,7 +363,7 @@ class KMeans private ( // to their squared distance from the centers. Note that only distances between points // and new centers are computed in each iteration. 
var step = 0 - var bcNewCentersList = ArrayBuffer[Broadcast[_]]() + val bcNewCentersList = ArrayBuffer[Broadcast[_]]() while (step < initializationSteps) { val bcNewCenters = data.context.broadcast(newCenters) bcNewCentersList += bcNewCenters diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 7695aabf4313d67e436da3edc11b3a03bb998487..c7c1a5404e5e8a2d2bc523aca94234b35c9ad38d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -78,13 +78,13 @@ private[mllib] object EigenValueDecomposition { require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE, s"k = $k and/or n = $n are too large to compute an eigendecomposition") - var ido = new intW(0) - var info = new intW(0) - var resid = new Array[Double](n) - var v = new Array[Double](n * ncv) - var workd = new Array[Double](n * 3) - var workl = new Array[Double](ncv * (ncv + 8)) - var ipntr = new Array[Int](11) + val ido = new intW(0) + val info = new intW(0) + val resid = new Array[Double](n) + val v = new Array[Double](n * ncv) + val workd = new Array[Double](n * 3) + val workl = new Array[Double](ncv * (ncv + 8)) + val ipntr = new Array[Int](11) // call ARPACK's reverse communication, first iteration with ido = 0 arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index efedebe301387b505f0d6964463295a0aa49915f..21ec287e497d4a871b643b5d9b61f147a6deaada 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -257,7 +257,7 @@ object LBFGS extends Logging { (denseGrad1, loss1 + loss2) } - val zeroSparseVector = Vectors.sparse(n, Seq()) + val zeroSparseVector = Vectors.sparse(n, Seq.empty) val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp) // broadcasted model is not needed anymore diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala index b760347bcb6fb1c0674ce01c820b0bc67f20b36b..ee51d332399e0e568dd6813127dc80fbbcd2a225 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala @@ -57,7 +57,7 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging { var preCol = -1 var preVal = Double.NaN var startRank = -1.0 - var cachedUids = ArrayBuffer.empty[Long] + val cachedUids = ArrayBuffer.empty[Long] val flush: () => Iterable[(Long, (Int, Double))] = () => { val averageRank = startRank + (cachedUids.size - 1) / 2.0 val output = cachedUids.map { uid => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala index 551ea357950bab19665933e45b96b1756561acb5..80c6ef0ea1aa17bd56d9fa101a5e4e3deb6af76c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala @@ -133,7 +133,7 @@ class StreamingTest @Since("1.6.0") () extends Logging with Serializable { if (time.milliseconds > data.slideDuration.milliseconds * peacePeriod) { rdd } else { - data.context.sparkContext.parallelize(Seq()) + data.context.sparkContext.parallelize(Seq.empty) } } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 41f3a0451aa8a270358452cc45eda81686af469d..b9db1df2d1919763bebe7d361283d83454eafbaf 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -589,7 +589,7 @@ object PySparkAssembly { val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") zipFile.delete() zipRecursive(src, zipFile) - Seq[File]() + Seq.empty[File] }).value ) @@ -810,7 +810,7 @@ object TestSettings { require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d") } } - Seq[File]() + Seq.empty[File] }).value, concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), // Remove certain packages from Scaladoc diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala index a6bb5d5915022fd40301cb1af6255c433255118a..022191d0070fd2e7f4b373d9806783025c264fad 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -112,7 +112,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") <td>Last Task Status</td> <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> </tr> - }.getOrElse(Seq[Node]()) + }.getOrElse(Seq.empty[Node]) } private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { @@ -175,6 +175,6 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {state.retries} </td> </tr> - }.getOrElse(Seq[Node]()) + }.getOrElse(Seq.empty[Node]) } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 062ed1f93fa52aacea305393389faf95779a8d2e..7ec116c74b10fa53c83837050216603b0b9dbaaf 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -333,7 +333,7 @@ trait MesosSchedulerUtils extends Logging { try { splitter.split(constraintsVal).asScala.toMap.mapValues(v => if (v == null || v.isEmpty) { - Set[String]() + Set.empty[String] } else { v.split(',').toSet } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7745709e07fe534b61af8f98b2a9ab493ebc3ca3..501e7e3c6961dd07afe8a476153f621f135b13ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2334,8 +2334,9 @@ object TimeWindowing extends Rule[LogicalPlan] { val windowExpressions = p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet + val numWindowExpr = windowExpressions.size // Only support a single window expression for now - if 
(windowExpressions.size == 1 && + if (numWindowExpr == 1 && windowExpressions.head.timeColumn.resolved && windowExpressions.head.checkInputDataTypes().isSuccess) { @@ -2402,7 +2403,7 @@ object TimeWindowing extends Rule[LogicalPlan] { renamedPlan.withNewChildren(substitutedPlan :: Nil) } - } else if (windowExpressions.size > 1) { + } else if (numWindowExpr > 1) { p.failAnalysis("Multiple time window expressions would result in a cartesian product " + "of rows, therefore they are currently not supported.") } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index c863ba434120d6419fee943c23e7dc26b295493d..83a23cc97e45f88b6a7e3923cfbbf070d51f5978 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -228,10 +228,10 @@ case class ArrayContains(left: Expression, right: Expression) override def dataType: DataType = BooleanType override def inputTypes: Seq[AbstractDataType] = right.dataType match { - case NullType => Seq() + case NullType => Seq.empty case _ => left.dataType match { case n @ ArrayType(element, _) => Seq(n, element) - case _ => Seq() + case _ => Seq.empty } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index db7baf6e9bc7d3a6891be44242488d9ab4baa37a..064ca68b7a628ec554adc856ae6dcac36f37877b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -150,7 +150,7 @@ object JoinReorderDP extends PredicateHelper with Logging { // Create the initial plans: each plan is a single item with zero cost. val itemIndex = items.zipWithIndex val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map { - case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) + case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set.empty, Cost(0, 0)) }.toMap) // Build filters from the join graph to be used by the search algorithm. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 6c83f4790004fd9c058064b89942adfa00795c58..79a6c8663a56b76b06c8b1a140dbee274f43110a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -134,7 +134,7 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { private def collectGroupingExpressions(plan: LogicalPlan): ExpressionSet = plan match { case Aggregate(groupingExpressions, aggregateExpressions, child) => ExpressionSet.apply(groupingExpressions) - case _ => ExpressionSet(Seq()) + case _ => ExpressionSet(Seq.empty) } def apply(plan: LogicalPlan): LogicalPlan = plan transform { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ad359e714bcc7d326fee8bdfdc2d236b905f590a..45c1d3d430e0d0acd7f7b2aba6812c4852393424 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -877,7 +877,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // Reverse the contexts to have them in the same sequence as in the SQL statement & turn them // into expressions. - val expressions = contexts.reverse.map(expression) + val expressions = contexts.reverseMap(expression) // Create a balanced tree. def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 7f370fb731b2f5aa0b0d0d290a6d10dc700fb5a5..8d034c21a496034d37ace4c39458d62f5a45735a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -173,7 +173,7 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - case _ => (Seq((plan, parentJoinType)), Seq()) + case _ => (Seq((plan, parentJoinType)), Seq.empty) } def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index e13db85c7a76e4db837b2791ee4ffa07bb404d39..74820eb97d08167c8ba83f0461f43b840be3a1b0 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -47,7 +47,7 @@ case class FilterEstimation(plan: Filter) extends Logging { // Estimate selectivity of this filter predicate, and update column stats if needed. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index e13db85c7a76e4db837b2791ee4ffa07bb404d39..74820eb97d08167c8ba83f0461f43b840be3a1b0 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -47,7 +47,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // Estimate selectivity of this filter predicate, and update column stats if needed.
     // For not-supported condition, set filter selectivity to a conservative estimate 100%
-    val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1.0))
+    val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1))

     val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
     val newColStats = if (filteredRowCount == 0) {
@@ -83,13 +83,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
     : Option[BigDecimal] = {
     condition match {
       case And(cond1, cond2) =>
-        val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1.0))
-        val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1.0))
+        val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1))
+        val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1))
         Some(percent1 * percent2)

       case Or(cond1, cond2) =>
-        val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1.0))
-        val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1.0))
+        val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1))
+        val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1))
         Some(percent1 + percent2 - (percent1 * percent2))

       // Not-operator pushdown
@@ -464,7 +464,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       (numericLiteral > max, numericLiteral <= min)
     }

-    var percent = BigDecimal(1.0)
+    var percent = BigDecimal(1)
     if (noOverlap) {
       percent = 0.0
     } else if (completeOverlap) {
@@ -630,7 +630,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       )
     }

-    var percent = BigDecimal(1.0)
+    var percent = BigDecimal(1)
     if (noOverlap) {
       percent = 0.0
     } else if (completeOverlap) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
index f4d5a4471d896aa055f77ccd5561803926679efd..9ee777529aedaa57cb6099b733b05170fea15366 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
@@ -609,9 +609,9 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       checkEvaluation(BRound(floatPi, scale), floatResults(i), EmptyRow)
     }

-    val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3.0), BigDecimal(3.1), BigDecimal(3.14),
-      BigDecimal(3.142), BigDecimal(3.1416), BigDecimal(3.14159),
-      BigDecimal(3.141593), BigDecimal(3.1415927))
+    val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3), BigDecimal("3.1"), BigDecimal("3.14"),
+      BigDecimal("3.142"), BigDecimal("3.1416"), BigDecimal("3.14159"),
+      BigDecimal("3.141593"), BigDecimal("3.1415927"))

     (0 to 7).foreach { i =>
       checkEvaluation(Round(bdPi, i), bdResults(i), EmptyRow)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 144f3d688d402fc72085e640ce6d6fabe2afd2a0..3193d1320ad9d600379694db144c727f4d0283d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -109,8 +109,8 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {
   test("small decimals represented as unscaled long") {
     checkCompact(new Decimal(), true)
-    checkCompact(Decimal(BigDecimal(10.03)), false)
-    checkCompact(Decimal(BigDecimal(1e20)), false)
+    checkCompact(Decimal(BigDecimal("10.03")), false)
+    checkCompact(Decimal(BigDecimal("100000000000000000000")), false)
     checkCompact(Decimal(17L), true)
     checkCompact(Decimal(17), true)
     checkCompact(Decimal(17L, 2, 1), true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ba7ca84f229fc57b0a44eee560536fe0a2283f05..dae160f1bbb18b9d54803c7b3b8c8b147b48777f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -671,11 +671,11 @@ case class AlterTableRecoverPartitionsCommand(
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
-          Seq()
+          Seq.empty
         }
       } else {
         logWarning(s"ignore ${new Path(path, name)}")
-        Seq()
+        Seq.empty
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index a521fd132385218b0d7f4a60183b86a0c028b73e..658d13768a976b86f6ae544e8ed9de1623a33821 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -23,7 +23,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -81,7 +80,7 @@ private[sql] object JDBCRelation extends Logging {
     val column = partitioning.column
     var i: Int = 0
     var currentValue: Long = lowerBound
-    var ans = new ArrayBuffer[Partition]()
+    val ans = new ArrayBuffer[Partition]()
     while (i < numPartitions) {
       val lBound = if (i != 0) s"$column >= $currentValue" else null
       currentValue += stride
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 87fbf8b1bc9c4a22262e363d940d771edfc6c403..64eea26a9f98e19852c17e2715cc3873807367bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -220,7 +220,7 @@ class ParquetFileFormat
       val needMerged: Seq[FileStatus] =
         if (mergeRespectSummaries) {
-          Seq()
+          Seq.empty
         } else {
           filesByType.data
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 34391818f3b9ae1a2bbd66db75e41fce65392fd3..9e4e02b99bbdacb5aa0a0da06e1772f8a0d90bb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -35,8 +35,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.streaming.GroupStateTimeout
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-

 /**
  * Physical version of `ObjectProducer`.
@@ -403,8 +401,7 @@ case class FlatMapGroupsInRExec(
     Seq(groupingAttributes.map(SortOrder(_, Ascending)))

   override protected def doExecute(): RDD[InternalRow] = {
-    val isSerializedRData =
-      if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+    val isSerializedRData = outputSchema == SERIALIZED_R_DATA_SCHEMA
     val serializerForR = if (!isSerializedRData) {
       SerializationFormats.ROW
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
index d2178e971ec20fbf3d6d3e56fcb3983e468714ff..b9835c7dbb025b4cae7e3997372af4d05851860c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
@@ -34,8 +34,7 @@ case class MapPartitionsRWrapper(
     outputSchema: StructType) extends (Iterator[Any] => Iterator[Any]) {
   def apply(iter: Iterator[Any]): Iterator[Any] = {
     // If the content of current DataFrame is serialized R data?
-    val isSerializedRData =
-      if (inputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+    val isSerializedRData = inputSchema == SERIALIZED_R_DATA_SCHEMA

     val (newIter, deserializer, colNames) =
       if (!isSerializedRData) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 408c8f81f17ba66bbdca913e4a8a04c497159bd8..e37033b19a8eb16a3097195d2b28c14ac9a705c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -170,12 +170,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
-    if (super.add(batchId, compactLogs(allLogs).toArray)) {
-      true
-    } else {
-      // Return false as there is another writer.
-      false
-    }
+    // Return false as there is another writer.
+    super.add(batchId, compactLogs(allLogs).toArray)
   }

   /**
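The BigDecimal changes above (FilterEstimation, MathExpressionsSuite, DecimalSuite) avoid constructing decimals from floating-point literals. With java.math.BigDecimal in particular, the double constructor encodes the exact binary value of the double rather than the literal as written, while the String and integer constructors are exact. A short sketch; the `BigDecimalCtorDemo` object is illustrative only and not part of the patch:

object BigDecimalCtorDemo {
  def main(args: Array[String]): Unit = {
    // Double constructor: captures the binary representation of 0.1.
    println(new java.math.BigDecimal(0.1))
    // prints 0.1000000000000000055511151231257827021181583404541015625
    // String and int constructors: exactly the value written.
    println(new java.math.BigDecimal("0.1"))   // 0.1
    println(new java.math.BigDecimal(1))       // 1
    // scala.math.BigDecimal built from a String also preserves the written scale.
    println(BigDecimal("3.14").scale)          // 2
  }
}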
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5ee596e06d5c53232ab9fce1b03fb318537ecc67..5711262654a165078149a426d737853e4e8b12a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -609,7 +609,7 @@ class StreamExecution(
     }

     // A list of attributes that will need to be updated.
-    var replacements = new ArrayBuffer[(Attribute, Attribute)]
+    val replacements = new ArrayBuffer[(Attribute, Attribute)]
     // Replace sources in the logical plan with data that has arrived since the last batch.
     val withNewSources = logicalPlan transform {
       case StreamingExecutionRelation(source, output) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 4568b67024acb2a3201938f7407482a790e6e4e8..d802557b36ec9f08480beeecdbee0a1231e1c0d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -65,9 +65,9 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       decimalData.groupBy("a").agg(sum("b")),
-      Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(3.0)),
-        Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(3.0)),
-        Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(3.0)))
+      Seq(Row(new java.math.BigDecimal(1), new java.math.BigDecimal(3)),
+        Row(new java.math.BigDecimal(2), new java.math.BigDecimal(3)),
+        Row(new java.math.BigDecimal(3), new java.math.BigDecimal(3)))
     )

     val decimalDataWithNulls = spark.sparkContext.parallelize(
@@ -80,10 +80,10 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
       DecimalData(null, 2) :: Nil).toDF()
     checkAnswer(
       decimalDataWithNulls.groupBy("a").agg(sum("b")),
-      Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.0)),
-        Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.0)),
-        Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(3.0)),
-        Row(null, new java.math.BigDecimal(2.0)))
+      Seq(Row(new java.math.BigDecimal(1), new java.math.BigDecimal(1)),
+        Row(new java.math.BigDecimal(2), new java.math.BigDecimal(1)),
+        Row(new java.math.BigDecimal(3), new java.math.BigDecimal(3)),
+        Row(null, new java.math.BigDecimal(2)))
     )
   }

@@ -259,19 +259,19 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       decimalData.agg(avg('a)),
-      Row(new java.math.BigDecimal(2.0)))
+      Row(new java.math.BigDecimal(2)))

     checkAnswer(
       decimalData.agg(avg('a), sumDistinct('a)), // non-partial
-      Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(6)) :: Nil)
+      Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)

     checkAnswer(
       decimalData.agg(avg('a cast DecimalType(10, 2))),
-      Row(new java.math.BigDecimal(2.0)))
+      Row(new java.math.BigDecimal(2)))

     // non-partial
     checkAnswer(
       decimalData.agg(avg('a cast DecimalType(10, 2)), sumDistinct('a cast DecimalType(10, 2))),
-      Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(6)) :: Nil)
+      Row(new java.math.BigDecimal(2), new java.math.BigDecimal(6)) :: Nil)
   }

   test("null average") {
@@ -520,9 +520,9 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
   test("SQL decimal test (used for catching certain decimal handling bugs in aggregates)") {
     checkAnswer(
       decimalData.groupBy('a cast DecimalType(10, 2)).agg(avg('b cast DecimalType(10, 2))),
-      Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.5)),
-        Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.5)),
-        Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(1.5))))
+      Seq(Row(new java.math.BigDecimal(1), new java.math.BigDecimal("1.5")),
+        Row(new java.math.BigDecimal(2), new java.math.BigDecimal("1.5")),
+        Row(new java.math.BigDecimal(3), new java.math.BigDecimal("1.5"))))
   }

   test("SPARK-17616: distinct aggregate combined with a non-partial aggregate") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3f3a6221d20b6b5bad2cd1efa3ffa0ef8484f1ae..7c500728bdec9afc9e2f1746728a5214ecc00ef7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1167,7 +1167,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }

   test("SPARK-6899: type should match when using codegen") {
-    checkAnswer(decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2.0)))
+    checkAnswer(decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2)))
   }

   test("SPARK-7133: Implement struct, array, and map field accessor") {
@@ -1971,7 +1971,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("SPARK-19691 Calculating percentile of decimal column fails with ClassCastException") {
     val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as x").selectExpr("percentile(x, 0.5)")
-    checkAnswer(df, Row(BigDecimal(0.0)) :: Nil)
+    checkAnswer(df, Row(BigDecimal(0)) :: Nil)
   }

   test("SPARK-19893: cannot run set operations with map type") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c78ec6d9a89ff7838c7c03f56406e459f2f7921f..e95f6dba4607960d4eedad33a4e05c479e0da929 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1546,10 +1546,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row(d)))
     checkAnswer(
       df.selectExpr("b * a + b"),
-      Seq(Row(BigDecimal(2.12321))))
+      Seq(Row(BigDecimal("2.12321"))))
     checkAnswer(
       df.selectExpr("b * a - b"),
-      Seq(Row(BigDecimal(0.12321))))
+      Seq(Row(BigDecimal("0.12321"))))
     checkAnswer(
       df.selectExpr("b * a * b"),
       Seq(Row(d)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index bcc23510499533771bbf89ecc45ad2ad42ef8850..a12efc835691b504ea5a859d61f6a0d17ce9ae4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -387,7 +387,7 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
       Row("6.4817"))

     checkAnswer(
-      df.select(format_number(lit(BigDecimal(7.128381)), 4)), // not convert anything
+      df.select(format_number(lit(BigDecimal("7.128381")), 4)), // not convert anything
       Row("7.1284"))

     intercept[AnalysisException] {
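Several hunks in this patch (JDBCRelation's `ans`, StreamExecution's `replacements`, TableReader's path sets) switch `var` to `val` for references to mutable collections: the collection is mutated in place, so the reference itself never needs to be reassigned. A small sketch; the `ValWithMutableBufferDemo` object is illustrative only and not part of the patch:

import scala.collection.mutable.ArrayBuffer

object ValWithMutableBufferDemo {
  def main(args: Array[String]): Unit = {
    val partitions = new ArrayBuffer[String]()  // val: the reference cannot be reassigned
    partitions += "part-0"                      // fine: mutates the buffer contents
    partitions += "part-1"
    // partitions = new ArrayBuffer[String]()   // would not compile: reassignment to val
    println(partitions.mkString(", "))          // part-0, part-1
  }
}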
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 1cde137edbb91f69bef5fdd279f6afba89278478..80e5dd161d1f3d6cc3cf80903db6db7436ed30e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -826,7 +826,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       StructField("b", DecimalType(2, 2), true):: Nil)
     assert(expectedSchema === jsonDF.schema)

-    checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+    checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal("0.01")))

     val mergedJsonDF = spark.read
       .option("prefersDecimal", "true")
@@ -839,7 +839,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(expectedMergedSchema === mergedJsonDF.schema)
     checkAnswer(
       mergedJsonDF,
-      Row(1.0E-39D, BigDecimal(0.01)) ::
+      Row(1.0E-39D, BigDecimal("0.01")) ::
         Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
     )
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b4f3de9961209227f19388c9a81dac188a480c84..84b34d5ad26d1fd8d8c4a9c141cbdea7645046a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -676,7 +676,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         1.5.toFloat,
         4.5,
         new java.math.BigDecimal(new BigInteger("212500"), 5),
-        new java.math.BigDecimal(2.125),
+        new java.math.BigDecimal("2.125"),
         java.sql.Date.valueOf("2015-05-23"),
         new Timestamp(0),
         "This is a string, /[]?=:",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 2a522a1431f451483c2366cccabf5946491e57fd..be6339f7ddec3830e10b521ad6be8b6f595febb8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -245,7 +245,7 @@ private[spark] object HiveUtils extends Logging {
     val loader = new IsolatedClientLoader(
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
       sparkConf = conf,
-      execJars = Seq(),
+      execJars = Seq.empty,
       hadoopConf = hadoopConf,
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 16c1103dd1ea393eb90e84233884a19bfa2bfd14..f238b9a4f7f6f14d90f9bf66122469feca0aad32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -162,8 +162,8 @@ class HadoopTableReader(
     if (!sparkSession.sessionState.conf.verifyPartitionPath) {
       partitionToDeserializer
     } else {
-      var existPathSet = collection.mutable.Set[String]()
-      var pathPatternSet = collection.mutable.Set[String]()
+      val existPathSet = collection.mutable.Set[String]()
+      val pathPatternSet = collection.mutable.Set[String]()
       partitionToDeserializer.filter {
         case (partition, partDeserializer) =>
           def updateExistPathSetByPathPattern(pathPatternStr: String) {
@@ -181,8 +181,8 @@ class HadoopTableReader(
           }

           val partPath = partition.getDataLocation
-          val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-          var pathPatternStr = getPathPatternByPath(partNum, partPath)
+          val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size()
+          val pathPatternStr = getPathPatternByPath(partNum, partPath)
           if (!pathPatternSet.contains(pathPatternStr)) {
             pathPatternSet += pathPatternStr
             updateExistPathSetByPathPattern(pathPatternStr)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index be024adac8eb0ca31dac71dad89e917fe8ccc3a6..bde9a81c65a4efd5c0eae7d76f2b6edaf36aa0f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -391,7 +391,7 @@ private[hive] class HiveClientImpl(
       val sortColumnNames = if (allAscendingSorted) {
         sortColumnOrders.map(_.getCol)
       } else {
-        Seq()
+        Seq.empty
       }
       Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index 3de1f4aeb74dcaf98a0d101b78b86276211e465b..11fd8c56e631bf23430b105e5684a78ffa093d53 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -90,7 +90,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
     Literal(0.asInstanceOf[Double]) ::
     Literal("0") ::
     Literal(java.sql.Date.valueOf("2014-09-23")) ::
-    Literal(Decimal(BigDecimal(123.123))) ::
+    Literal(Decimal(BigDecimal("123.123"))) ::
     Literal(new java.sql.Timestamp(123123)) ::
     Literal(Array[Byte](1, 2, 3)) ::
     Literal.create(Seq[Int](1, 2, 3), ArrayType(IntegerType)) ::
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 6a2c23a015529497a07384ebf615eb6254a5efc5..3eedcf7e0874e086fd3b407f078e53ba7947649d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -21,12 +21,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.scalatest.BeforeAndAfterAll

-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, LessThan, LessThanOrEqual, Like, Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, In, InSet}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{ByteType, IntegerType, StringType}

 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -146,7 +143,7 @@ class HiveClientSuite(version: String)
       0 to 23,
       "aa" :: "ab" :: "ba" :: "bb" :: Nil, {
         case expr @ In(v, list) if expr.inSetConvertible =>
-          InSet(v, Set() ++ list.map(_.eval(EmptyRow)))
+          InSet(v, list.map(_.eval(EmptyRow)).toSet)
       })
   }

@@ -165,7 +162,7 @@ class HiveClientSuite(version: String)
      0 to 23,
      "ab" :: "ba" :: Nil, {
        case expr @ In(v, list) if expr.inSetConvertible =>
-          InSet(v, Set() ++ list.map(_.eval(EmptyRow)))
+          InSet(v, list.map(_.eval(EmptyRow)).toSet)
      })
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a34f6c73fea862d834b16d88f385c5787d43b20d..f3b4ff2d1d80c54b149ccf5bbae1f786682af007 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -458,7 +458,7 @@ class StreamingContext private[streaming] (
       queue: Queue[RDD[T]],
       oneAtATime: Boolean = true
     ): InputDStream[T] = {
-    queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+    queueStream(queue, oneAtATime, sc.makeRDD(Seq.empty[T], 1))
   }

   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a0a40fcee26d9eecbbb947924b012e642ee721c7..4a0ec31b5f3c8f22b44c7ea5da2f70db735940d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -153,7 +153,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   def context(): StreamingContext = dstream.context

   /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+  def map[U](f: JFunction[T, U]): JavaDStream[U] = {
     new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 5bf1dabf08f45df708fa0d49be5acd18bdfc1f57..d1a5e9179370838efbf5cbc68bb840bf7fe715d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
       // Re-apply the update function to the old state RDD
       val updateFuncLocal = updateFunc
       val finalFunc = (iterator: Iterator[(K, S)]) => {
-        val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+        val i = iterator.map(t => (t._1, Seq.empty[V], Option(t._2)))
         updateFuncLocal(validTime, i)
       }
       val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 408936653c7906474804b691227b8674c462506e..eb9996ece3779aca5ce6d9b4b611afaf9b6c4393 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -63,7 +63,6 @@ object RawTextHelper {
     var i = 0
     var len = 0
-    var done = false
     var value: (String, Long) = null
     var swap: (String, Long) = null
     var count = 0