Commit dd515ca3 authored by Mridul Muralidharan

Attempt at fixing merge conflict

parents adcda84f 17e076de
Showing 69 additions and 46 deletions
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
......
@@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer
 import storage.StorageLevel
 object Bagel extends Logging {
-  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
+  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
   /**
    * Runs a Bagel program.
@@ -63,8 +62,9 @@ object Bagel extends Logging {
       val combinedMsgs = msgs.combineByKey(
         combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
       val grouped = combinedMsgs.groupWith(verts)
+      val superstep_ = superstep  // Create a read-only copy of superstep for capture in closure
       val (processed, numMsgs, numActiveVerts) =
-        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
+        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
......
@@ -30,7 +30,7 @@
 #   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
 ##
-usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <args...>"
+usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
 # if no args specified, show usage
 if [ $# -le 1 ]; then
@@ -48,6 +48,8 @@ startStop=$1
 shift
 command=$1
 shift
+instance=$1
+shift
 spark_rotate_log ()
 {
@@ -92,10 +94,10 @@ if [ "$SPARK_PID_DIR" = "" ]; then
 fi
 # some variables
-export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log
+export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log
 export SPARK_ROOT_LOGGER="INFO,DRFA"
-log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out
-pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid
+log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out
+pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid
 # Set default scheduling priority
 if [ "$SPARK_NICENESS" = "" ]; then
......
@@ -2,7 +2,7 @@
 # Run a Spark command on all slave hosts.
-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..."
+usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
 # if no args specified, show usage
 if [ $# -le 1 ]; then
......
@@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
   fi
 fi
-"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
+"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
@@ -11,4 +11,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
   fi
 fi
-"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@"
@@ -21,4 +21,13 @@ fi
 echo "Master IP: $SPARK_MASTER_IP"
 # Launch the slaves
-exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+else
+  if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
+    SPARK_WORKER_WEBUI_PORT=8081
+  fi
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 )) spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
+  done
+fi
@@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd`
 . "$bin/spark-config.sh"
-"$bin"/spark-daemon.sh stop spark.deploy.master.Master
\ No newline at end of file
+"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1
@@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd`
 . "$bin/spark-config.sh"
-"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker
\ No newline at end of file
+if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+  . "${SPARK_CONF_DIR}/spark-env.sh"
+fi
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1
+else
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 ))
+  done
+fi
@@ -12,6 +12,7 @@
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine
 #
 # Finally, Spark also relies on the following variables, but these can be set
 # on just the *master* (i.e. in your driver program), and will automatically
......
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
......
@@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest](
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    */
-  @deprecated("use mapPartitionsWithIndex")
+  @deprecated("use mapPartitionsWithIndex", "0.7.0")
   def mapPartitionsWithSplit[U: ClassManifest](
     f: (Int, Iterator[T]) => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U] =
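Since the deprecation above points callers at mapPartitionsWithIndex, which takes the same (Int, Iterator[T]) => Iterator[U] function, migrating is a rename at the call site. A hedged sketch (assuming a local master and the pre-1.0 `spark` package layout):

    import spark.SparkContext

    object MapPartitionsWithIndexSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "MapPartitionsWithIndexSketch")
        val rdd = sc.parallelize(1 to 8, 4)
        // Deprecated form:
        //   rdd.mapPartitionsWithSplit((idx, iter) => iter.map(x => (idx, x)))
        // Replacement with the same semantics:
        val tagged = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
        println(tagged.collect().toList)
        sc.stop()
      }
    }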
......
@@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     self: RDD[(K, V)])
   extends Logging
   with Serializable {
   private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
     val c = {
-      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
+      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
         classManifest[T].erasure
       } else {
         // We get the type of the Writable class by looking at the apply method which converts
         // from T to Writable. Since we have two apply methods we filter out the one which
-        // is of the form "java.lang.Object apply(java.lang.Object)"
+        // is not of the form "java.lang.Object apply(java.lang.Object)"
         implicitly[T => Writable].getClass.getDeclaredMethods().filter(
-          m => m.getReturnType().toString != "java.lang.Object" &&
+          m => m.getReturnType().toString != "class java.lang.Object" &&
            m.getName() == "apply")(0).getReturnType
       }
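The changed string comparison above is easy to misread: java.lang.Class.toString prefixes the name with "class " (or "interface "), unlike getName. A small standalone illustration (not from the patch):

    object ClassToStringSketch {
      def main(args: Array[String]) {
        println(classOf[Object].toString) // prints "class java.lang.Object"
        println(classOf[Object].getName)  // prints "java.lang.Object"
        // Hence getReturnType().toString must be compared against
        // "class java.lang.Object", as the corrected filter does.
      }
    }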
@@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     val valueClass = getWritableClass[V]
     val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
     val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
     if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (!convertKey && convertValue) {
-      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (convertKey && !convertValue) {
-      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (convertKey && convertValue) {
-      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
-    }
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+    }
   }
 }
@@ -131,6 +131,6 @@ private[spark] object CheckpointRDD extends Logging {
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
-    fs.delete(path)
+    fs.delete(path, true)
   }
 }
@@ -29,7 +29,7 @@ private[spark] case class NarrowCoGroupSplitDep(
 private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
 private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
   extends Partition with Serializable {
   override val index: Int = idx
   override def hashCode(): Int = idx
@@ -88,7 +88,7 @@ class CoGroupedRDD[K](
         case _ =>
           new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
       }
-      }.toList)
+      }.toArray)
     }
     array
   }
......
@@ -16,7 +16,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  * @tparam V the value class.
  */
 class ShuffledRDD[K, V](
-    prev: RDD[(K, V)],
+    @transient prev: RDD[(K, V)],
     part: Partitioner)
   extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) {
......
@@ -56,7 +56,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
         case _ =>
           new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
       }
-      }.toList)
+      }.toArray)
     }
     array
   }
......
@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
     @transient rdd2: RDD[U]
   ) extends Partition {
-  var split1 = rdd1.partitions(idx)
-  var split2 = rdd1.partitions(idx)
+  var partition1 = rdd1.partitions(idx)
+  var partition2 = rdd2.partitions(idx)
   override val index: Int = idx
-  def splits = (split1, split2)
+  def partitions = (partition1, partition2)
   @throws(classOf[IOException])
   private def writeObject(oos: ObjectOutputStream) {
-    // Update the reference to parent split at the time of task serialization
-    split1 = rdd1.partitions(idx)
-    split2 = rdd2.partitions(idx)
+    // Update the reference to parent partition at the time of task serialization
+    partition1 = rdd1.partitions(idx)
+    partition2 = rdd2.partitions(idx)
     oos.defaultWriteObject()
   }
 }
@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
   override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
-    val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
-    rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
   }
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
-    rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
   }
   override def clearDependencies() {
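ZippedRDD is the implementation behind RDD.zip, which pairs the two parents element by element and therefore requires the same number of partitions and the same number of elements per partition. A brief usage sketch (same assumptions as the earlier sketch: local master, pre-1.0 `spark` package layout):

    import spark.SparkContext

    object ZipSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ZipSketch")
        val nums = sc.parallelize(1 to 4, 2)
        val letters = sc.parallelize(Seq("a", "b", "c", "d"), 2)
        // Both RDDs have 2 partitions of 2 elements, so zip is well defined.
        println(nums.zip(letters).collect().toList) // List((1,a), (2,b), (3,c), (4,d))
        sc.stop()
      }
    }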
......
@@ -121,7 +121,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     val toRemove = new HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
       if (info.lastSeenMs < minSeenTime) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
+          (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
       }
     }
......
@@ -3,8 +3,8 @@ markdown: kramdown
 # These allow the documentation to be updated with new releases
 # of Spark, Scala, and Mesos.
-SPARK_VERSION: 0.7.1-SNAPSHOT
-SPARK_VERSION_SHORT: 0.7.1
-SCALA_VERSION: 2.9.2
+SPARK_VERSION: 0.8.0-SNAPSHOT
+SPARK_VERSION_SHORT: 0.8.0
+SCALA_VERSION: 2.9.3
 MESOS_VERSION: 0.9.0-incubating
 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net