diff --git a/conf/java-opts b/conf/java-opts deleted file mode 100644 index 1a598061f073d7edf28dae8a573559159471a800..0000000000000000000000000000000000000000 --- a/conf/java-opts +++ /dev/null @@ -1,14 +0,0 @@ --Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle --Dspark.shuffle.masterHostAddress=127.0.0.1 --Dspark.shuffle.masterTrackerPort=22222 --Dspark.shuffle.trackerStrategy=spark.BalanceRemainingShuffleTrackerStrategy --Dspark.shuffle.maxRxConnections=40 --Dspark.shuffle.maxTxConnections=120 --Dspark.shuffle.blockSize=4096 --Dspark.shuffle.minKnockInterval=100 --Dspark.shuffle.maxKnockInterval=5000 --Dspark.shuffle.maxChatTime=500 --Dspark.shuffle.throttleFraction=2.0 --verbose:gc --XX:+PrintGCTimeStamps --XX:+PrintGCDetails diff --git a/conf/log4j.properties b/conf/log4j.properties deleted file mode 100644 index 33774b463d9ecceab45e1754b5fddf646e6c5f53..0000000000000000000000000000000000000000 --- a/conf/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.eclipse.jetty=WARN diff --git a/conf/spark-env.sh b/conf/spark-env.sh deleted file mode 100755 index 5f6c8269e826ebcb117caac348570e4474b07ce4..0000000000000000000000000000000000000000 --- a/conf/spark-env.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash - -# Set Spark environment variables for your site in this file. Some useful -# variables to set are: -# - MESOS_HOME, to point to your Mesos installation -# - SCALA_HOME, to point to your Scala installation -# - SPARK_CLASSPATH, to add elements to Spark's classpath -# - SPARK_JAVA_OPTS, to add JVM options -# - SPARK_MEM, to change the amount of memory used per node (this should -# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g). -# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries. - -MESOS_HOME=/Users/mosharaf/Work/mesos diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index 7e3911aef1ede6e06f3af394d42c31f95258bb3f..577e9371bf217146ba9abdb3fdf8c1df7ad3c07c 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -7,6 +7,7 @@ import java.util.concurrent.{Executors, ExecutorService} import scala.collection.mutable.ArrayBuffer import spark.broadcast._ +import spark.shuffle._ import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver} import mesos.{TaskDescription, TaskState, TaskStatus} diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 39f2dc44581ef68f06f283a3f5f8f7eee3e6952d..30929ed08980e1df5cc877d9ed946601954897fe 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -12,6 +12,7 @@ import SparkContext._ import mesos._ +import spark.shuffle._ @serializable abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { @@ -359,7 +360,7 @@ extends RDD[Pair[T, U]](sc) { : RDD[(K, C)] = { val shufClass = Class.forName(System.getProperty( - "spark.shuffle.class", "spark.LocalFileShuffle")) + "spark.shuffle.class", "spark.BasicLocalFileShuffle")) val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]] shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners) } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0ffe44f91d0e45a4dbc13c2878244b9bc91c4781..082e9bbe4bcbd95cecf46e46d26705d6ce4aeec4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -5,6 +5,7 @@ import java.io._ import scala.collection.mutable.ArrayBuffer import spark.broadcast._ +import spark.shuffle._ import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.SequenceFileInputFormat diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index efc3c50b1373f9c3090c0fb29b34633c48d528c4..e9c7639f12be156d02edcf10d83cd75af4d71b53 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -35,7 +35,7 @@ extends Logging { def initialize (isMaster__ : Boolean): Unit = synchronized { if (!initialized) { val broadcastFactoryClass = System.getProperty("spark.broadcast.factory", - "spark.DfsBroadcastFactory") + "spark.broadcast.DfsBroadcastFactory") broadcastFactory = Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory] diff --git a/core/src/main/scala/spark/BasicLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala similarity index 99% rename from core/src/main/scala/spark/BasicLocalFileShuffle.scala rename to core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala index 3c3f132083726d04f5e978dbcec119ccbe062a21..5e2c11e116f09280e412b959811a617fa5ace5f3 100644 --- a/core/src/main/scala/spark/BasicLocalFileShuffle.scala +++ b/core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala @@ -1,4 +1,4 @@ -package spark +package spark.shuffle import java.io._ import java.net.URL @@ -7,6 +7,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{ArrayBuffer, HashMap} +import spark._ + /** * A basic implementation of shuffle using local files served through HTTP. * diff --git a/core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala similarity index 99% rename from core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala rename to core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala index 98af7c8d6569dbbcfc82b975fbffb096ee2d6f62..8d513b3d86e3537dc0ce275e064f8fccc4903705 100644 --- a/core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala @@ -1,4 +1,4 @@ -package spark +package spark.shuffle import java.io._ import java.net._ @@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.math + +import spark._ /** * An implementation of shuffle using local files served through custom server @@ -153,7 +156,7 @@ extends Shuffle[K, V, C] with Logging { while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, Shuffle.MaxRxConnections) - + math.min(totalSplits, Shuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { diff --git a/core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala similarity index 99% rename from core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala rename to core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala index 87e824fb2edcf354585bca62f79b78d57f831be0..f210249d39bfca7810c05ed99d7156506ff518ea 100644 --- a/core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala +++ b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala @@ -1,4 +1,4 @@ -package spark +package spark.shuffle import java.io._ import java.net._ @@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.math + +import spark._ /** * An implementation of shuffle using local files served through custom server @@ -100,7 +103,7 @@ extends Shuffle[K, V, C] with Logging { Shuffle.MaxRxConnections) while (hasSplits < totalSplits) { - var numThreadsToCreate = Math.min(totalSplits, + var numThreadsToCreate = math.min(totalSplits, Shuffle.MaxRxConnections) - threadPool.getActiveCount diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/shuffle/DfsShuffle.scala similarity index 99% rename from core/src/main/scala/spark/DfsShuffle.scala rename to core/src/main/scala/spark/shuffle/DfsShuffle.scala index bf91be7d2cb86f020d5097070ed5ee412f953a7b..2f079a453de49f54934a297e901154f2b4dbcfa5 100644 --- a/core/src/main/scala/spark/DfsShuffle.scala +++ b/core/src/main/scala/spark/shuffle/DfsShuffle.scala @@ -1,4 +1,4 @@ -package spark +package spark.shuffle import java.io.{EOFException, ObjectInputStream, ObjectOutputStream} import java.net.URI @@ -9,6 +9,8 @@ import scala.collection.mutable.HashMap import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} +import spark._ + /** * A simple implementation of shuffle using a distributed file system. * diff --git a/core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala similarity index 99% rename from core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala rename to core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala index 8e7b897668eaedd4d8f2250f63edd04e17d555c5..174cfd993faf0ba7169f2e5aeec6a07bc3ac2fcb 100644 --- a/core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala +++ b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala @@ -1,4 +1,4 @@ -package spark +package spark.shuffle import java.io._ import java.net._ @@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.math + +import spark._ /** * An implementation of shuffle using local files served through HTTP where @@ -98,7 +101,7 @@ extends Shuffle[K, V, C] with Logging { while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, Shuffle.MaxRxConnections) - + math.min(totalSplits, Shuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { diff --git a/core/src/main/scala/spark/Shuffle.scala b/core/src/main/scala/spark/shuffle/Shuffle.scala similarity index 98% rename from core/src/main/scala/spark/Shuffle.scala rename to core/src/main/scala/spark/shuffle/Shuffle.scala index f2d790f72702b07bf11e09529d3821d154b6e381..55d7d57b831f3c5697d000830146ee64f2cbfd68 100644 --- a/core/src/main/scala/spark/Shuffle.scala +++ b/core/src/main/scala/spark/shuffle/Shuffle.scala @@ -1,9 +1,11 @@ -package spark +package spark.shuffle import java.net._ import java.util.{BitSet} import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} +import spark._ + /** * A trait for shuffle system. Given an input RDD and combiner functions * for PairRDDExtras.combineByKey(), returns an output RDD. @@ -21,7 +23,7 @@ trait Shuffle[K, V, C] { /** * An object containing common shuffle config parameters */ -private object Shuffle +object Shuffle extends Logging { // Tracker communication constants val ReducerEntering = 0