From 0567646180881a4738dbb06f3e614d8082ec46b5 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>
Date: Wed, 27 Apr 2011 21:11:41 -0700
Subject: [PATCH] Shuffle is also working from its own subpackage.

---
 conf/java-opts                                     | 14 --------------
 conf/log4j.properties                              |  8 --------
 conf/spark-env.sh                                  | 13 -------------
 core/src/main/scala/spark/Executor.scala           |  1 +
 core/src/main/scala/spark/RDD.scala                |  3 ++-
 core/src/main/scala/spark/SparkContext.scala       |  1 +
 .../src/main/scala/spark/broadcast/Broadcast.scala |  2 +-
 .../{ => shuffle}/BasicLocalFileShuffle.scala      |  4 +++-
 .../CustomBlockedLocalFileShuffle.scala            |  7 +++++--
 .../CustomParallelLocalFileShuffle.scala           |  7 +++++--
 .../scala/spark/{ => shuffle}/DfsShuffle.scala     |  4 +++-
 .../HttpParallelLocalFileShuffle.scala             |  7 +++++--
 .../main/scala/spark/{ => shuffle}/Shuffle.scala   |  6 ++++--
 13 files changed, 30 insertions(+), 47 deletions(-)
 delete mode 100644 conf/java-opts
 delete mode 100644 conf/log4j.properties
 delete mode 100755 conf/spark-env.sh
 rename core/src/main/scala/spark/{ => shuffle}/BasicLocalFileShuffle.scala (99%)
 rename core/src/main/scala/spark/{ => shuffle}/CustomBlockedLocalFileShuffle.scala (99%)
 rename core/src/main/scala/spark/{ => shuffle}/CustomParallelLocalFileShuffle.scala (99%)
 rename core/src/main/scala/spark/{ => shuffle}/DfsShuffle.scala (99%)
 rename core/src/main/scala/spark/{ => shuffle}/HttpParallelLocalFileShuffle.scala (99%)
 rename core/src/main/scala/spark/{ => shuffle}/Shuffle.scala (98%)

diff --git a/conf/java-opts b/conf/java-opts
deleted file mode 100644
index 1a598061f0..0000000000
--- a/conf/java-opts
+++ /dev/null
@@ -1,14 +0,0 @@
--Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle
--Dspark.shuffle.masterHostAddress=127.0.0.1
--Dspark.shuffle.masterTrackerPort=22222
--Dspark.shuffle.trackerStrategy=spark.BalanceRemainingShuffleTrackerStrategy
--Dspark.shuffle.maxRxConnections=40
--Dspark.shuffle.maxTxConnections=120
--Dspark.shuffle.blockSize=4096
--Dspark.shuffle.minKnockInterval=100
--Dspark.shuffle.maxKnockInterval=5000
--Dspark.shuffle.maxChatTime=500
--Dspark.shuffle.throttleFraction=2.0
--verbose:gc
--XX:+PrintGCTimeStamps
--XX:+PrintGCDetails
diff --git a/conf/log4j.properties b/conf/log4j.properties
deleted file mode 100644
index 33774b463d..0000000000
--- a/conf/log4j.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.eclipse.jetty=WARN
diff --git a/conf/spark-env.sh b/conf/spark-env.sh
deleted file mode 100755
index 5f6c8269e8..0000000000
--- a/conf/spark-env.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/usr/bin/env bash
-
-# Set Spark environment variables for your site in this file. Some useful
-# variables to set are:
-# - MESOS_HOME, to point to your Mesos installation
-# - SCALA_HOME, to point to your Scala installation
-# - SPARK_CLASSPATH, to add elements to Spark's classpath
-# - SPARK_JAVA_OPTS, to add JVM options
-# - SPARK_MEM, to change the amount of memory used per node (this should
-#   be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).
-# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.
-
-MESOS_HOME=/Users/mosharaf/Work/mesos
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 7e3911aef1..577e9371bf 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -7,6 +7,7 @@ import java.util.concurrent.{Executors, ExecutorService}
 import scala.collection.mutable.ArrayBuffer
 
 import spark.broadcast._
+import spark.shuffle._
 
 import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
 import mesos.{TaskDescription, TaskState, TaskStatus}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 39f2dc4458..30929ed089 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,6 +12,7 @@ import SparkContext._
 
 import mesos._
 
+import spark.shuffle._
 
 @serializable
 abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
@@ -359,7 +360,7 @@ extends RDD[Pair[T, U]](sc) {
   : RDD[(K, C)] =
   {
     val shufClass = Class.forName(System.getProperty(
-      "spark.shuffle.class", "spark.LocalFileShuffle"))
+      "spark.shuffle.class", "spark.BasicLocalFileShuffle"))
     val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
     shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
   }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0ffe44f91d..082e9bbe4b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -5,6 +5,7 @@ import java.io._
 import scala.collection.mutable.ArrayBuffer
 
 import spark.broadcast._
+import spark.shuffle._
 
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.SequenceFileInputFormat
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index efc3c50b13..e9c7639f12 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -35,7 +35,7 @@ extends Logging {
   def initialize (isMaster__ : Boolean): Unit = synchronized {
     if (!initialized) {
       val broadcastFactoryClass = System.getProperty("spark.broadcast.factory",
-        "spark.DfsBroadcastFactory")
+        "spark.broadcast.DfsBroadcastFactory")
 
       broadcastFactory =
         Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
diff --git a/core/src/main/scala/spark/BasicLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
similarity index 99%
rename from core/src/main/scala/spark/BasicLocalFileShuffle.scala
rename to core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
index 3c3f132083..5e2c11e116 100644
--- a/core/src/main/scala/spark/BasicLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
 
 import java.io._
 import java.net.URL
@@ -7,6 +7,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
+import spark._
+
 /**
  * A basic implementation of shuffle using local files served through HTTP.
 *
diff --git a/core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
similarity index 99%
rename from core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala
rename to core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
index 98af7c8d65..8d513b3d86 100644
--- a/core/src/main/scala/spark/CustomBlockedLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
 
 import java.io._
 import java.net._
@@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.math
+
+import spark._
 
 /**
  * An implementation of shuffle using local files served through custom server
@@ -153,7 +156,7 @@ extends Shuffle[K, V, C] with Logging {
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate =
-        Math.min(totalSplits, Shuffle.MaxRxConnections) -
+        math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
diff --git a/core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
similarity index 99%
rename from core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala
rename to core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
index 87e824fb2e..f210249d39 100644
--- a/core/src/main/scala/spark/CustomParallelLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
 
 import java.io._
 import java.net._
@@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.math
+
+import spark._
 
 /**
  * An implementation of shuffle using local files served through custom server
@@ -100,7 +103,7 @@ extends Shuffle[K, V, C] with Logging {
       Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
-      var numThreadsToCreate = Math.min(totalSplits,
+      var numThreadsToCreate = math.min(totalSplits,
         Shuffle.MaxRxConnections) - threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/shuffle/DfsShuffle.scala
similarity index 99%
rename from core/src/main/scala/spark/DfsShuffle.scala
rename to core/src/main/scala/spark/shuffle/DfsShuffle.scala
index bf91be7d2c..2f079a453d 100644
--- a/core/src/main/scala/spark/DfsShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/DfsShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
 
 import java.io.{EOFException, ObjectInputStream, ObjectOutputStream}
 import java.net.URI
@@ -9,6 +9,8 @@ import scala.collection.mutable.HashMap
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
 
+import spark._
+
 /**
  * A simple implementation of shuffle using a distributed file system.
 *
diff --git a/core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
similarity index 99%
rename from core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala
rename to core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
index 8e7b897668..174cfd993f 100644
--- a/core/src/main/scala/spark/HttpParallelLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.shuffle
 
 import java.io._
 import java.net._
@@ -7,6 +7,9 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.math
+
+import spark._
 
 /**
  * An implementation of shuffle using local files served through HTTP where
@@ -98,7 +101,7 @@ extends Shuffle[K, V, C] with Logging {
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate =
-        Math.min(totalSplits, Shuffle.MaxRxConnections) -
+        math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
diff --git a/core/src/main/scala/spark/Shuffle.scala b/core/src/main/scala/spark/shuffle/Shuffle.scala
similarity index 98%
rename from core/src/main/scala/spark/Shuffle.scala
rename to core/src/main/scala/spark/shuffle/Shuffle.scala
index f2d790f727..55d7d57b83 100644
--- a/core/src/main/scala/spark/Shuffle.scala
+++ b/core/src/main/scala/spark/shuffle/Shuffle.scala
@@ -1,9 +1,11 @@
-package spark
+package spark.shuffle
 
 import java.net._
 import java.util.{BitSet}
 import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
 
+import spark._
+
 /**
  * A trait for shuffle system. Given an input RDD and combiner functions
  * for PairRDDExtras.combineByKey(), returns an output RDD.
@@ -21,7 +23,7 @@ trait Shuffle[K, V, C] {
 /**
  * An object containing common shuffle config parameters
  */
-private object Shuffle
+object Shuffle
 extends Logging {
   // Tracker communication constants
   val ReducerEntering = 0
-- 
GitLab
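
A note for readers on the RDD.scala hunk above: the shuffle implementation is
resolved at run time by reflection from the "spark.shuffle.class" system
property (the deleted conf/java-opts pointed the same property at
spark.CustomBlockedInMemoryShuffle). Below is a minimal, self-contained Scala
sketch of that selection pattern; DemoShuffle, DemoBasicShuffle, and the
"demo.shuffle.class" property are invented stand-ins, not names from this
patch.

// Stand-ins for the patch's Shuffle trait and its default implementation.
trait DemoShuffle {
  def compute(): Unit
}

class DemoBasicShuffle extends DemoShuffle {
  def compute(): Unit = println("running the default shuffle")
}

object ShuffleSelectionDemo {
  def main(args: Array[String]): Unit = {
    // As in RDD.combineByKey: read a class name from a system property,
    // fall back to a default, and instantiate it reflectively so that a
    // different shuffle can be plugged in with a -D flag alone.
    val shufClass = Class.forName(
      System.getProperty("demo.shuffle.class", "DemoBasicShuffle"))
    val shuf = shufClass.newInstance().asInstanceOf[DemoShuffle]
    shuf.compute()
  }
}

Running with -Ddemo.shuffle.class=SomeOtherDemoShuffle swaps implementations
without touching the calling code, which is the mechanism the deleted
conf/java-opts relied on.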
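
The Math.min -> math.min hunks all touch the same receiver-side throttling
loop: each round creates at most min(totalSplits, Shuffle.MaxRxConnections)
fetch threads, minus the threads still active in the pool. A runnable sketch
of that idiom follows; MaxRxConnections, the sleep times, and the Runnable
body are invented demo values (the real constants come from the
spark.shuffle.* properties visible in the deleted conf/java-opts).

import java.util.concurrent.{Executors, ThreadPoolExecutor}

object FetchThrottleDemo {
  val MaxRxConnections = 4 // stand-in for Shuffle.MaxRxConnections

  def main(args: Array[String]): Unit = {
    val totalSplits = 8
    var hasSplits = 0
    val threadPool = Executors.newFixedThreadPool(MaxRxConnections)
      .asInstanceOf[ThreadPoolExecutor]

    while (hasSplits < totalSplits) {
      // Cap concurrency at min(totalSplits, MaxRxConnections), counting
      // fetchers still busy from earlier rounds. The patch switches this
      // expression from the deprecated scala.Math object to scala.math.
      var numThreadsToCreate =
        math.min(totalSplits, MaxRxConnections) - threadPool.getActiveCount

      while (hasSplits < totalSplits && numThreadsToCreate > 0) {
        threadPool.execute(new Runnable {
          def run(): Unit = Thread.sleep(100) // stands in for fetching a split
        })
        hasSplits += 1 // the real shuffles record a split only once fetched
        numThreadsToCreate -= 1
      }
      Thread.sleep(50) // stands in for the shuffle's knock interval
    }
    threadPool.shutdown()
  }
}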