diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index 281216612fc191f7945bdb5094ffb381d2e31a38..dd3eed8affe39b33e6b86bfb479c5ae8621ab122 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -33,7 +33,8 @@ object Bagel extends Logging { * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often * this will be an empty array, i.e. sc.parallelize(Array[K, Message]()). * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a - * given vertex into one message before sending (which often involves network I/O). + * given vertex into one message before sending (which often involves network + * I/O). * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices * after each superstep and provides the result to each vertex in the next * superstep. diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 15a0d24fd954eaa9d5acde7adbc0aae8210ed680..b38af2497d3d95c1a220e5d36034d0242b42301d 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -32,7 +32,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, - storageLevel: StorageLevel): Iterator[T] = { + storageLevel: StorageLevel): Iterator[T] = { val key = RDDBlockId(rdd.id, split.index) logDebug("Looking for partition " + key) blockManager.get(key) match { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala index 2f2cbd182c967526b6746cc8031de9f9bdc56917..1f20aa3dfa39bfe747aca9103ddc7c59a646edb5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala @@ -33,8 +33,7 @@ private[spark] trait AppClientListener { /** Dead means that we couldn't find any Masters to connect to, and have given up. 
*/ def dead(): Unit - def executorAdded( - fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit + def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 82bf655212fcc2a30afef218d1d72b955abb80f4..0bb9a9a937ff059b513892d24571a2cac6e769b7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -166,8 +166,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act System.exit(0) } - case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) - => { + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => + { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -176,7 +176,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - sender, workerWebUiPort, publicAddress) + sender, workerUiPort, publicAddress) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 64ecf22399e395f620863b51d722553008b67444..04f9a22a25a94cd8212635490cf51c1db5da79d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -123,7 +123,8 @@ private[spark] class IndexPage(parent: MasterWebUI) { </div> <div> - {if (hasDrivers) { + { + if (hasDrivers) { <div class="row-fluid"> <div class="span12"> <h4> Completed Drivers </h4> diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index f411eb9cec89f288802f4109866024e4110a1f65..2ceccc703d2912581687a9179286baa6b55a281f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -50,7 +50,7 @@ object CommandUtils extends Logging { .map(p => List("-Djava.library.path=" + p)) .getOrElse(Nil) val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")) - .map(Utils.splitCommandString).getOrElse(Nil) + .map(Utils.splitCommandString).getOrElse(Nil) val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil) val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 24d0a7deb57d04e9b123d4830ac24917589ee008..a78d6ac70f8caa86d42776b784bf2ff20d20ca7a 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -74,8 +74,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi new 
LinkedBlockingDeque[Runnable]()) private val serverChannel = ServerSocketChannel.open() - private val connectionsByKey = new HashMap[SelectionKey, Connection] - with SynchronizedMap[SelectionKey, Connection] + private val connectionsByKey = + new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection] private val messageStatuses = new HashMap[Int, MessageStatus] @@ -445,10 +445,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi assert (sendingConnectionManagerId == remoteConnectionManagerId) messageStatuses.synchronized { - for (s <- messageStatuses.values if - s.connectionManagerId == sendingConnectionManagerId) { - logInfo("Notifying " + s) - s.synchronized { + for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { + logInfo("Notifying " + s) + s.synchronized { s.attempted = true s.acked = false s.markDone() @@ -574,7 +573,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi val promise = Promise[Option[Message]] val status = new MessageStatus( message, connectionManagerId, s => promise.success(s.ackMessage)) - messageStatuses.synchronized { + messageStatuses.synchronized { messageStatuses += ((message.id, status)) } sendMessage(connectionManagerId, message) @@ -684,8 +683,11 @@ private[spark] object ConnectionManager { println("--------------------------") val size = 10 * 1024 * 1024 val count = 10 - val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put( - Array.tabulate[Byte](size * (i + 1))(x => x.toByte))) + val buffers = Array.tabulate(count) { i => + val bufferLen = size * (i + 1) + val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte) + ByteBuffer.allocate(bufferLen).put(bufferContent) + } buffers.foreach(_.flip) val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0 diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala index 820045aa21813b3f234bb9812bc36206d38cab6c..8e5c5296cb8d1890fa3a4cb28395e08b5138cbd8 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala @@ -77,12 +77,13 @@ private[spark] object ConnectionManagerTest extends Logging{ buffer.flip val startTime = System.currentTimeMillis - val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => - { - val bufferMessage = Message.createBufferMessage(buffer.duplicate) - logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]") - connManager.sendMessageReliably(slaveConnManagerId, bufferMessage) - }) + val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId => + { + val bufferMessage = Message.createBufferMessage(buffer.duplicate) + logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]") + connManager.sendMessageReliably(slaveConnManagerId, bufferMessage) + } + } val results = futures.map(f => Await.result(f, awaitTime)) val finishTime = System.currentTimeMillis Thread.sleep(5000) diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala index 
9e03956ba0df979c3a61c06539765627a6bc35dd..162d49bf6161732db9a83005fe293efeb5e09156 100644 --- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala +++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala @@ -52,20 +52,19 @@ private[spark] object SenderTest { val dataMessage = Message.createBufferMessage(buffer.duplicate) val startTime = System.currentTimeMillis /*println("Started timer at " + startTime)*/ - val responseStr = - manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match { - case Some(response) => - val buffer = response.asInstanceOf[BufferMessage].buffers(0) - new String(buffer.array) - case None => "none" - } + val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) + .map { response => + val buffer = response.asInstanceOf[BufferMessage].buffers(0) + new String(buffer.array) + }.getOrElse("none") + val finishTime = System.currentTimeMillis val mb = size / 1024.0 / 1024.0 val ms = finishTime - startTime // val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms // * 1000.0) + " MB/s" - val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + (mb / ms * - 1000.0).toInt + "MB/s) | Response = " + responseStr + val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + + (mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr println(resultStr) }) } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 42e1ef8375284bd82333de055aafd1922948e304..dc345b2df079baf1b6754b38fb704f90ae63701e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -199,8 +199,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc def next(): (String, Partition) = { if (it.hasNext) { it.next() - } - else { + } else { it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning it.next() } @@ -291,9 +290,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val r1 = rnd.nextInt(groupArr.size) val r2 = rnd.nextInt(groupArr.size) val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2) - if (prefPart == None) { - // if no preferred locations, just use basic power of two - return minPowerOfTwo + if (prefPart.isEmpty) { + // if no preferred locations, just use basic power of two + return minPowerOfTwo } val prefPartActual = prefPart.get diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 56c7777600a6a8a2ec2c5e77c7f3b8071e5f6344..f270c1ac217575e9caa4a42eec24403b20a2617b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -39,8 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag]( override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt override def equals(other: Any): Boolean = other match { - case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId && - this.slice == that.slice) + case that: ParallelCollectionPartition[_] => + this.rddId == that.rddId && this.slice == that.slice case _ => false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 0544f81f1ce86a5840d6b38cab2974ce8954d656..77b1682b3e47c28a56b7eaea777d240f3b8164ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -36,8 +36,8 @@ private[spark] object ResultTask { val metadataCleaner = new MetadataCleaner( MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf) - def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _) - : Array[Byte] = { + def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = + { synchronized { val old = serializedInfoCache.get(stageId).orNull if (old != null) { @@ -56,8 +56,8 @@ private[spark] object ResultTask { } } - def deserializeInfo(stageId: Int, bytes: Array[Byte]) - : (RDD[_], (TaskContext, Iterator[_]) => _) = { + def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = + { val loader = Thread.currentThread.getContextClassLoader val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) val ser = SparkEnv.get.closureSerializer.newInstance() diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index d25f0a63547e629b7afbc10855b458b34285d698..129153c732d9a71f8f734cf7d7ca4edc6b55967a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -100,13 +100,13 @@ class StatsReportListener extends SparkListener with Logging { //shuffle write showBytesDistribution("shuffle bytes written:", - (_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten}) + (_,metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten)) //fetch & io showMillisDistribution("fetch wait time:", - (_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime}) + (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime)) showBytesDistribution("remote bytes read:", - (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead}) + (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead)) showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize)) //runtime breakdown @@ -152,8 +152,8 @@ private[spark] object StatsReportListener extends Logging { logInfo("\t" + quantiles.mkString("\t")) } - def showDistribution(heading: String, - dOpt: Option[Distribution], formatNumber: Double => String) { + def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) + { dOpt.foreach { d => showDistribution(heading, d, formatNumber)} } @@ -162,9 +162,11 @@ private[spark] object StatsReportListener extends Logging { showDistribution(heading, dOpt, f _) } - def showDistribution(heading:String, format: String, - getMetric: (TaskInfo,TaskMetrics) => Option[Double]) - (implicit stage: SparkListenerStageCompleted) { + def showDistribution( + heading: String, + format: String, + getMetric: (TaskInfo, TaskMetrics) => Option[Double]) + (implicit stage: SparkListenerStageCompleted) { showDistribution(heading, extractDoubleDistribution(stage, getMetric), format) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 542deb98c130456e3bd6a02c5e550293e11329e4..780a3a15dd15b3724348ccaacea0c9a9d0cc8656 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -291,7 +291,7 @@ private[spark] class BlockManager( throw new Exception("Block " + blockId + " not found on disk, though it should be") } } else { - doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index 5ded9ab35982085d2957c96db12959a42a1f0577..dc62b1efaa7d4977555d1fd72deb56f770a74656 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -26,7 +26,7 @@ import org.apache.spark.network._ private[spark] class BlockMessageArray(var blockMessages: Seq[BlockMessage]) - extends Seq[BlockMessage] with Logging { + extends Seq[BlockMessage] with Logging { def this(bm: BlockMessage) = this(Array(bm)) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b3deb41e761c843ac7136359eab7cd04841e7813..ade8ba1323b80004af7d335df22698826aac3595 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -94,12 +94,14 @@ private[spark] object JettyUtils extends Logging { } /** - * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied handlers. + * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied + * handlers. * * If the desired port number is contented, continues incrementing ports until a free port is * found. Returns the chosen port and the jetty Server object. 
*/ - def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = { + def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) + = { val handlersToRegister = handlers.map { case(path, handler) => val contextHandler = new ContextHandler(path) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index b95c8f43b08f88cf5993fddf1baaca074479a9b7..547a194d58a5cb8f40f7176fb1efed398d3c5612 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -48,8 +48,8 @@ private[spark] object UIUtils { case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li> } val executors = page match { - case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a> - </li> + case Executors => + <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li> case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li> } @@ -66,7 +66,8 @@ private[spark] object UIUtils { <div class="navbar navbar-static-top"> <div class="navbar-inner"> <a href={prependBaseUri("/")} class="brand"> - <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a> + <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /> + </a> <ul class="nav"> {jobs} {storage} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 9412a48330d85ca834e5f6d215191bcc4b3b14d5..22bc97ada18be789e5bf25c63b13377941cf2176 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -61,9 +61,8 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis } <tr> <td> - <a href= - {"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}> - {p.name}</a></td> + <a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a> + </td> <td>{p.minShare}</td> <td>{p.weight}</td> <td>{activeStages}</td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 08107a3f62232646523ef9acc7024930fe13dff6..b6e98942ab811bea1f8db5fb80bd1c5aa04584fc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -223,28 +223,27 @@ private[spark] class StagePage(parent: JobProgressUI) { val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L) - val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead} + val maybeShuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => s.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") - val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("") + val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") val maybeShuffleWrite = - metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten} + metrics.flatMap{m => m.shuffleWriteMetrics}.map(s => s.shuffleBytesWritten) val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("") + 
val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") - val maybeWriteTime = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleWriteTime} + val maybeWriteTime = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => s.shuffleWriteTime) val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") - val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms => + val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map{ ms => if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("") - val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled} + val maybeMemoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled) val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)} - .getOrElse("") + val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled} val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") - val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("") + val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") <tr> <td>{info.index}</td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 01b647917964277b2e3735d72ae14ed095037410..999a94fc2d008aba97443d41c48af35163bcc167 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -59,8 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr </table> } - private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int) - : Seq[Node] = { + private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = + { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) val startWidth = "width: %s%%".format((started.toDouble/total)*100) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 3eb0f081e4cf67fceb09be663148cb062e617d88..c0c057be8defc3fe93dab818bb6fae044d61e75e 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -180,8 +180,8 @@ private[spark] object ClosureCleaner extends Logging { } } -private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) - extends ClassVisitor(ASM4) { +private[spark] +class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { new MethodVisitor(ASM4) { diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 5f86795183a9d6512748534a7063e46796d5434c..17c6481c18463fc60e802bb4a9afcf883060e52e 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -241,7 +241,7 @@ private[spark] object SizeEstimator extends Logging { } else if (cls == classOf[Double]) { DOUBLE_SIZE } else { - throw new 
IllegalArgumentException( + throw new IllegalArgumentException( "Non-primitive class " + cls + " passed to primitiveSize()") } } diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 0097dade190f6117ec70d9791eec01541396a800..4d2f45df85fc6c465c3feb64efde015de88f894d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -22,19 +22,21 @@ import org.apache.spark.SparkContext object BroadcastTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo] [blockSize]") + System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo]" + + " [blockSize]") System.exit(1) - } - + } + val bcName = if (args.length > 3) args(3) else "Http" val blockSize = if (args.length > 4) args(4) else "4096" - System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory") + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + + "BroadcastFactory") System.setProperty("spark.broadcast.blockSize", blockSize) val sc = new SparkContext(args(0), "Broadcast Test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) - + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 @@ -42,7 +44,7 @@ object BroadcastTest { for (i <- 0 until arr1.length) { arr1(i) = i } - + for (i <- 0 until 3) { println("Iteration " + i) println("===========") diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index 33bf7151a7cc493bfce74029a3734bbc138fbd9a..3e3a3b2d50abe19cb67d07fa1dd0d54dd85a3409 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -146,68 +146,68 @@ assume Words keys as utf8; set Words['3musk001']['book'] = 'The Three Musketeers'; set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to - be in as perfect a state of revolution as if the Huguenots had just made - a second La Rochelle of it. Many citizens, seeing the women flying - toward the High Street, leaving their children crying at the open doors, - hastened to don the cuirass, and supporting their somewhat uncertain - courage with a musket or a partisan, directed their steps toward the - hostelry of the Jolly Miller, before which was gathered, increasing - every minute, a compact group, vociferous and full of curiosity.'; + be in as perfect a state of revolution as if the Huguenots had just made + a second La Rochelle of it. 
Many citizens, seeing the women flying + toward the High Street, leaving their children crying at the open doors, + hastened to don the cuirass, and supporting their somewhat uncertain + courage with a musket or a partisan, directed their steps toward the + hostelry of the Jolly Miller, before which was gathered, increasing + every minute, a compact group, vociferous and full of curiosity.'; set Words['3musk002']['book'] = 'The Three Musketeers'; set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without some city or other registering in its archives an event of this kind. There were - nobles, who made war against each other; there was the king, who made - war against the cardinal; there was Spain, which made war against the - king. Then, in addition to these concealed or public, secret or open - wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels, - who made war upon everybody. The citizens always took up arms readily - against thieves, wolves or scoundrels, often against nobles or - Huguenots, sometimes against the king, but never against cardinal or - Spain. It resulted, then, from this habit that on the said first Monday - of April, 1625, the citizens, on hearing the clamor, and seeing neither - the red-and-yellow standard nor the livery of the Duc de Richelieu, - rushed toward the hostel of the Jolly Miller. When arrived there, the - cause of the hubbub was apparent to all'; + nobles, who made war against each other; there was the king, who made + war against the cardinal; there was Spain, which made war against the + king. Then, in addition to these concealed or public, secret or open + wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels, + who made war upon everybody. The citizens always took up arms readily + against thieves, wolves or scoundrels, often against nobles or + Huguenots, sometimes against the king, but never against cardinal or + Spain. It resulted, then, from this habit that on the said first Monday + of April, 1625, the citizens, on hearing the clamor, and seeing neither + the red-and-yellow standard nor the livery of the Duc de Richelieu, + rushed toward the hostel of the Jolly Miller. When arrived there, the + cause of the hubbub was apparent to all'; set Words['3musk003']['book'] = 'The Three Musketeers'; set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however large the sum may be; but you ought also to endeavor to perfect yourself in - the exercises becoming a gentleman. I will write a letter today to the - Director of the Royal Academy, and tomorrow he will admit you without - any expense to yourself. Do not refuse this little service. Our - best-born and richest gentlemen sometimes solicit it without being able - to obtain it. You will learn horsemanship, swordsmanship in all its - branches, and dancing. You will make some desirable acquaintances; and - from time to time you can call upon me, just to tell me how you are - getting on, and to say whether I can be of further service to you.'; + the exercises becoming a gentleman. I will write a letter today to the + Director of the Royal Academy, and tomorrow he will admit you without + any expense to yourself. Do not refuse this little service. Our + best-born and richest gentlemen sometimes solicit it without being able + to obtain it. You will learn horsemanship, swordsmanship in all its + branches, and dancing. 
You will make some desirable acquaintances; and + from time to time you can call upon me, just to tell me how you are + getting on, and to say whether I can be of further service to you.'; set Words['thelostworld001']['book'] = 'The Lost World'; set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined against the red curtain. How beautiful she was! And yet how aloof! We had been - friends, quite good friends; but never could I get beyond the same - comradeship which I might have established with one of my - fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly, - and perfectly unsexual. My instincts are all against a woman being too - frank and at her ease with me. It is no compliment to a man. Where - the real sex feeling begins, timidity and distrust are its companions, - heritage from old wicked days when love and violence went often hand in - hand. The bent head, the averted eye, the faltering voice, the wincing - figure--these, and not the unshrinking gaze and frank reply, are the - true signals of passion. Even in my short life I had learned as much - as that--or had inherited it in that race memory which we call instinct.'; + friends, quite good friends; but never could I get beyond the same + comradeship which I might have established with one of my + fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly, + and perfectly unsexual. My instincts are all against a woman being too + frank and at her ease with me. It is no compliment to a man. Where + the real sex feeling begins, timidity and distrust are its companions, + heritage from old wicked days when love and violence went often hand in + hand. The bent head, the averted eye, the faltering voice, the wincing + figure--these, and not the unshrinking gaze and frank reply, are the + true signals of passion. Even in my short life I had learned as much + as that--or had inherited it in that race memory which we call instinct.'; set Words['thelostworld002']['book'] = 'The Lost World'; set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed, red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was - the real boss; but he lived in the rarefied atmosphere of some Olympian - height from which he could distinguish nothing smaller than an - international crisis or a split in the Cabinet. Sometimes we saw him - passing in lonely majesty to his inner sanctum, with his eyes staring - vaguely and his mind hovering over the Balkans or the Persian Gulf. He - was above and beyond us. But McArdle was his first lieutenant, and it - was he that we knew. The old man nodded as I entered the room, and he - pushed his spectacles far up on his bald forehead.'; + the real boss; but he lived in the rarefied atmosphere of some Olympian + height from which he could distinguish nothing smaller than an + international crisis or a split in the Cabinet. Sometimes we saw him + passing in lonely majesty to his inner sanctum, with his eyes staring + vaguely and his mind hovering over the Balkans or the Persian Gulf. He + was above and beyond us. But McArdle was his first lieutenant, and it + was he that we knew. 
The old man nodded as I entered the room, and he + pushed his spectacles far up on his bald forehead.'; */ diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala index b3eb611dd228f7f6bc48f609b980f30ce30ca8a7..fdb976dfc6ababdb3da8488b6a1cf6baeb0afb89 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala @@ -29,8 +29,9 @@ object ExceptionHandlingTest { val sc = new SparkContext(args(0), "ExceptionHandlingTest", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) sc.parallelize(0 until sc.defaultParallelism).foreach { i => - if (math.random > 0.75) + if (math.random > 0.75) { throw new Exception("Testing exception handling") + } } System.exit(0) diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index 39752fdd0eec4aba8ba3c02cb7678863e5923c71..36534e59353cdb099d22b0d04ddfb2dded7e714f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -24,7 +24,8 @@ import java.util.Random object GroupByTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") + System.err.println( + "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) } @@ -35,7 +36,7 @@ object GroupByTest { val sc = new SparkContext(args(0), "GroupBy Test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) - + val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random var arr1 = new Array[(Int, Array[Byte])](numKVPairs) @@ -48,7 +49,7 @@ object GroupByTest { }.cache // Enforce that everything has been calculated and in cache pairs1.count - + println(pairs1.groupByKey(numReducers).count) System.exit(0) diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index 9ab5f5a48620be26b439c5bf8dc4f1fc918b9cce..737c4441398cd5c8af38584a87e3f8f8abe4d64a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -28,7 +28,7 @@ object LocalFileLR { def parsePoint(line: String): DataPoint = { val nums = line.split(' ').map(_.toDouble) - DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) + DataPoint(new Vector(nums.slice(1, D + 1)), nums(0)) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index a730464ea158ef5219c25f3e9feda6f2d1d591c0..3895675b3b003603cb54ac92815cad655f1752b0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -80,7 +80,11 @@ object LocalKMeans { var mappings = closest.groupBy[Int] (x => x._1) - var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))}) + var pointStats = mappings.map { pair => + pair._2.reduceLeft [(Int, (Vector, Int))] { + case ((id1, (x1, y1)), 
(id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2)) + } + } var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)} diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala index 544c7824690ef6f7755564ea1a4f755b5edcbcd9..fcaba6bb4fb85f08cb83fd7d8ab2232f022cd5b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala @@ -50,10 +50,10 @@ object LogQuery { val dataSet = if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs) - + // scalastyle:off val apacheLogRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r - + // scalastyle:on /** Tracks the total query count and number of aggregate bytes for a particular group. */ class Stats(val count: Int, val numBytes: Int) extends Serializable { def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes) diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala index 31c6d108f34aed514bb10cb2b9a7765ba212cd26..966478fe4a2589d19626ffc319c9f8065043b288 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala @@ -24,7 +24,8 @@ import java.util.Random object SkewedGroupByTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") + System.err.println( + "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 39819064edbaa814159c6b3abdcfeececa3b7c33..cf1fc3e808c766d682d685d493c11af98967788a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -56,7 +56,8 @@ object SparkHdfsLR { val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), InputFormatInfo.computePreferredLocations( - Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)))) + Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) + )) val lines = sc.textFile(inputPath) val points = lines.map(parsePoint _).cache() val ITERATIONS = args(2).toInt diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index cfafbaf23e4c22c06d485cb24be0b35ff7e44fd7..b97cb8fb02823e556340fa74d0b3c2ff9c69e4bd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -43,16 +43,18 @@ class PageRankUtils extends Serializable { val terminate = superstep >= 10 val outbox: Array[PRMessage] = - if (!terminate) - self.outEdges.map(targetId => - new PRMessage(targetId, newValue / self.outEdges.size)) - else + if (!terminate) { + self.outEdges.map(targetId => new PRMessage(targetId, newValue / self.outEdges.size)) + } else { 
Array[PRMessage]() + } (new PRVertex(newValue, self.outEdges, !terminate), outbox) } - def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) = + def computeNoCombiner(numVertices: Long, epsilon: Double) + (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int) + : (PRVertex, Array[PRMessage]) = computeWithCombiner(numVertices, epsilon)(self, messages match { case Some(msgs) => Some(msgs.map(_.value).sum) case None => None @@ -81,7 +83,8 @@ class PRVertex() extends Vertex with Serializable { } override def toString(): String = { - "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString) + "PRVertex(value=%f, outEdges.length=%d, active=%s)" + .format(value, outEdges.length, active.toString) } } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index 4c0de469645ab401a45fa2ef4f096da3cee58229..25bd55ca88b94c4d54209e95c38f117cfab1ca54 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -33,7 +33,8 @@ import scala.xml.{XML,NodeSeq} object WikipediaPageRank { def main(args: Array[String]) { if (args.length < 5) { - System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>") + System.err.println( + "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>") System.exit(-1) } val sparkConf = new SparkConf() @@ -61,24 +62,26 @@ object WikipediaPageRank { val fields = line.split("\t") val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) val links = - if (body == "\\N") + if (body == "\\N") { NodeSeq.Empty - else + } else { try { XML.loadString(body) \\ "link" \ "target" } catch { case e: org.xml.sax.SAXParseException => - System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) + System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body) NodeSeq.Empty } + } val outEdges = links.map(link => new String(link.text)).toArray val id = new String(title) (id, new PRVertex(1.0 / numVertices, outEdges)) }) - if (usePartitioner) + if (usePartitioner) { vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache - else + } else { vertices = vertices.cache + } println("Done parsing input file.") // Do the computation @@ -92,7 +95,7 @@ object WikipediaPageRank { utils.computeWithCombiner(numVertices, epsilon)) // Print the result - System.err.println("Articles with PageRank >= "+threshold+":") + System.err.println("Articles with PageRank >= " + threshold + ":") val top = (result .filter { case (id, vertex) => vertex.value >= threshold } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index 2cf273a702d24be266eb54f3a1ea59c381c39cc5..27afa6b642758f97593bcbee44f6a3d8a2d8845f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -31,7 +31,8 @@ import org.apache.spark.rdd.RDD object WikipediaPageRankStandalone { def main(args: Array[String]) { if (args.length < 5) { - 
System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>") + System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " + + "<numIterations> <host> <usePartitioner>") System.exit(-1) } val sparkConf = new SparkConf() @@ -51,10 +52,11 @@ object WikipediaPageRankStandalone { val input = sc.textFile(inputFile) val partitioner = new HashPartitioner(sc.defaultParallelism) val links = - if (usePartitioner) + if (usePartitioner) { input.map(parseArticle _).partitionBy(partitioner).cache() - else + } else { input.map(parseArticle _).cache() + } val n = links.count() val defaultRank = 1.0 / n val a = 0.15 @@ -62,10 +64,11 @@ object WikipediaPageRankStandalone { // Do the computation val startTime = System.currentTimeMillis val ranks = - pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism) + pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, + sc.defaultParallelism) // Print the result - System.err.println("Articles with PageRank >= "+threshold+":") + System.err.println("Articles with PageRank >= " + threshold + ":") val top = (ranks .filter { case (id, rank) => rank >= threshold } @@ -75,7 +78,7 @@ object WikipediaPageRankStandalone { val time = (System.currentTimeMillis - startTime) / 1000.0 println("Completed %d iterations in %f seconds: %f seconds per iteration" - .format(numIterations, time, time / numIterations)) + .format(numIterations, time, time / numIterations)) System.exit(0) } @@ -84,16 +87,17 @@ object WikipediaPageRankStandalone { val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) val id = new String(title) val links = - if (body == "\\N") + if (body == "\\N") { NodeSeq.Empty - else + } else { try { XML.loadString(body) \\ "link" \ "target" } catch { case e: org.xml.sax.SAXParseException => - System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) + System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body) NodeSeq.Empty } + } val outEdges = links.map(link => new String(link.text)).toArray (id, outEdges) } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index bc0d1633f1c1ef1095a61dacf2d5d30e6754fbc3..3d7b390724e772e21553ec4cf2d5062600ef2f01 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -132,7 +132,7 @@ object FeederActor { * To run this example locally, you may run Feeder Actor as * `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999` * and then run the example - * `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999` + * `./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999` */ object ActorWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index d9cb7326bb97d6e4b317af5c11ef69a4c008bc0d..6bccd1d88401aa00057a03c8ca69a88cf51db615 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -26,6 +26,7 @@ 
import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.streaming.util.RawTextHelper._ +// scalastyle:off /** * Consumes messages from one or more topics in Kafka and does wordcount. * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads> @@ -38,6 +39,7 @@ import org.apache.spark.streaming.util.RawTextHelper._ * Example: * `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1` */ +// scalastyle:on object KafkaWordCount { def main(args: Array[String]) { if (args.length < 5) { @@ -56,7 +58,7 @@ object KafkaWordCount { val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1l)) + val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index eb61caf8c85b92042ba56352ba756ea9c2eccea9..0a68ac84c242434a0b88cf7d282218d8873dcf70 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -64,6 +64,7 @@ object MQTTPublisher { } } +// scalastyle:off /** * A sample wordcount with MqttStream stream * @@ -71,7 +72,8 @@ object MQTTPublisher { * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ - * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient + * Example Java code for Mqtt Publisher and Subscriber can be found here + * https://bitbucket.org/mkjinesh/mqttclient * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic> * In local mode, <master> should be 'local[n]' with n > 1 * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. 
@@ -81,6 +83,7 @@ object MQTTPublisher { * and run the example as * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo` */ +// scalastyle:on object MQTTWordCount { def main(args: Array[String]) { @@ -93,7 +96,7 @@ object MQTTWordCount { val Seq(master, brokerUrl, topic) = args.toSeq - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index 5656d487a57cc9706146b35a14527557958d6836..d4c4d86b3466ccdf729803a6f4322c1cf42059e5 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -21,6 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel +// scalastyle:off /** * Counts words in text encoded with UTF8 received from the network every second. * @@ -33,6 +34,7 @@ import org.apache.spark.storage.StorageLevel * and then run the example * `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999` */ +// scalastyle:on object NetworkWordCount { def main(args: Array[String]) { if (args.length < 3) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index aa82bf3c6bd8e1fb08e0abef95e6ef467a3d3dde..56d10a964b71be1c534ec8d4e532b056d4ec3cc1 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -30,8 +30,8 @@ import java.nio.charset.Charset * * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file> * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. - * <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive + * data. 
<checkpoint-directory> directory to HDFS-compatible file system which checkpoint data * <output-file> file to which the word counts will be appended * * In local mode, <master> should be 'local[n]' with n > 1 @@ -54,11 +54,13 @@ import java.nio.charset.Charset * * To run this example in a local standalone cluster with automatic driver recovery, * - * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \ + * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \ + * <path-to-examples-jar> \ * org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \ * localhost 9999 ~/checkpoint ~/out` * - * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar + * <path-to-examples-jar> would typically be + * <spark-dir>/examples/target/scala-XX/spark-examples....jar * * Refer to the online documentation for more details. */ @@ -96,11 +98,12 @@ object RecoverableNetworkWordCount { System.err.println("You arguments were " + args.mkString("[", ", ", "]")) System.err.println( """ - |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file> - | <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - | <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. - | <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data - | <output-file> file to which the word counts will be appended + |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> + | <output-file> <master> is the Spark master URL. In local mode, <master> should be + | 'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark + | Streaming would connect to receive data. <checkpoint-directory> directory to + | HDFS-compatible file system which checkpoint data <output-file> file to which the + | word counts will be appended | |In local mode, <master> should be 'local[n]' with n > 1 |Both <checkpoint-directory> and <output-file> must be absolute paths diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index bbd44948b6fa5f13bafe639ed9af69ecf8ad3943..8a654f8fada21d6dc20bf102995b1f9ab8ecf7b0 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -24,7 +24,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ - +// scalastyle:off /** * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. @@ -34,15 +34,19 @@ import org.apache.spark.streaming.twitter._ * the same approach could be used for computing popular topics for example. * <p> * <p> - * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure - * for approximate frequency estimation in data streams (e.g. 
Top-K elements, frequency of any given element, etc), - * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the - * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold - * percentage of the overall total count. + * <a href= + * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data + * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency + * of any given element, etc), that uses space sub-linear in the number of elements in the + * stream. Once elements are added to the CMS, the estimated count of an element can be computed, + * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total + * count. * <p><p> - * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation. + * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the + * reduce operation. */ +// scalastyle:on object TwitterAlgebirdCMS { def main(args: Array[String]) { if (args.length < 1) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index c6215fd0d7561818338b17c8ae3b4ac1177122a8..45771d7050eebb405995e9b4bddfd34cfc8d3724 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -29,8 +29,7 @@ import org.apache.spark.streaming.twitter._ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. * <p> * <p> - * This <a href= "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data - * -mining/"> + * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> * blog post</a> and this * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html"> * blog post</a> diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 85b4ce5e819502e30f80c8be81cc21cf68f22c25..35be7ffa1e872e43abfbdf0b7df6eb45f56310eb 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -53,11 +53,13 @@ object SimpleZeroMQPublisher { } } +// scalastyle:off /** * A sample wordcount with ZeroMQStream stream * * To work with zeroMQ, some native libraries have to be installed. - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software) + * Install zeroMQ (release 2.1) core libraries. 
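The TwitterAlgebirdCMS scaladoc rewrapped above notes that Algebird's Count-Min Sketch is a monoid, so per-element sketches can be merged in a reduce. A minimal sketch of that property, assuming the `CountMinSketchMonoid` API of this Algebird era; the eps/delta/seed/heavy-hitter values and batches are illustrative only:

```scala
import com.twitter.algebird.CountMinSketchMonoid

object CmsMergeSketch {
  def main(args: Array[String]) {
    // eps, delta, seed and the heavy-hitters threshold are illustrative values only.
    val cms = new CountMinSketchMonoid(0.001, 1E-3, 1, 0.01)

    // One sketch per observed user id, mirroring what the streaming example does per element.
    val batchA = Seq(1L, 2L, 2L, 3L).map(id => cms.create(id))
    val batchB = Seq(2L, 3L, 3L, 3L).map(id => cms.create(id))

    // Because CMS is a monoid, partial sketches merge associatively with ++,
    // which is what lets the example fold them in a distributed reduce.
    val merged = (batchA ++ batchB).reduce(_ ++ _)

    println(merged.frequency(3L).estimate)   // approximate count of id 3 across both batches
    println(merged.heavyHitters)             // ids above the heavy-hitters threshold
  }
}
```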
[ZeroMQ Install guide] + * (http://www.zeromq.org/intro:get-the-software) * * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic> * In local mode, <master> should be 'local[n]' with n > 1 @@ -68,6 +70,7 @@ object SimpleZeroMQPublisher { * and run the example as * `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` */ +// scalastyle:on object ZeroMQWordCount { def main(args: Array[String]) { if (args.length < 3) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 799a9dd1ee5b59630b5f7101c9ecf42d4d974563..f2296a865e1b3e909d378484f87eefd368b348d6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -67,8 +67,7 @@ class EdgeRDD[@specialized ED: ClassTag]( } private[graphx] def mapEdgePartitions[ED2: ClassTag]( - f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]) - : EdgeRDD[ED2] = { + f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = { new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => val (pid, ep) = iter.next() Iterator(Tuple2(pid, f(pid, ep))) diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala index 0392a6051fc3bf6ad4b00552367d787a6cf6c9b3..a88a5e14539ecff465b96873a935388ec7e2e33d 100644 --- a/project/project/SparkPluginBuild.scala +++ b/project/project/SparkPluginBuild.scala @@ -20,5 +20,7 @@ import sbt._ object SparkPluginDef extends Build { lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) /* This is not published in a Maven repository, so we get it from GitHub directly */ - lazy val junitXmlListener = uri("git://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016") + lazy val junitXmlListener = uri( + "https://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016" + ) } diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 752723267633e209e3651fcba3363a5b63b457f6..ee968c53b3e4b7b456a3c025a00e4b69045d8a74 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -1,4 +1,21 @@ -<!-- If you wish to turn off checking for a section of code, you can put a comment in the source before and after the section, with the following syntax: --> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<!-- If you wish to turn off checking for a section of code, you can put a comment in the source + before and after the section, with the following syntax: --> <!-- // scalastyle:off --> <!-- ... 
--> <!-- // naughty stuff --> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 4d778dc4d43b4e7bf180bc3b55fb3e1341d3f947..baf80fe2a91b73c445e7f3b69b68c56043163ab6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -128,7 +128,8 @@ class CheckpointWriter( while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'") + logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + + "'") // Write checkpoint to temp file fs.delete(tempFile, true) // just in case it exists @@ -167,11 +168,13 @@ class CheckpointWriter( return } catch { case ioe: IOException => - logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe) + logWarning("Error in attempt " + attempts + " of writing checkpoint to " + + checkpointFile, ioe) reset() } } - logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'") + logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + + checkpointFile + "'") } } @@ -220,7 +223,8 @@ class CheckpointWriter( private[streaming] object CheckpointReader extends Logging { - def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { + def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = + { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 0683113bd0b51ad62c18f6a3f66a5a818a20c7a1..fde46705d89fb1f05a27e250cd998555bbf1d5a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -153,7 +153,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") - //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") + //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + + // " is very low") assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala index 04c994c136932a19bce2a12e088444e5feacb280..16479a01272aa37510e4dc84f7bf863746dd3fa9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala @@ -33,7 +33,8 @@ class Interval(val beginTime: Time, val endTime: Time) { def < (that: Interval): Boolean = { if (this.duration != that.duration) { - throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]") + throw new Exception("Comparing two intervals with different durations [" + this + ", " + + that + "]") } this.endTime < that.endTime } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64fe204cdf7a56a6afa273924f7f6f3ab0a297a6..7aa7ead29b46996b8faffd31d288d4345366e0f1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -78,8 +78,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD has a single element generated by counting the number - * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the - * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in + * the window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = { dstream.countByWindow(windowDuration, slideDuration) @@ -87,8 +87,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD contains the count of distinct elements in - * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -103,8 +103,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD contains the count of distinct elements in - * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 62cfa0a229db14b9ceecb930b511fd92aa0f768f..4dcd0e4c51ec386b8802c27887d6d4172b8fadf9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD.
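The countByWindow scaladoc rewrapped in the JavaDStreamLike hunk above states that it is equivalent to `window(windowDuration, slideDuration).count()`. A minimal, self-contained sketch of that equivalence against the Scala API; the socket address, durations, and checkpoint path are placeholders:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountByWindowSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "CountByWindowSketch", Seconds(2))
    // countByWindow uses an incremental (invertible) reduce internally, which needs checkpointing.
    ssc.checkpoint("/tmp/count-by-window-sketch")

    val lines = ssc.socketTextStream("localhost", 9999)

    // The two DStreams below yield the same per-batch counts.
    val direct     = lines.countByWindow(Seconds(30), Seconds(10))
    val equivalent = lines.window(Seconds(30), Seconds(10)).count()

    direct.print()
    equivalent.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```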
*/ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { dstream.reduceByKey(func, partitioner) @@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more - * information. + * combineByKey for RDDs. Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more - * information. + * combineByKey for RDDs. Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def groupByKeyAndWindow( windowDuration: Duration, @@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], @@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @param filterFunc function to filter expired key-value pairs; * only pairs that satisfy the function are retained * set this to null if you do not want to filter @@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. 
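The updateStateByKey scaladoc above says that returning None from the update function eliminates the corresponding state entry, and that the supplied Partitioner controls the partitioning of the state RDDs. A minimal sketch of that contract against the Scala API (the Java API above wraps the same call); the checkpoint path and socket address are placeholders:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "StatefulSketch", Seconds(1))
    ssc.checkpoint("/tmp/stateful-sketch")   // state DStreams require a checkpoint directory

    val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map(word => (word, 1))

    // Returning None instead of Some(...) would drop the key from the state,
    // which is the behaviour the scaladoc calls "eliminated".
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(state.getOrElse(0) + values.sum)

    val counts = pairs.updateStateByKey[Int](updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```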
* @tparam S State type */ def updateStateByKey[S]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 921b56143af255c2a38d6713beeadf1997e3f07e..2268160dccc1fefebc415416c6d1ff2977a74459 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches * @param sparkHome The SPARK_HOME directory on the slave nodes - * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local - * file system or an HDFS, HTTP, HTTPS, or FTP URL. + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the + * local file system or an HDFS, HTTP, HTTPS, or FTP URL. */ def this( master: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 906a16e508cd83490c986bcab67d5ec9b1edfe21..903e3f3c9b7137b34d06d8fbadcbf6325838bdfd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } override def toString() = { - "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" + "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + + currentCheckpointFiles.mkString("\n") + "\n]" } @throws(classOf[IOException]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 27303390d9e03cdd5b06a4e8c8afedcc8538ce41..226844c2284e33f8f1d7c2ae717ecd841cf7445f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) } else { // Time is valid, but check it it is more than lastValidTime if (lastValidTime != null && time < lastValidTime) { - logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) + logWarning("isTimeValid called with " + time + " where as last valid time is " + + lastValidTime) } lastValidTime = time true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index ce153f065d35adbcbd833e01ed6e98e45dbf9586..0dc6704603f8222b8278fa3d74a1ad3a15958326 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class 
StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) + extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage /** @@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } /** - * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into - * appropriately named blocks at regular intervals. This class starts two threads, + * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts + * them into appropriately named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch of as a block, * the other to push the blocks into the block manager. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index fb9df2f48eae37903e285db20c892491bc83e52d..f3c58aede092aceb59b2b9a81ffc08eace6e25df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration} * these functions. */ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) -extends Serializable { + extends Serializable { private[streaming] def ssc = self.ssc - private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) + = { new HashPartitioner(numPartitions) } @@ -63,8 +64,8 @@ extends Serializable { } /** - * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] - * is used to control the partitioning of each RDD. + * Return a new DStream by applying `groupByKey` on each RDD. The supplied + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) @@ -94,8 +95,8 @@ extends Serializable { /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. 
*/ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) @@ -113,7 +114,8 @@ extends Serializable { mergeCombiner: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true): DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, + mapSideCombine) } /** @@ -138,7 +140,8 @@ extends Serializable { * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = + { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -170,7 +173,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def groupByKeyAndWindow( windowDuration: Duration, @@ -239,7 +243,8 @@ extends Serializable { slideDuration: Duration, numPartitions: Int ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, + defaultPartitioner(numPartitions)) } /** @@ -315,7 +320,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. * @param filterFunc Optional function to filter expired key-value pairs; * only pairs that satisfy the function are retained */ @@ -373,7 +379,8 @@ extends Serializable { * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @tparam S State type */ def updateStateByKey[S: ClassTag]( @@ -395,7 +402,8 @@ extends Serializable { * this function may generate a different a tuple with a different key * than the input key. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ @@ -438,7 +446,8 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. 
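Several of the @param notes rewrapped above require the window duration and slide duration to be multiples of the DStream's batching interval. A minimal sketch that satisfies those constraints; all durations and the socket address are placeholders:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object WindowedReduceSketch {
  def main(args: Array[String]) {
    // Batch interval of 2 seconds; the window (30s) and slide (10s) are both multiples of it.
    val ssc = new StreamingContext("local[2]", "WindowedReduceSketch", Seconds(2))

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Default hash partitioning; an explicit Partitioner could be passed instead,
    // as the rewrapped @param partitioner text describes.
    val counts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```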
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int) + : DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -566,7 +575,8 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, + fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -580,7 +590,7 @@ extends Serializable { valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf - ) { + ) { val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) @@ -596,7 +606,8 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, + fm.runtimeClass.asInstanceOf[Class[F]]) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index 7a6b1ea35eb13163e15f0a77ec19f67a56bc0b9a..ca0a8ae47864d21f13d56268345af398ee2e29a0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( val invReduceF = invReduceFunc val currentTime = validTime - val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) + val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, + currentTime) val previousWindow = currentWindow - slideDuration logDebug("Window time = " + windowDuration) @@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs // Cogroup the reduced RDDs and merge the reduced values - val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner) + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], + partitioner) //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ val numOldValues = oldRDDs.size diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 4ecba03ab5d2f84691d75b26c3caffdb4e362b65..57429a15329a119d2191c8de77c6cfcb63c8602e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd - case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) + case None => throw new 
Exception("Could not generate RDD from a parent for unifying at time " + + validTime) }) if (rdds.size > 0) { Some(new UnionRDD(ssc.sc, rdds)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 6301772468737afe993c30cd3a84989ce57e7258..24289b714f99e292402a61a57164d2e33c69eff1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag]( _slideDuration: Duration) extends DStream[T](parent.ssc) { - if (!_windowDuration.isMultipleOf(parent.slideDuration)) + if (!_windowDuration.isMultipleOf(parent.slideDuration)) { throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + } - if (!_slideDuration.isMultipleOf(parent.slideDuration)) + if (!_slideDuration.isMultipleOf(parent.slideDuration)) { throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + } // Persist parent level by default, as those RDDs are going to be obviously reused. parent.persist(StorageLevel.MEMORY_ONLY_SER) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index b5f11d344068d828330377bfa37c251e2f208736..c7306248b19504d7c50552e59640df2c42e46587 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! 
GenerateJobs(new Time(longTime))) - private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) + private lazy val checkpointWriter = + if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { + new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 0d9733fa69a12a67f386fa736f379aebfe06619c..e4fa163f2e069b8889405e90d5ba77340a675cbe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, StreamingContext} import org.apache.spark.util.AkkaUtils private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage -private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) + extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) + extends NetworkInputTrackerMessage +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) + extends NetworkInputTrackerMessage /** * This class manages the execution of the receivers of NetworkInputDStreams. Instance of @@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } if (!networkInputStreams.isEmpty) { - actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") + actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), + "NetworkInputTracker") receiverExecutor.start() logInfo("NetworkInputTracker started") } @@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) - logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) + logInfo("Registered receiver for network stream " + streamId + " from " + + sender.path.address) sender ! 
true } case AddBlocks(streamId, blockIds, metadata) => { @@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { }) // Right now, we only honor preferences if all receivers have them - val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _) + val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined) + .reduce(_ && _) // Create the parallel collection of receivers to distributed them on the worker nodes val tempRDD = if (hasLocationPreferences) { - val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString))) + val receiversWithPreferences = + receivers.map(r => (r, Seq(r.getLocationPreference().toString))) ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences) } else { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 3063cf10a39f32fa1f9545c1ab2cdbb350db2529..18811fc2b01d856b45bec568964af88c8a6a9dc0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */ private[spark] class StreamingListenerBus() extends Logging { - private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener] + private val listeners = new ArrayBuffer[StreamingListener]() + with SynchronizedBuffer[StreamingListener] /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 6a45bc2f8acbca587039e7ef8cac414cfa8f69cd..2bb616cfb8b08d14d2123d790fd38fde69a6443f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } } } - if (!done) + if (!done) { logError("Could not generate file " + hadoopFile) - else + } else { logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + } Thread.sleep(interval) localFile.delete() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala index 179fd7593982cc0b902d0c9ffe22ac29b8179c0b..2b8cdb72b8d0e112b5d4a4c49c9c73d2c49527ff 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala @@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu } } else { // Calculate how much time we should sleep to bring ourselves to the desired rate. 
- // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) - val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) + // Based on throttler in Kafka + // scalastyle:off + // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) + // scalastyle:on + val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), + SECONDS) if (sleepTime > 0) Thread.sleep(sleepTime) waitToWrite(numBytes) }
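To make the throttling comment in the RateLimitedOutputStream hunk above concrete, here is the same sleep-time arithmetic in isolation; the byte counts and target rate are made-up numbers, not taken from the stream class:

```scala
import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

object RateLimitSketch {
  def main(args: Array[String]) {
    val bytesPerSec = 1 << 20                      // hypothetical target rate: 1 MiB/s
    val bytesWrittenSinceSync = 5L * (1 << 20)     // 5 MiB written since the last sync point
    val elapsedSecs = 2L                           // seconds elapsed since the last sync point

    // 5 MiB at 1 MiB/s needs 5 s in total; 2 s have passed, so sleep the remaining 3 s.
    val sleepTimeMillis =
      MILLISECONDS.convert(bytesWrittenSinceSync / bytesPerSec - elapsedSecs, SECONDS)
    println(s"sleep for $sleepTimeMillis ms")      // prints: sleep for 3000 ms
  }
}
```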