diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 86e2061b9f0cb47c6bce071f87ece0806a8af62f..a2003d8049464947b13752f0ed4de8ee8829e8cb 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -26,7 +26,7 @@ class Accumulator[T] (
 
   // Called by Java when deserializing an object
   private def readObject(in: ObjectInputStream) {
-    in.defaultReadObject
+    in.defaultReadObject()
     value_ = zero
     deserialized = true
     Accumulators.register(this, false)
@@ -53,18 +53,22 @@ private object Accumulators {
     return lastId
   }
 
-  def register(a: Accumulator[_], original: Boolean): Unit = synchronized {
-    if (original) {
-      originals(a.id) = a
-    } else {
-      val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
-      accums(a.id) = a
+  def register(a: Accumulator[_], original: Boolean) {
+    synchronized {
+      if (original) {
+        originals(a.id) = a
+      } else {
+        val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
+        accums(a.id) = a
+      }
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
-  def clear: Unit = synchronized {
-    localAccums.remove(Thread.currentThread)
+  def clear() {
+    synchronized {
+      localAccums.remove(Thread.currentThread)
+    }
   }
 
   // Get the values of the local accumulators for the current thread (by ID)
@@ -77,10 +81,12 @@ private object Accumulators {
   }
 
   // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulator[Any]] += value
+  def add(values: Map[Long, Any]) {
+    synchronized {
+      for ((id, value) <- values) {
+        if (originals.contains(id)) {
+          originals(id).asInstanceOf[Accumulator[Any]] += value
+        }
       }
     }
   }
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index e00a0d80fa25a15e4bf884912613566acba5ab63..010203d1cad83e5c0e0c25a3ec1f24706c28bd76 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -41,7 +41,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
       val (blockId, blockOption) = x
       blockOption match {
         case Some(block) => {
-          val values = block.asInstanceOf[Iterator[Any]]
+          val values = block
           for(value <- values) {
             val v = value.asInstanceOf[(K, V)]
             func(v._1, v._2)
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index 38afa59b29d1d379648774658f699e99c074a149..e26041555a024737ecb2c6e248ab9a5d57dc4f4a 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,7 +1,7 @@
 package spark
 
 class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
-  override val index = idx
+  override val index: Int = idx
 }
 
 class CartesianRDD[T: ClassManifest, U:ClassManifest](
@@ -24,7 +24,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
     array
   }
 
-  override def splits = splits_.asInstanceOf[Array[Split]]
+  override def splits = splits_
 
   override def preferredLocations(split: Split) = {
     val currSplit = split.asInstanceOf[CartesianSplit]
diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala
index 699fdc29820ef21f61860c10544cd6841e7139e0..3b83d23a132956b147b4881718409cb31a289919 100644
--- a/core/src/main/scala/spark/ClosureCleaner.scala
+++ b/core/src/main/scala/spark/ClosureCleaner.scala
@@ -76,7 +76,7 @@ object ClosureCleaner extends Logging {
     }
   }
 
-  def clean(func: AnyRef): Unit = {
+  def clean(func: AnyRef) {
     // TODO: cache outerClasses / innerClasses / accessedFields
     val outerClasses = getOuterClasses(func)
     val innerClasses = getInnerClasses(func)
@@ -109,7 +109,7 @@ object ClosureCleaner extends Logging {
     // Clone the closure objects themselves, nulling out any fields that are not
     // used in the closure we're working on or any of its inner closures.
     for ((cls, obj) <- outerPairs) {
-      outer = instantiateClass(cls, outer, inInterpreter);
+      outer = instantiateClass(cls, outer, inInterpreter)
       for (fieldName <- accessedFields(cls)) {
         val field = cls.getDeclaredField(fieldName)
         field.setAccessible(true)
@@ -139,10 +139,10 @@ object ClosureCleaner extends Logging {
       return cons.newInstance(params: _*).asInstanceOf[AnyRef]
     } else {
       // Use reflection to instantiate object without calling constructor
-      val rf = sun.reflect.ReflectionFactory.getReflectionFactory();
-      val parentCtor = classOf[java.lang.Object].getDeclaredConstructor();
+      val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
+      val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
       val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
-      val obj = newCtor.newInstance().asInstanceOf[AnyRef];
+      val obj = newCtor.newInstance().asInstanceOf[AnyRef]
       if (outer != null) {
         //logInfo("3: Setting $outer on " + cls + " to " + outer);
         val field = cls.getDeclaredField("$outer")
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index 3543c8afa8a081f201f630df60dcb6f915c01115..6959917d14f2f2b0267c2c68fd0bc90827d9046e 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -11,7 +11,7 @@ case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplit
 case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
 
 class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
-  override val index = idx
+  override val index: Int = idx
   override def hashCode(): Int = idx
 }
 
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
index cb30cb2ac8ee189c7eeecbb55eea965f6117b003..003880c5e838bacb9a4e9388f1b2021e74701931 100644
--- a/core/src/main/scala/spark/DaemonThreadFactory.scala
+++ b/core/src/main/scala/spark/DaemonThreadFactory.scala
@@ -7,7 +7,7 @@ import java.util.concurrent.ThreadFactory
  */
 private object DaemonThreadFactory extends ThreadFactory {
   override def newThread(r: Runnable): Thread = {
-    val t = new Thread(r);
+    val t = new Thread(r)
     t.setDaemon(true)
     return t
   }
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 3d70cf1737f5e44ca8c74d7bac6f18aa390a195a..9ead0d287011f2d8708df5f59a0b8ea4a16ea1ae 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -67,7 +67,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
 
   class TaskRunner(info: MTaskInfo, d: ExecutorDriver)
   extends Runnable {
-    override def run() = {
+    override def run() {
       val tid = info.getTaskId.getValue
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(classLoader)
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index 598a18fe7243db804229e84863e29c40300ce871..f282a4023b126857c2af23dc6ffae2fab8d8db8f 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -26,7 +26,7 @@ class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
 
   override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
 
-  override val index = idx
+  override val index: Int = idx
 }
 
 /**
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index 790603581fe696d0280652556a21cbe2d140a5fb..12b6a0954c8d0808388a07bae1837682e7b29745 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -119,7 +119,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
 
   private def getOutputCommitter(): OutputCommitter = {
     if (committer == null) {
-      committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
+      committer = conf.value.getOutputCommitter
     }
     return committer
   }
@@ -149,11 +149,11 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
   }
 
   private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString);
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
-    conf.value.set("mapred.task.id", taID.value.toString);
-    conf.value.setBoolean("mapred.task.is.map", true);
-    conf.value.setInt("mapred.task.partition", splitID);
+    conf.value.set("mapred.job.id", jID.value.toString)
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+    conf.value.set("mapred.task.id", taID.value.toString)
+    conf.value.setBoolean("mapred.task.is.map", true)
+    conf.value.setInt("mapred.task.partition", splitID)
   }
 }
 
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index c17ec995d49e7428ad3ba0badf60a18631380a92..d11ba5167dcafbe421f9ee727f88bd887032a845 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -49,7 +49,7 @@ class JavaSerializerInstance extends SerializerInstance {
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
-    new JavaDeserializationStream(s, currentThread.getContextClassLoader)
+    new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
   }
 
   def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala
index 54bd57f6d3c94d2c17160f3ddaf38b1485f12e50..5ff61af72cf2bb52b68dae33be48ae02eead61b6 100644
--- a/core/src/main/scala/spark/Logging.scala
+++ b/core/src/main/scala/spark/Logging.scala
@@ -17,7 +17,7 @@ trait Logging {
   // Method to get or create the logger for this object
   def log: Logger = {
     if (log_ == null) {
-      var className = this.getClass().getName()
+      var className = this.getClass.getName
       // Ignore trailing $'s in the class names for Scala objects
       if (className.endsWith("$")) {
         className = className.substring(0, className.length - 1)
@@ -28,31 +28,46 @@ trait Logging {
   }
 
   // Log methods that take only a String
-  def logInfo(msg: => String) = if (log.isInfoEnabled /*&& msg.contains("job finished in")*/) log.info(msg)
+  def logInfo(msg: => String) {
+    if (log.isInfoEnabled) log.info(msg)
+  }
 
-  def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg)
+  def logDebug(msg: => String) {
+    if (log.isDebugEnabled) log.debug(msg)
+  }
 
-  def logTrace(msg: => String) = if (log.isTraceEnabled) log.trace(msg)
+  def logTrace(msg: => String) {
+    if (log.isTraceEnabled) log.trace(msg)
+  }
 
-  def logWarning(msg: => String) = if (log.isWarnEnabled) log.warn(msg)
+  def logWarning(msg: => String) {
+    if (log.isWarnEnabled) log.warn(msg)
+  }
 
-  def logError(msg: => String) = if (log.isErrorEnabled) log.error(msg)
+  def logError(msg: => String) {
+    if (log.isErrorEnabled) log.error(msg)
+  }
 
   // Log methods that take Throwables (Exceptions/Errors) too
-  def logInfo(msg: => String, throwable: Throwable) =
+  def logInfo(msg: => String, throwable: Throwable) {
     if (log.isInfoEnabled) log.info(msg)
+  }
 
-  def logDebug(msg: => String, throwable: Throwable) =
+  def logDebug(msg: => String, throwable: Throwable) {
     if (log.isDebugEnabled) log.debug(msg)
+  }
 
-  def logTrace(msg: => String, throwable: Throwable) =
+  def logTrace(msg: => String, throwable: Throwable) {
     if (log.isTraceEnabled) log.trace(msg)
+  }
 
-  def logWarning(msg: => String, throwable: Throwable) =
+  def logWarning(msg: => String, throwable: Throwable) {
     if (log.isWarnEnabled) log.warn(msg, throwable)
+  }
 
-  def logError(msg: => String, throwable: Throwable) =
+  def logError(msg: => String, throwable: Throwable) {
     if (log.isErrorEnabled) log.error(msg, throwable)
+  }
 
   // Method for ensuring that logging is initialized, to avoid having multiple
   // threads do it concurrently (as SLF4J initialization is not thread safe).
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index cd42586aa6b63e69f5c9a7bcaf2ef1ffaa29075c..d024d38aa91e3309508b4d5267cb5e6fb8296ad3 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -18,7 +18,7 @@ class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + index).toInt
+  override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
 class NewHadoopRDD[K, V](
@@ -69,7 +69,7 @@ class NewHadoopRDD[K, V](
         finished = !reader.nextKeyValue
         havePair = !finished
         if (finished) {
-          reader.close
+          reader.close()
         }
       }
       !finished
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 21f68f21c2c408ab2c22d994ca647fd5d3469988..d79007ab40191fa49a51596f7535c61ad4d9e726 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -18,7 +18,7 @@ class ParallelCollectionSplit[T: ClassManifest](
     case _ => false
   }
 
-  override val index = slice
+  override val index: Int = slice
 }
 
 class ParallelCollection[T: ClassManifest](
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 1191523ccc752bdfffaa4063a252635d9e773f54..1710ff58b32f0c5d24cc64e9a557cec69816de78 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -130,12 +130,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
     if (num > initialCount) {
       total = maxSelected
-      fraction = Math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
+      fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
     } else if (num < 0) {
       throw(new IllegalArgumentException("Negative number of elements requested"))
     } else {
-      fraction = Math.min(multiplier * (num + 1) / initialCount, 1.0)
-      total = num.toInt
+      fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
+      total = num
     }
 
     var samples = this.sample(withReplacement, fraction, seed).collect()
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
index c9a9e53d18475e3be1a86577de984dd948695a2a..8ef40d8d9e6777ff812cd7c04c5c3c48cdd50049 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -3,7 +3,7 @@ package spark
 import java.util.Random
 
 class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
-  override val index = prev.index
+  override val index: Int = prev.index
 }
 
 class SampledRDD[T: ClassManifest](
@@ -15,7 +15,7 @@ class SampledRDD[T: ClassManifest](
 
   @transient
   val splits_ = {
-    val rg = new Random(seed);
+    val rg = new Random(seed)
     prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt))
   }
 
@@ -28,7 +28,7 @@ class SampledRDD[T: ClassManifest](
 
   override def compute(splitIn: Split) = {
     val split = splitIn.asInstanceOf[SampledRDDSplit]
-    val rg = new Random(split.seed);
+    val rg = new Random(split.seed)
     // Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
     if (withReplacement) {
       val oldData = prev.iterator(split.prev).toArray
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 5434197ecad3330fb000b6c5a3238453e16a3b19..594dbd235fb4b0de38429f299ce38678f0c7f347 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -40,7 +40,7 @@ class ShuffledRDD[K, V, C](
     return new Iterator[(K, C)] {
       var iter = combiners.entrySet().iterator()
 
-      def hasNext(): Boolean = iter.hasNext()
+      def hasNext: Boolean = iter.hasNext()
 
       def next(): (K, C) = {
         val entry = iter.next()
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index fc364b530707601694e712b93aa6bb4b6725e111..cba70794e7f069fe56de5aed4605eff646fecd89 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -424,18 +424,6 @@ object SparkContext {
   implicit def stringToText(s: String) = new Text(s)
 
   private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
-    def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
-      val c = {
-        if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
-          classManifest[T].erasure
-        } else {
-          implicitly[T => Writable].getClass.getMethods()(0).getReturnType
-        }
-        // TODO: use something like WritableConverter to avoid reflection
-      }
-      c.asInstanceOf[Class[ _ <: Writable]]
-    }
-
     def anyToWritable[U <% Writable](u: U): Writable = u
 
     new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
diff --git a/core/src/main/scala/spark/Split.scala b/core/src/main/scala/spark/Split.scala
index 831f7672e628b0e58ffcba86ac2b90f895d27781..90d4b47c553c535ff3271da0ddceb66a7ed832f9 100644
--- a/core/src/main/scala/spark/Split.scala
+++ b/core/src/main/scala/spark/Split.scala
@@ -7,7 +7,7 @@ trait Split extends Serializable {
   /**
    * Get the split's index within its parent RDD
    */
-  val index: Int
+  def index: Int
 
   // A better default implementation of HashCode
   override def hashCode(): Int = index
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala
index 17522e2bbb6d1077d4d8caefc778753229d820d2..0e8164d6abf4cfe85e92c8484de77e7bb8c0d2ef 100644
--- a/core/src/main/scala/spark/UnionRDD.scala
+++ b/core/src/main/scala/spark/UnionRDD.scala
@@ -11,7 +11,7 @@ class UnionSplit[T: ClassManifest](
 
   def iterator() = rdd.iterator(split)
   def preferredLocations() = rdd.preferredLocations(split)
-  override val index = idx
+  override val index: Int = idx
 }
 
 class UnionRDD[T: ClassManifest](
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 89624eb37013a89223f56274062fd184b2a0be9b..17670e077ab7e205e9ae1be97987005d4e3fe0ec 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -16,7 +16,7 @@ object Utils {
     val bos = new ByteArrayOutputStream()
     val oos = new ObjectOutputStream(bos)
     oos.writeObject(o)
-    oos.close
+    oos.close()
     return bos.toByteArray
   }
 
@@ -48,7 +48,7 @@ object Utils {
         j += 1
       }
       if (j > i) {
-        buf += s.substring(i, j);
+        buf += s.substring(i, j)
      }
      i = j
      while (i < s.length && !isAlpha(s.charAt(i))) {
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 5a873dca3de3beb7aed587ad41cc5faeee23d7d5..e009d4e7dbff9b5093135c7ab003937982b4c79b 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -60,7 +60,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   sendBroadcast
 
-  def sendBroadcast(): Unit = {
+  def sendBroadcast() {
     logInfo("Local host address: " + hostAddress)
 
     // Store a persistent copy in HDFS
@@ -96,7 +96,7 @@ extends Broadcast[T] with Logging with Serializable {
     // Must always come AFTER guideMR is created
     while (guidePort == -1) {
       guidePortLock.synchronized {
-        guidePortLock.wait
+        guidePortLock.wait()
       }
     }
 
@@ -108,7 +108,7 @@ extends Broadcast[T] with Logging with Serializable {
     // Must always come AFTER serveMR is created
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
@@ -127,8 +127,8 @@ extends Broadcast[T] with Logging with Serializable {
       SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes, blockSize))
   }
 
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject
+  private def readObject(in: ObjectInputStream) {
+    in.defaultReadObject()
     BitTorrentBroadcast.synchronized {
       val cachedVal = BitTorrentBroadcast.values.get(uuid, 0)
 
@@ -168,7 +168,7 @@ extends Broadcast[T] with Logging with Serializable {
   }
 
   // Initialize variables in the worker node. Master sends everything as 0/null
-  private def initializeWorkerVariables: Unit = {
+  private def initializeWorkerVariables() {
     arrayOfBlocks = null
     hasBlocksBitVector = null
     numCopiesSent = null
@@ -194,7 +194,7 @@ extends Broadcast[T] with Logging with Serializable {
     stopBroadcast = false
   }
 
-  private def registerBroadcast(uuid: UUID, gInfo: SourceInfo): Unit = {
+  private def registerBroadcast(uuid: UUID, gInfo: SourceInfo) {
     val socket = new Socket(Broadcast.MasterHostAddress,
       Broadcast.MasterTrackerPort)
     val oosST = new ObjectOutputStream(socket.getOutputStream)
@@ -222,7 +222,7 @@ extends Broadcast[T] with Logging with Serializable {
     socket.close()
   }
 
-  private def unregisterBroadcast(uuid: UUID): Unit = {
+  private def unregisterBroadcast(uuid: UUID) {
     val socket = new Socket(Broadcast.MasterHostAddress,
       Broadcast.MasterTrackerPort)
     val oosST = new ObjectOutputStream(socket.getOutputStream)
@@ -250,14 +250,14 @@ extends Broadcast[T] with Logging with Serializable {
     // Wait till hostName and listenPort are OK
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
     // Wait till totalBlocks and totalBytes are OK
     while (totalBlocks == -1) {
       totalBlocksLock.synchronized {
-        totalBlocksLock.wait
+        totalBlocksLock.wait()
       }
     }
 
@@ -275,7 +275,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   // Add new SourceInfo to the listOfSources. Update if it exists already.
  // TODO: Optimizing just by OR-ing the BitVectors was BAD for performance
-  private def addToListOfSources(newSourceInfo: SourceInfo): Unit = {
+  private def addToListOfSources(newSourceInfo: SourceInfo) {
     listOfSources.synchronized {
       if (listOfSources.contains(newSourceInfo)) {
         listOfSources = listOfSources - newSourceInfo
@@ -284,7 +284,7 @@ extends Broadcast[T] with Logging with Serializable {
     }
   }
 
-  private def addToListOfSources(newSourceInfos: ListBuffer[SourceInfo]): Unit = {
+  private def addToListOfSources(newSourceInfos: ListBuffer[SourceInfo]) {
     newSourceInfos.foreach { newSourceInfo =>
       addToListOfSources(newSourceInfo)
     }
@@ -292,7 +292,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   class TalkToGuide(gInfo: SourceInfo)
   extends Thread with Logging {
-    override def run: Unit = {
+    override def run() {
 
       // Keep exchaning information until all blocks have been received
       while (hasBlocks.get < totalBlocks) {
@@ -307,7 +307,7 @@ extends Broadcast[T] with Logging with Serializable {
     }
 
     // Connect to Guide and send this worker's information
-    private def talkOnce: Unit = {
+    private def talkOnce {
       var clientSocketToGuide: Socket = null
       var oosGuide: ObjectOutputStream = null
       var oisGuide: ObjectInputStream = null
@@ -402,7 +402,7 @@ extends Broadcast[T] with Logging with Serializable {
     // ServeMultipleRequests thread
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
@@ -412,7 +412,7 @@ extends Broadcast[T] with Logging with Serializable {
     hasBlocksBitVector = new BitSet(totalBlocks)
     numCopiesSent = new Array[Int](totalBlocks)
     totalBlocksLock.synchronized {
-      totalBlocksLock.notifyAll
+      totalBlocksLock.notifyAll()
     }
     totalBytes = gInfo.totalBytes
     blockSize = gInfo.blockSize
@@ -445,7 +445,7 @@ extends Broadcast[T] with Logging with Serializable {
     // certain bit is NOT unset upon failure resulting in an infinite loop.
     private var blocksInRequestBitVector = new BitSet(totalBlocks)
 
-    override def run: Unit = {
+    override def run() {
      var threadPool = Utils.newDaemonFixedThreadPool(Broadcast.MaxRxSlots)
 
      while (hasBlocks.get < totalBlocks) {
@@ -613,13 +613,13 @@ extends Broadcast[T] with Logging with Serializable {
     private var oosSource: ObjectOutputStream = null
     private var oisSource: ObjectInputStream = null
 
-    override def run: Unit = {
+    override def run() {
       // TODO: There is a possible bug here regarding blocksInRequestBitVector
       var blockToAskFor = -1
 
       // Setup the timeout mechanism
       var timeOutTask = new TimerTask {
-        override def run: Unit = {
+        override def run() {
           cleanUpConnections()
         }
       }
@@ -645,7 +645,7 @@ extends Broadcast[T] with Logging with Serializable {
           addToListOfSources(newPeerToTalkTo)
 
           // Turn the timer OFF, if the sender responds before timeout
-          timeOutTimer.cancel
+          timeOutTimer.cancel()
 
           // Send the latest SourceInfo
           oosSource.writeObject(getLocalSourceInfo)
@@ -836,7 +836,7 @@ extends Broadcast[T] with Logging with Serializable {
       }
     }
 
-    private def cleanUpConnections(): Unit = {
+    private def cleanUpConnections() {
       if (oisSource != null) {
         oisSource.close()
       }
@@ -860,7 +860,7 @@ extends Broadcast[T] with Logging with Serializable {
     // Keep track of sources that have completed reception
     private var setOfCompletedSources = Set[SourceInfo]()
 
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -869,7 +869,7 @@ extends Broadcast[T] with Logging with Serializable {
       logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
 
       guidePortLock.synchronized {
-        guidePortLock.notifyAll
+        guidePortLock.notifyAll()
       }
 
       try {
@@ -920,7 +920,7 @@ extends Broadcast[T] with Logging with Serializable {
       }
     }
 
-    private def sendStopBroadcastNotifications: Unit = {
+    private def sendStopBroadcastNotifications() {
       listOfSources.synchronized {
         listOfSources.foreach { sourceInfo =>
 
@@ -972,7 +972,7 @@ extends Broadcast[T] with Logging with Serializable {
       private var sourceInfo: SourceInfo = null
       private var selectedSources: ListBuffer[SourceInfo] = null
 
-      override def run: Unit = {
+      override def run() {
         try {
           logInfo("new GuideSingleRequest is running")
           // Connecting worker is sending in its information
@@ -1060,14 +1060,14 @@ extends Broadcast[T] with Logging with Serializable {
     // Server at most Broadcast.MaxTxSlots peers
     var threadPool = Utils.newDaemonFixedThreadPool(Broadcast.MaxTxSlots)
 
-    override def run: Unit = {
+    override def run() {
       var serverSocket = new ServerSocket(0)
       listenPort = serverSocket.getLocalPort
 
       logInfo("ServeMultipleRequests started with " + serverSocket)
 
       listenPortLock.synchronized {
-        listenPortLock.notifyAll
+        listenPortLock.notifyAll()
       }
 
       try {
@@ -1111,7 +1111,7 @@ extends Broadcast[T] with Logging with Serializable {
 
       logInfo("new ServeSingleRequest is running")
 
-      override def run: Unit = {
+      override def run() {
        try {
          // Send latest local SourceInfo to the receiver
          // In the case of receiver timeout and connection close, this will
@@ -1178,7 +1178,7 @@ extends Broadcast[T] with Logging with Serializable {
       }
     }
 
-    private def sendBlock(blockToSend: Int): Unit = {
+    private def sendBlock(blockToSend: Int) {
       try {
         oos.writeObject(arrayOfBlocks(blockToSend))
         oos.flush()
@@ -1195,12 +1195,13 @@ extends Broadcast[T] with Logging with Serializable {
 
 class BitTorrentBroadcastFactory
 extends BroadcastFactory {
-  def initialize(isMaster: Boolean) = {
+  def initialize(isMaster: Boolean) {
     BitTorrentBroadcast.initialize(isMaster)
   }
 
-  def newBroadcast[T](value_ : T, isLocal: Boolean) =
+  def newBroadcast[T](value_ : T, isLocal: Boolean) = {
     new BitTorrentBroadcast[T](value_, isLocal)
+  }
 }
 
 private object BitTorrentBroadcast
@@ -1217,7 +1218,7 @@ extends Logging {
 
   private var trackMV: TrackMultipleValues = null
 
-  def initialize(isMaster__ : Boolean): Unit = {
+  def initialize(isMaster__ : Boolean) {
     synchronized {
       if (!initialized) {
 
@@ -1245,7 +1246,7 @@ extends Logging {
 
   class TrackMultipleValues
   extends Thread with Logging {
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -1267,7 +1268,7 @@ extends Logging {
         if (clientSocket != null) {
           try {
             threadPool.execute(new Thread {
-              override def run: Unit = {
+              override def run() {
                 val oos = new ObjectOutputStream(clientSocket.getOutputStream)
                 oos.flush()
                 val ois = new ObjectInputStream(clientSocket.getInputStream)
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 06049749a91702593aac42f36b46800e9de42f66..4c7ab5f8ec336bd7103455cfef4c8eb788cf8079 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -30,26 +30,28 @@ object Broadcast extends Logging with Serializable {
   private var broadcastFactory: BroadcastFactory = null
 
   // Called by SparkContext or Executor before using Broadcast
-  def initialize (isMaster__ : Boolean): Unit = synchronized {
-    if (!initialized) {
-      val broadcastFactoryClass = System.getProperty(
-        "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")
-
-      broadcastFactory =
-        Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
-
-      // Setup isMaster before using it
-      isMaster_ = isMaster__
-
-      // Set masterHostAddress to the master's IP address for the slaves to read
-      if (isMaster) {
-        System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
-      }
+  def initialize (isMaster__ : Boolean) {
+    synchronized {
+      if (!initialized) {
+        val broadcastFactoryClass = System.getProperty(
+          "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")
+
+        broadcastFactory =
+          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
+
+        // Setup isMaster before using it
+        isMaster_ = isMaster__
 
-      // Initialize appropriate BroadcastFactory and BroadcastObject
-      broadcastFactory.initialize(isMaster)
+        // Set masterHostAddress to the master's IP address for the slaves to read
+        if (isMaster) {
+          System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
+        }
 
-      initialized = true
+        // Initialize appropriate BroadcastFactory and BroadcastObject
+        broadcastFactory.initialize(isMaster)
+
+        initialized = true
+      }
     }
   }
 
@@ -185,11 +187,11 @@ object Broadcast extends Logging with Serializable {
   }
 }
 
-case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable
+case class BroadcastBlock (blockID: Int, byteArray: Array[Byte]) extends Serializable
 
-case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
-  val totalBlocks: Int,
-  val totalBytes: Int)
+case class VariableInfo (@transient arrayOfBlocks : Array[BroadcastBlock],
+  totalBlocks: Int,
+  totalBytes: Int)
 extends Serializable {
 
   @transient
@@ -200,7 +202,7 @@ class SpeedTracker extends Serializable {
   // Mapping 'source' to '(totalTime, numBlocks)'
   private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] ()
 
-  def addDataPoint (srcInfo: SourceInfo, timeInMillis: Long): Unit = {
+  def addDataPoint (srcInfo: SourceInfo, timeInMillis: Long) {
     sourceToSpeedMap.synchronized {
       if (!sourceToSpeedMap.contains(srcInfo)) {
         sourceToSpeedMap += (srcInfo -> (timeInMillis, 1))
diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
index 64da650142b2b53abc7136264b6df723a0ece9d5..43290c241fc6e87ea6748e6bf313b1dcfd6a2194 100644
--- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
@@ -47,7 +47,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   sendBroadcast
 
-  def sendBroadcast(): Unit = {
+  def sendBroadcast() {
     logInfo("Local host address: " + hostAddress)
 
     // Store a persistent copy in HDFS
@@ -80,7 +80,7 @@ extends Broadcast[T] with Logging with Serializable {
 
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
@@ -92,14 +92,14 @@ extends Broadcast[T] with Logging with Serializable {
     // Register with the Tracker
     while (guidePort == -1) {
       guidePortLock.synchronized {
-        guidePortLock.wait
+        guidePortLock.wait()
       }
     }
     ChainedBroadcast.registerValue(uuid, guidePort)
   }
 
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject
+  private def readObject(in: ObjectInputStream) {
+    in.defaultReadObject()
     ChainedBroadcast.synchronized {
       val cachedVal = ChainedBroadcast.values.get(uuid, 0)
       if (cachedVal != null) {
@@ -135,7 +135,7 @@ extends Broadcast[T] with Logging with Serializable {
     }
   }
 
-  private def initializeSlaveVariables: Unit = {
+  private def initializeSlaveVariables() {
     arrayOfBlocks = null
     totalBytes = -1
     totalBlocks = -1
@@ -218,7 +218,7 @@ extends Broadcast[T] with Logging with Serializable {
     // ServeMultipleRequests thread
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
@@ -251,7 +251,7 @@ extends Broadcast[T] with Logging with Serializable {
       totalBlocks = sourceInfo.totalBlocks
       arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
       totalBlocksLock.synchronized {
-        totalBlocksLock.notifyAll
+        totalBlocksLock.notifyAll()
       }
       totalBytes = sourceInfo.totalBytes
 
@@ -322,7 +322,7 @@ extends Broadcast[T] with Logging with Serializable {
           // Set to true if at least one block is received
           receptionSucceeded = true
           hasBlocksLock.synchronized {
-            hasBlocksLock.notifyAll
+            hasBlocksLock.notifyAll()
           }
         }
       } catch {
@@ -349,7 +349,7 @@ extends Broadcast[T] with Logging with Serializable {
     // Keep track of sources that have completed reception
     private var setOfCompletedSources = Set[SourceInfo]()
 
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -358,7 +358,7 @@ extends Broadcast[T] with Logging with Serializable {
       logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
 
      guidePortLock.synchronized {
-        guidePortLock.notifyAll
+        guidePortLock.notifyAll()
       }
 
       try {
@@ -407,7 +407,7 @@ extends Broadcast[T] with Logging with Serializable {
       threadPool.shutdown()
     }
 
-    private def sendStopBroadcastNotifications: Unit = {
+    private def sendStopBroadcastNotifications() {
       pqOfSources.synchronized {
         var pqIter = pqOfSources.iterator
         while (pqIter.hasNext) {
@@ -459,7 +459,7 @@ extends Broadcast[T] with Logging with Serializable {
       private var selectedSourceInfo: SourceInfo = null
       private var thisWorkerInfo:SourceInfo = null
 
-      override def run: Unit = {
+      override def run() {
        try {
          logInfo("new GuideSingleRequest is running")
          // Connecting worker is sending in its hostAddress and listenPort it will
@@ -556,7 +556,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   class ServeMultipleRequests
   extends Thread with Logging {
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -565,7 +565,7 @@ extends Broadcast[T] with Logging with Serializable {
      logInfo("ServeMultipleRequests started with " + serverSocket)
 
       listenPortLock.synchronized {
-        listenPortLock.notifyAll
+        listenPortLock.notifyAll()
       }
 
       try {
@@ -609,7 +609,7 @@ extends Broadcast[T] with Logging with Serializable {
       private var sendFrom = 0
       private var sendUntil = totalBlocks
 
-      override def run: Unit = {
+      override def run() {
         try {
           logInfo("new ServeSingleRequest is running")
 
@@ -639,18 +639,18 @@ extends Broadcast[T] with Logging with Serializable {
         }
       }
 
-      private def sendObject: Unit = {
+      private def sendObject() {
         // Wait till receiving the SourceInfo from Master
         while (totalBlocks == -1) {
           totalBlocksLock.synchronized {
-            totalBlocksLock.wait
+            totalBlocksLock.wait()
           }
         }
 
         for (i <- sendFrom until sendUntil) {
           while (i == hasBlocks) {
             hasBlocksLock.synchronized {
-              hasBlocksLock.wait
+              hasBlocksLock.wait()
             }
           }
           try {
@@ -670,9 +670,12 @@ extends Broadcast[T] with Logging with Serializable {
 
 class ChainedBroadcastFactory
 extends BroadcastFactory {
-  def initialize(isMaster: Boolean) = ChainedBroadcast.initialize(isMaster)
-  def newBroadcast[T](value_ : T, isLocal: Boolean) =
+  def initialize(isMaster: Boolean) {
+    ChainedBroadcast.initialize(isMaster)
+  }
+  def newBroadcast[T](value_ : T, isLocal: Boolean) = {
     new ChainedBroadcast[T](value_, isLocal)
+  }
 }
 
 private object ChainedBroadcast
@@ -689,7 +692,7 @@ extends Logging {
 
   private var trackMV: TrackMultipleValues = null
 
-  def initialize(isMaster__ : Boolean): Unit = {
+  def initialize(isMaster__ : Boolean) {
     synchronized {
       if (!initialized) {
         isMaster_ = isMaster__
@@ -713,14 +716,14 @@ extends Logging {
 
   def isMaster = isMaster_
 
-  def registerValue(uuid: UUID, guidePort: Int): Unit = {
+  def registerValue(uuid: UUID, guidePort: Int) {
    valueToGuidePortMap.synchronized {
      valueToGuidePortMap +=(uuid -> guidePort)
      logInfo("New value registered with the Tracker " + valueToGuidePortMap)
    }
   }
 
-  def unregisterValue(uuid: UUID): Unit = {
+  def unregisterValue(uuid: UUID) {
     valueToGuidePortMap.synchronized {
       valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS
       logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -729,7 +732,7 @@ extends Logging {
 
   class TrackMultipleValues
   extends Thread with Logging {
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -751,7 +754,7 @@ extends Logging {
         if (clientSocket != null) {
           try {
             threadPool.execute(new Thread {
-              override def run: Unit = {
+              override def run() {
                 val oos = new ObjectOutputStream(clientSocket.getOutputStream)
                 oos.flush()
                 val ois = new ObjectInputStream(clientSocket.getInputStream)
diff --git a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
index b053e2b62ecd0caa6275f4c27f942c1a9f65b92d..d18dfb8963e3f10188ddec8b424a94a504eafdac 100644
--- a/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/DfsBroadcast.scala
@@ -24,15 +24,15 @@ extends Broadcast[T] with Logging with Serializable {
 
   sendBroadcast
 
-  def sendBroadcast (): Unit = {
+  def sendBroadcast () {
     val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
     out.writeObject (value_)
-    out.close
+    out.close()
   }
 
   // Called by JVM when deserializing an object
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject
+  private def readObject(in: ObjectInputStream) {
+    in.defaultReadObject()
     DfsBroadcast.synchronized {
       val cachedVal = DfsBroadcast.values.get(uuid, 0)
       if (cachedVal != null) {
@@ -44,7 +44,7 @@ extends Broadcast[T] with Logging with Serializable {
         val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
         value_ = fileIn.readObject.asInstanceOf[T]
         DfsBroadcast.values.put(uuid, 0, value_)
-        fileIn.close
+        fileIn.close()
 
         val time = (System.nanoTime - start) / 1e9
         logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
@@ -55,7 +55,9 @@ extends Broadcast[T] with Logging with Serializable {
 
 class DfsBroadcastFactory
 extends BroadcastFactory {
-  def initialize (isMaster: Boolean) = DfsBroadcast.initialize
+  def initialize (isMaster: Boolean) {
+    DfsBroadcast.initialize
+  }
   def newBroadcast[T] (value_ : T, isLocal: Boolean) =
     new DfsBroadcast[T] (value_, isLocal)
 }
@@ -71,7 +73,7 @@ extends Logging {
   private var compress: Boolean = false
   private var bufferSize: Int = 65536
 
-  def initialize (): Unit = {
+  def initialize() {
     synchronized {
       if (!initialized) {
         bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index d0853eadf96f445ce7f85a9f18f2fdbf95874400..6e3dde76bd01883d7ca128bd52a2a1ec30eaf158 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -25,7 +25,7 @@ extends Broadcast[T] with Logging with Serializable {
   }
 
   // Called by JVM when deserializing an object
-  private def readObject(in: ObjectInputStream): Unit = {
+  private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
     HttpBroadcast.synchronized {
       val cachedVal = HttpBroadcast.values.get(uuid, 0)
@@ -44,7 +44,9 @@ extends Broadcast[T] with Logging with Serializable {
 }
 
 class HttpBroadcastFactory extends BroadcastFactory {
-  def initialize(isMaster: Boolean): Unit = HttpBroadcast.initialize(isMaster)
+  def initialize(isMaster: Boolean) {
+    HttpBroadcast.initialize(isMaster)
+  }
   def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal)
 }
 
@@ -59,7 +61,7 @@ private object HttpBroadcast extends Logging {
   private var serverUri: String = null
   private var server: HttpServer = null
 
-  def initialize(isMaster: Boolean): Unit = {
+  def initialize(isMaster: Boolean) {
     synchronized {
       if (!initialized) {
         bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala
index 03f928953d6f7627a0ad056310b4f953543b57cf..09907f4ee74fe1488b0e0f98399cfa9134aace3e 100644
--- a/core/src/main/scala/spark/broadcast/SourceInfo.scala
+++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala
@@ -10,11 +10,11 @@ import spark._
  * CHANGED: Keep track of the blockSize for THIS broadcast variable.
  * Broadcast.BlockSize is expected to be updated across different broadcasts
  */
-case class SourceInfo (val hostAddress: String,
-  val listenPort: Int,
-  val totalBlocks: Int = SourceInfo.UnusedParam,
-  val totalBytes: Int = SourceInfo.UnusedParam,
-  val blockSize: Int = Broadcast.BlockSize)
+case class SourceInfo (hostAddress: String,
+  listenPort: Int,
+  totalBlocks: Int = SourceInfo.UnusedParam,
+  totalBytes: Int = SourceInfo.UnusedParam,
+  blockSize: Int = Broadcast.BlockSize)
 extends Comparable[SourceInfo] with Logging {
 
   var currentLeechers = 0
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 374389def5eb62bc2f2f680f6b1331ad12335c0a..f5527b6ec96021c7927924def32599654004ce39 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -47,7 +47,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   sendBroadcast
 
-  def sendBroadcast(): Unit = {
+  def sendBroadcast() {
     logInfo("Local host address: " + hostAddress)
 
     // Store a persistent copy in HDFS
@@ -70,25 +70,25 @@ extends Broadcast[T] with Logging with Serializable {
 
     guideMR = new GuideMultipleRequests
     guideMR.setDaemon(true)
-    guideMR.start
+    guideMR.start()
     logInfo("GuideMultipleRequests started...")
 
     // Must always come AFTER guideMR is created
     while (guidePort == -1) {
       guidePortLock.synchronized {
-        guidePortLock.wait
+        guidePortLock.wait()
       }
     }
 
     serveMR = new ServeMultipleRequests
     serveMR.setDaemon(true)
-    serveMR.start
+    serveMR.start()
     logInfo("ServeMultipleRequests started...")
 
     // Must always come AFTER serveMR is created
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
@@ -101,8 +101,8 @@ extends Broadcast[T] with Logging with Serializable {
     TreeBroadcast.registerValue(uuid, guidePort)
   }
 
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject
+  private def readObject(in: ObjectInputStream) {
+    in.defaultReadObject()
     TreeBroadcast.synchronized {
       val cachedVal = TreeBroadcast.values.get(uuid, 0)
       if (cachedVal != null) {
@@ -115,7 +115,7 @@ extends Broadcast[T] with Logging with Serializable {
 
         serveMR = new ServeMultipleRequests
         serveMR.setDaemon(true)
-        serveMR.start
+        serveMR.start()
         logInfo("ServeMultipleRequests started...")
 
         val start = System.nanoTime
@@ -138,7 +138,7 @@ extends Broadcast[T] with Logging with Serializable {
     }
   }
 
-  private def initializeSlaveVariables: Unit = {
+  private def initializeSlaveVariables() {
     arrayOfBlocks = null
     totalBytes = -1
     totalBlocks = -1
@@ -221,7 +221,7 @@ extends Broadcast[T] with Logging with Serializable {
     // ServeMultipleRequests thread
     while (listenPort == -1) {
       listenPortLock.synchronized {
-        listenPortLock.wait
+        listenPortLock.wait()
       }
     }
 
@@ -254,7 +254,7 @@ extends Broadcast[T] with Logging with Serializable {
       totalBlocks = sourceInfo.totalBlocks
       arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
       totalBlocksLock.synchronized {
-        totalBlocksLock.notifyAll
+        totalBlocksLock.notifyAll()
       }
       totalBytes = sourceInfo.totalBytes
       blockSize = sourceInfo.blockSize
@@ -326,7 +326,7 @@ extends Broadcast[T] with Logging with Serializable {
           // Set to true if at least one block is received
          receptionSucceeded = true
           hasBlocksLock.synchronized {
-            hasBlocksLock.notifyAll
+            hasBlocksLock.notifyAll()
           }
         }
       } catch {
@@ -353,7 +353,7 @@ extends Broadcast[T] with Logging with Serializable {
     // Keep track of sources that have completed reception
     private var setOfCompletedSources = Set[SourceInfo]()
 
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -362,7 +362,7 @@ extends Broadcast[T] with Logging with Serializable {
       logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
 
       guidePortLock.synchronized {
-        guidePortLock.notifyAll
+        guidePortLock.notifyAll()
       }
 
       try {
@@ -408,10 +408,10 @@ extends Broadcast[T] with Logging with Serializable {
       }
 
       // Shutdown the thread pool
-      threadPool.shutdown
+      threadPool.shutdown()
     }
 
-    private def sendStopBroadcastNotifications: Unit = {
+    private def sendStopBroadcastNotifications() {
      listOfSources.synchronized {
        var listIter = listOfSources.iterator
        while (listIter.hasNext) {
@@ -463,7 +463,7 @@ extends Broadcast[T] with Logging with Serializable {
       private var selectedSourceInfo: SourceInfo = null
       private var thisWorkerInfo:SourceInfo = null
 
-      override def run: Unit = {
+      override def run() {
         try {
           logInfo("new GuideSingleRequest is running")
           // Connecting worker is sending in its hostAddress and listenPort it will
@@ -569,7 +569,7 @@ extends Broadcast[T] with Logging with Serializable {
 
   class ServeMultipleRequests
   extends Thread with Logging {
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -578,7 +578,7 @@ extends Broadcast[T] with Logging with Serializable {
       logInfo("ServeMultipleRequests started with " + serverSocket)
 
       listenPortLock.synchronized {
-        listenPortLock.notifyAll
+        listenPortLock.notifyAll()
       }
 
       try {
@@ -610,7 +610,7 @@ extends Broadcast[T] with Logging with Serializable {
       }
 
      // Shutdown the thread pool
-      threadPool.shutdown
+      threadPool.shutdown()
     }
 
     class ServeSingleRequest(val clientSocket: Socket)
@@ -622,7 +622,7 @@ extends Broadcast[T] with Logging with Serializable {
       private var sendFrom = 0
       private var sendUntil = totalBlocks
 
-      override def run: Unit = {
+      override def run() {
         try {
           logInfo("new ServeSingleRequest is running")
 
@@ -652,18 +652,18 @@ extends Broadcast[T] with Logging with Serializable {
         }
       }
 
-      private def sendObject: Unit = {
+      private def sendObject() {
        // Wait till receiving the SourceInfo from Master
        while (totalBlocks == -1) {
          totalBlocksLock.synchronized {
-            totalBlocksLock.wait
+            totalBlocksLock.wait()
          }
        }
 
        for (i <- sendFrom until sendUntil) {
          while (i == hasBlocks) {
            hasBlocksLock.synchronized {
-              hasBlocksLock.wait
+              hasBlocksLock.wait()
            }
          }
          try {
@@ -704,7 +704,7 @@ extends Logging {
 
   private var MaxDegree_ : Int = 2
 
-  def initialize(isMaster__ : Boolean): Unit = {
+  def initialize(isMaster__ : Boolean) {
     synchronized {
       if (!initialized) {
         isMaster_ = isMaster__
@@ -712,7 +712,7 @@ extends Logging {
       if (isMaster) {
         trackMV = new TrackMultipleValues
         trackMV.setDaemon(true)
-        trackMV.start
+        trackMV.start()
         // TODO: Logging the following line makes the Spark framework ID not
         // getting logged, cause it calls logInfo before log4j is initialized
         logInfo("TrackMultipleValues started...")
@@ -728,14 +728,14 @@ extends Logging {
 
   def isMaster = isMaster_
 
-  def registerValue(uuid: UUID, guidePort: Int): Unit = {
+  def registerValue(uuid: UUID, guidePort: Int) {
     valueToGuidePortMap.synchronized {
       valueToGuidePortMap += (uuid -> guidePort)
       logInfo("New value registered with the Tracker " + valueToGuidePortMap)
     }
   }
 
-  def unregisterValue(uuid: UUID): Unit = {
+  def unregisterValue(uuid: UUID) {
     valueToGuidePortMap.synchronized {
       valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToHDFS
       logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
@@ -744,7 +744,7 @@ extends Logging {
 
   class TrackMultipleValues
   extends Thread with Logging {
-    override def run: Unit = {
+    override def run() {
       var threadPool = Utils.newDaemonCachedThreadPool()
       var serverSocket: ServerSocket = null
 
@@ -766,7 +766,7 @@ extends Logging {
         if (clientSocket != null) {
          try {
            threadPool.execute(new Thread {
-              override def run: Unit = {
+              override def run() {
                val oos = new ObjectOutputStream(clientSocket.getOutputStream)
                oos.flush()
                val ois = new ObjectInputStream(clientSocket.getInputStream)
@@ -800,7 +800,7 @@ extends Logging {
       }
 
       // Shutdown the thread pool
-      threadPool.shutdown
+      threadPool.shutdown()
     }
   }
 }
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 4546dfa0fac1b6c7f07d708a42abac2f4cedbdaa..451faee66e8bb519f7b0cc7dac062e68658d8455 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -107,7 +107,7 @@ extends Connection(SocketChannel.open, selector_) {
   val defaultChunkSize = 65536  //32768 //16384
   var nextMessageToBeUsed = 0
 
-  def addMessage(message: Message): Unit = {
+  def addMessage(message: Message) {
     messages.synchronized{
       /*messages += message*/
       messages.enqueue(message)
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index a5a707a57def3f50917ff0c44f85ac52c10c04b2..f680a6419bd76190702fc8593c17cb62989dcddb 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -17,7 +17,7 @@ import java.nio.channels.spi._
 import java.net._
 import java.util.concurrent.Executors
 
-case class ConnectionManagerId(val host: String, val port: Int) {
+case class ConnectionManagerId(host: String, port: Int) {
   def toSocketAddress() = new InetSocketAddress(host, port)
 }
 
@@ -112,7 +112,7 @@ class ConnectionManager(port: Int) extends Logging {
 
       val selectedKeys = selector.selectedKeys().iterator()
       while (selectedKeys.hasNext()) {
-        val key = selectedKeys.next.asInstanceOf[SelectionKey]
+        val key = selectedKeys.next
         selectedKeys.remove()
         if (key.isValid) {
           if (key.isAcceptable) {
@@ -173,7 +173,7 @@ class ConnectionManager(port: Int) extends Logging {
         status.synchronized {
           status.attempted = true
           status.acked = false
-          status.notifyAll
+          status.notifyAll()
         }
       })
 
@@ -204,7 +204,7 @@ class ConnectionManager(port: Int) extends Logging {
       status.synchronized {
         status.attempted = true
         status.acked = false
-        status.notifyAll
+        status.notifyAll()
       }
     })
 
@@ -260,7 +260,7 @@ class ConnectionManager(port: Int) extends Logging {
           sentMessageStatus.ackMessage = Some(message)
           sentMessageStatus.attempted = true
           sentMessageStatus.acked = true
-          sentMessageStatus.notifyAll
+          sentMessageStatus.notifyAll()
         }
       } else {
         val ackMessage = if (onReceiveCallback != null) {
diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
index 260547902bb4a743e7a48ec1fb2d5a8b3b56da9c..e6535836ab3c4ae11119cc718f8826cfe91dd0ff 100644
--- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
@@ -25,20 +25,24 @@ class ApproximateActionListener[T, U, R](
   var failure: Option[Exception] = None             // Set if the job has failed (permanently)
   var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult
 
-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    evaluator.merge(index, result.asInstanceOf[U])
-    finishedTasks += 1
-    if (finishedTasks == totalTasks) {
-      // If we had already returned a PartialResult, set its final value
-      resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
-      // Notify any waiting thread that may have called getResult
-      this.notifyAll()
+  override def taskSucceeded(index: Int, result: Any) {
+    synchronized {
+      evaluator.merge(index, result.asInstanceOf[U])
+      finishedTasks += 1
+      if (finishedTasks == totalTasks) {
+        // If we had already returned a PartialResult, set its final value
+        resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
+        // Notify any waiting thread that may have called getResult
+        this.notifyAll()
+      }
     }
   }
 
-  override def jobFailed(exception: Exception): Unit = synchronized {
-    failure = Some(exception)
-    this.notifyAll()
+  override def jobFailed(exception: Exception) {
+    synchronized {
+      failure = Some(exception)
+      this.notifyAll()
+    }
   }
 
   /**
diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala
index 7095bc8ca1bbf4d134a3ce01b3cd1826e3a93722..e7d2d4e8ccea18c5a661fbe9c38b1a157fb2c2e2 100644
--- a/core/src/main/scala/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/spark/partial/PartialResult.scala
@@ -44,37 +44,43 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
    * Set a handler to be called if this PartialResult's job fails. Only one failure handler
    * is supported per PartialResult.
    */
-  def onFail(handler: Exception => Unit): Unit = synchronized {
-    if (failureHandler != None) {
-      throw new UnsupportedOperationException("onFail cannot be called twice")
-    }
-    failureHandler = Some(handler)
-    if (failure != None) {
-      // We already have a failure, so let's call the handler
-      handler(failure.get)
+  def onFail(handler: Exception => Unit) {
+    synchronized {
+      if (failureHandler != None) {
+        throw new UnsupportedOperationException("onFail cannot be called twice")
+      }
+      failureHandler = Some(handler)
+      if (failure != None) {
+        // We already have a failure, so let's call the handler
+        handler(failure.get)
+      }
     }
   }
 
-  private[spark] def setFinalValue(value: R): Unit = synchronized {
-    if (finalValue != None) {
-      throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
+  private[spark] def setFinalValue(value: R) {
+    synchronized {
+      if (finalValue != None) {
+        throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
+      }
+      finalValue = Some(value)
+      // Call the completion handler if it was set
+      completionHandler.foreach(h => h(value))
+      // Notify any threads that may be calling getFinalValue()
+      this.notifyAll()
     }
-    finalValue = Some(value)
-    // Call the completion handler if it was set
-    completionHandler.foreach(h => h(value))
-    // Notify any threads that may be calling getFinalValue()
-    this.notifyAll()
   }
 
-  private[spark] def setFailure(exception: Exception): Unit = synchronized {
-    if (failure != None) {
-      throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
+  private[spark] def setFailure(exception: Exception) {
+    synchronized {
+      if (failure != None) {
+        throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
+      }
+      failure = Some(exception)
+      // Call the failure handler if it was set
+      failureHandler.foreach(h => h(exception))
+      // Notify any threads that may be calling getFinalValue()
+      this.notifyAll()
     }
-    failure = Some(exception)
-    // Call the failure handler if it was set
-    failureHandler.foreach(h => h(exception))
-    // Notify any threads that may be calling getFinalValue()
-    this.notifyAll()
   }
 
   override def toString: String = synchronized {
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index fc8adbc517c0ddfb79504b4b700e76f366f905bf..436c16cdddebd9cad11fc1aa97ffbfe94cf8fd2c 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -529,7 +529,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocs(n.rdd, inPart)
           if (locs != Nil)
-            return locs;
+            return locs
         }
       case _ =>
     })
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
index be8ec9bd7b07e9d8ac8e986ae9a20b575b9bbd0c..4c2ae23051a2aa39cce32de3ca2563957d66a776 100644
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala
@@ -12,26 +12,30 @@ class JobWaiter(totalTasks: Int) extends JobListener {
   private var jobFinished = false          // Is the job as a whole finished (succeeded or failed)?
   private var jobResult: JobResult = null  // If the job is finished, this will be its result
 
-  override def taskSucceeded(index: Int, result: Any) = synchronized {
-    if (jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
-    taskResults(index) = result
-    finishedTasks += 1
-    if (finishedTasks == totalTasks) {
-      jobFinished = true
-      jobResult = JobSucceeded(taskResults)
-      this.notifyAll()
+  override def taskSucceeded(index: Int, result: Any) {
+    synchronized {
+      if (jobFinished) {
+        throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
+      }
+      taskResults(index) = result
+      finishedTasks += 1
+      if (finishedTasks == totalTasks) {
+        jobFinished = true
+        jobResult = JobSucceeded(taskResults)
+        this.notifyAll()
+      }
     }
   }
 
-  override def jobFailed(exception: Exception) = synchronized {
-    if (jobFinished) {
-      throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
+  override def jobFailed(exception: Exception) {
+    synchronized {
+      if (jobFinished) {
+        throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
+      }
+      jobFinished = true
+      jobResult = JobFailed(exception)
+      this.notifyAll()
     }
-    jobFinished = true
-    jobResult = JobFailed(exception)
-    this.notifyAll()
   }
 
   def getResult(): JobResult = synchronized {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 79cca0f294593154d1d667debb261db4ad836974..8c0e06f0209a1b539b660d3af3ae8b3441b1fc74 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -42,7 +42,7 @@ object ShuffleMapTask {
     if (old != null) {
       return old
     } else {
-      val loader = currentThread.getContextClassLoader
+      val loader = Thread.currentThread.getContextClassLoader
       val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
       val objIn = new ObjectInputStream(in) {
         override def resolveClass(desc: ObjectStreamClass) =
@@ -107,7 +107,7 @@ class ShuffleMapTask(
   override def run(attemptId: Int): BlockManagerId = {
     val numOutputSplits = dep.partitioner.numPartitions
     val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
-    val partitioner = dep.partitioner.asInstanceOf[Partitioner]
+    val partitioner = dep.partitioner
     val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
     for (elem <- rdd.iterator(split)) {
       val (k, v) = elem.asInstanceOf[(Any, Any)]
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
index 95ad6c5b59969ed8176ec70e165063e8ff506f7d..2eee36264acd2663b6cdc30cd4961f965bb39062 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
@@ -210,7 +210,7 @@ class CoarseMesosScheduler(
       }
       // Also report the loss to the DAGScheduler
       listener.hostLost(failedHost.get)
-      reviveOffers();
+      reviveOffers()
     }
   }
 
@@ -283,9 +283,9 @@ class CoarseMesosScheduler(
 class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) {
   generation = 0
 
-  def run(id: Int): Unit = {
+  def run(id: Int) {
     val env = SparkEnv.get
-    val classLoader = currentThread.getContextClassLoader
+    val classLoader = Thread.currentThread.getContextClassLoader
     val actor = env.actorSystem.actorOf(
       Props(new WorkerActor(slaveId, host, env, classLoader)),
       name = "WorkerActor")
@@ -309,7 +309,7 @@ class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: Cla
 
   class TaskRunner(desc: MTaskInfo)
   extends Runnable {
-    override def run() = {
+    override def run() {
       val tid = desc.getTaskId.getValue
       logInfo("Running task ID " + tid)
       try {
@@ -360,7 +360,7 @@ class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: Cla
   }
 
   override def receive = {
-    case LaunchTask(slaveId, task) =>
+    case LaunchTask(slaveId_, task) =>
       threadPool.execute(new TaskRunner(task))
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
index f72618c03fc8a1b996f32c86678b19de6ecf31cd..8e34537674ccf2c310856b1136966a8b80fb555f 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
@@ -107,7 +107,7 @@ class MesosScheduler(
   override def start() {
     new Thread("MesosScheduler driver") {
       setDaemon(true)
-      override def run {
+      override def run() {
        val sched = MesosScheduler.this
        val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
        driver = new MesosSchedulerDriver(sched, fwInfo, master)
@@ -122,7 +122,7 @@ class MesosScheduler(
     if (System.getProperty("spark.speculation", "false") == "true") {
       new Thread("MesosScheduler speculation check") {
         setDaemon(true)
-        override def run {
+        override def run() {
           waitForRegister()
           while (true) {
             try {
@@ -184,7 +184,7 @@ class MesosScheduler(
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet()
     }
-    reviveOffers();
+    reviveOffers()
   }
 
   def taskSetFinished(manager: TaskSetManager) {
@@ -331,7 +331,7 @@ class MesosScheduler(
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      reviveOffers();
+      reviveOffers()
     }
     if (taskFailed) {
       // Also revive offers if a task had failed for some reason other than host lost
@@ -439,7 +439,7 @@ class MesosScheduler(
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      reviveOffers();
+      reviveOffers()
     }
   }
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 3a8574a815aa86a1c98993a704c12bcd287b4de8..501183ab1fc92e2f8bdd86c703dadd07b8a04990 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -82,7 +82,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
     val block = blockManager.getLocal(id)
     val buffer = block match {
       case Some(tValues) => {
-        val values = tValues.asInstanceOf[Iterator[Any]]
+        val values = tValues
         val buffer = blockManager.dataSerialize(values)
         buffer
       }
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 75df4bee0948f9f7d4c4e9141873c025d8d582a3..816411debebb6280ad37f77af19daf12e2439e85 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -14,9 +14,11 @@ object FailureSuiteState {
   var tasksRun = 0
   var tasksFailed = 0
 
-  def clear(): Unit = synchronized {
-    tasksRun = 0
-    tasksFailed = 0
+  def clear() {
+    synchronized {
+      tasksRun = 0
+      tasksFailed = 0
+    }
   }
 }
 
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 078071209ae9de0e0673e9862a28b8422cec7116..06d446ea246724675ab79cc3b33deb2952606fb7 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -11,8 +11,9 @@ import SparkContext._
 class KryoSerializerSuite extends FunSuite {
   test("basic types") {
     val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T): Unit =
+    def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
     check(1)
     check(1L)
     check(1.0f)
@@ -39,8 +40,9 @@ class KryoSerializerSuite extends FunSuite {
 
   test("pairs") {
     val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T): Unit =
+    def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
     check((1, 1))
     check((1, 1L))
     check((1L, 1))
@@ -62,8 +64,9 @@ class KryoSerializerSuite extends FunSuite {
 
   test("Scala data structures") {
     val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T): Unit =
+    def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
     check(List[Int]())
     check(List[Int](1, 2, 3))
     check(List[String]())
@@ -86,8 +89,9 @@ class KryoSerializerSuite extends FunSuite {
     System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
 
     val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T): Unit =
+    def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
 
     check(CaseClass(17, "hello"))
 
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index cadf01432f9a3e1b81c81173730342e2c903eef2..d38e72d8b8c7dede21cb8501f28fb9ee5427b752 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -31,7 +31,7 @@ class ThreadingSuite extends FunSuite {
     new Thread {
       override def run() {
         answer1 = nums.reduce(_ + _)
-        answer2 = nums.first // This will run "locally" in the current thread
+        answer2 = nums.first() // This will run "locally" in the current thread
         sem.release()
       }
     }.start()
@@ -51,13 +51,13 @@ class ThreadingSuite extends FunSuite {
         override def run() {
           val answer1 = nums.reduce(_ + _)
          if (answer1 != 55) {
-            printf("In thread %d: answer1 was %d\n", i, answer1);
-            ok = false;
+            printf("In thread %d: answer1 was %d\n", i, answer1)
+            ok = false
           }
-          val answer2
= nums.first // This will run "locally" in the current thread + val answer2 = nums.first() // This will run "locally" in the current thread if (answer2 != 1) { - printf("In thread %d: answer2 was %d\n", i, answer2); - ok = false; + printf("In thread %d: answer2 was %d\n", i, answer2) + ok = false } sem.release() } @@ -80,13 +80,13 @@ class ThreadingSuite extends FunSuite { override def run() { val answer1 = nums.reduce(_ + _) if (answer1 != 55) { - printf("In thread %d: answer1 was %d\n", i, answer1); - ok = false; + printf("In thread %d: answer1 was %d\n", i, answer1) + ok = false } - val answer2 = nums.first // This will run "locally" in the current thread + val answer2 = nums.first() // This will run "locally" in the current thread if (answer2 != 1) { - printf("In thread %d: answer2 was %d\n", i, answer2); - ok = false; + printf("In thread %d: answer2 was %d\n", i, answer2) + ok = false } sem.release() } diff --git a/project/plugins.sbt b/project/plugins.sbt index 313a877ed24d204130e196554c6c380c23bf7cd5..0e2b6d49028e92cd52bba3371670366dfa7286f9 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,3 +5,5 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline. addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1") + +addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")