Commit c4a23f95 authored by Dan Crankshaw

Updated code so benchmarks actually run.

parent fa2f87ca
Showing with 292 additions and 393 deletions
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/mnt/ephemeral-hdfs</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://ec2-50-17-7-68.compute-1.amazonaws.com:9000</value>
  </property>
  <property>
    <name>io.file.buffer.size</name>
    <value>65536</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit.skip.checksum</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.domain.socket.path</name>
    <value>/var/run/hadoop-hdfs/dn._PORT</value>
  </property>
  <property>
    <name>dfs.client.file-block-storage-locations.timeout</name>
    <value>3000</value>
  </property>
</configuration>
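With fs.default.name pointing at the master's ephemeral-HDFS namenode, any scheme-less path a Spark job reads resolves against that filesystem. A minimal sketch of how a benchmark driver on this cluster would pick up the setting; the application name, standalone-master port, and input path below are assumptions, not taken from this commit:

import org.apache.spark.SparkContext

// Assumes the standalone master runs on the same host as the namenode, on the
// default port 7077; "benchmark" is a placeholder application name.
val sc = new SparkContext("spark://ec2-50-17-7-68.compute-1.amazonaws.com:7077", "benchmark")
// "/data/edges.tsv" is hypothetical; with the configuration above it resolves
// to hdfs://ec2-50-17-7-68.compute-1.amazonaws.com:9000/data/edges.tsv
val lines = sc.textFile("/data/edges.tsv")
println("line count: " + lines.count())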
# A Spark Worker will be started on each of the machines listed below.
localhost
ec2-23-20-12-62.compute-1.amazonaws.com
ec2-54-205-173-19.compute-1.amazonaws.com
ec2-54-225-4-124.compute-1.amazonaws.com
ec2-23-22-209-112.compute-1.amazonaws.com
ec2-50-16-69-88.compute-1.amazonaws.com
ec2-54-205-163-126.compute-1.amazonaws.com
ec2-54-242-235-95.compute-1.amazonaws.com
ec2-54-211-169-232.compute-1.amazonaws.com
ec2-54-237-31-30.compute-1.amazonaws.com
ec2-54-235-15-124.compute-1.amazonaws.com
#!/usr/bin/env bash
# This file contains environment variables required to run Spark. Copy it as
# spark-env.sh and edit that to configure Spark for your site.
#
# The following variables can be set in this file:
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to
# point to the directory for Scala library JARs (if you install Scala as a
# Debian or RPM package, these are in a separate path, often /usr/share/java)
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that
# we recommend setting app-wide options in the application's driver program.
# Examples of node-specific options : -Dspark.local.dir, GC options
# Examples of app-wide options : -Dspark.serializer
#
# If using the standalone deploy mode, you can also set variables for it here:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
@@ -156,6 +156,7 @@ object SparkEnv extends Logging {
val serializer = serializerManager.setDefault(
System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
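The hunk above has SparkEnv read the serializer class names from the spark.serializer and spark.closure.serializer system properties and log the data-serializer choice. A minimal sketch of opting into Kryo from a driver, assuming GraphKryoRegistrator (touched later in this commit) lives in org.apache.spark.graph:

// Set these before the SparkContext is constructed, since SparkEnv reads the
// properties during initialization.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
// spark.closure.serializer is left at its JavaSerializer default, as in the hunk above.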
@@ -61,14 +61,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
// Allow the user to register their own classes by setting spark.kryo.registrator
try {
Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
} catch {
case _: Exception => println("Failed to register spark.kryo.registrator")
Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
logDebug("Running user registrator: " + regCls)
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
kryo.setClassLoader(classLoader)
@@ -116,7 +112,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
}
}
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
val kryo = ks.newKryo()
val output = ks.newKryoOutput()
val input = ks.newKryoInput()
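The registrator hook earlier in this file's diff instantiates whatever class spark.kryo.registrator names and lets it register application classes with Kryo; with the try/catch removed, a registrator that fails to load now surfaces the exception instead of printing a message and continuing. The hook only needs a no-arg class implementing KryoRegistrator; a minimal sketch with a hypothetical application class:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical record type, shown only to illustrate the registration hook.
case class MyRecord(id: Long, weight: Double)

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord])
  }
}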
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
* instance of the serializer object has been created, the get method returns that instead of
* creating a new one.
*/
private[spark] class SerializerManager {
private[spark] class SerializerManager extends org.apache.spark.Logging {
private val serializers = new ConcurrentHashMap[String, Serializer]
private var _default: Serializer = _
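The class comment above describes the caching contract: the first get for a class name instantiates the serializer, and later calls return the cached instance. A rough sketch of that lookup over a ConcurrentHashMap like the one in the hunk; this is not the actual implementation (which also threads a class loader through Class.forName), and the class name here is hypothetical:

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.serializer.Serializer

class CachingSerializerLookup {
  private val serializers = new ConcurrentHashMap[String, Serializer]

  def get(clsName: String): Serializer = {
    var serializer = serializers.get(clsName)
    if (serializer == null) {
      // Racing threads may each build a candidate, but all end up returning
      // whichever instance landed in the map first.
      val candidate = Class.forName(clsName).newInstance().asInstanceOf[Serializer]
      serializers.putIfAbsent(clsName, candidate)
      serializer = serializers.get(clsName)
    }
    serializer
  }
}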
@@ -4,7 +4,7 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl.MessageToPartition
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.graph.impl._
class GraphKryoRegistrator extends KryoRegistrator {
@@ -13,6 +13,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MutableTuple2[Object, Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[(Vid, Object)])
kryo.register(classOf[EdgePartition[Object]])
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
@@ -20,7 +20,7 @@ object GraphLoader {
: GraphImpl[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path).flatMap { line =>
val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
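The change above threads a minEdgePartitions hint into sc.textFile so the edge list is not stuck with the default split count. A sketch of the parsing the surrounding code performs; it assumes an existing SparkContext named sc, and the input path and partition count are placeholders:

// minEdgePartitions would in practice be tuned to the cluster size so the
// edge list is split across enough tasks.
val minEdgePartitions = 128
val rawEdges = sc.textFile("/data/edges.tsv", minEdgePartitions).flatMap { line =>
  // Skip blank lines and '#' comments, then take the first two
  // whitespace-separated fields as source and destination vertex ids.
  if (!line.isEmpty && line(0) != '#') {
    val lineArray = line.split("\\s+")
    Some((lineArray(0).trim.toLong, lineArray(1).trim.toLong))
  } else {
    None
  }
}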
@@ -516,7 +516,7 @@ object GraphImpl {
.map { e =>
// Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
val part: Pid = randomVertexCut(e.src, e.dst, numPartitions)
//val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
// Should we be using 3-tuple or an optimized class
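The benchmark now assigns edges with randomVertexCut instead of the 2D grid scheme. The function itself is not shown in this hunk; one plausible shape consistent with the name is to hash the edge's endpoint pair, so a given edge always lands on the same partition while a vertex's edges may be spread across many:

// Sketch only, not necessarily the exact hash used in this commit. Vid and
// Pid follow the vertex-id and partition-id aliases used in this package.
def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
  math.abs((src, dst).hashCode()) % numParts
}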
@@ -63,9 +63,6 @@ object GraphGenerators {
}
// For now just writes graph to a file. Eventually
// it will return a spark.graph.Graph
// Right now it just generates a bunch of edges where
// the edge data is the weight (default 1)