diff --git a/conf/core-site.xml b/conf/core-site.xml
new file mode 100644
index 0000000000000000000000000000000000000000..eefd875fc819bc87480c00fda7ab14c1b705ac9d
--- /dev/null
+++ b/conf/core-site.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>/mnt/ephemeral-hdfs</value>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://ec2-50-17-7-68.compute-1.amazonaws.com:9000</value>
+  </property>
+
+  <property>
+    <name>io.file.buffer.size</name>
+    <value>65536</value>
+  </property>
+
+  <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>dfs.client.read.shortcircuit.skip.checksum</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hadoop-hdfs/dn._PORT</value>
+  </property>
+
+  <property>
+    <name>dfs.client.file-block-storage-locations.timeout</name>
+    <value>3000</value>
+  </property>
+
+</configuration>
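Note: the new core-site.xml puts scratch space on the instance-local /mnt/ephemeral-hdfs volume, names the EC2 master as the default filesystem, and keeps the dfs.client.* short-circuit read options off. A minimal sketch (not part of this patch) of how to check that a JVM actually picks these values up, assuming the stock Hadoop client API and that the conf/ directory is on the classpath:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object CheckHdfsConfig {
  def main(args: Array[String]) {
    // new Configuration() loads core-site.xml (and hdfs-site.xml) from the classpath.
    val conf = new Configuration()
    // Should print the hdfs://ec2-...:9000 URI above if the file is being picked up.
    println("fs.default.name     = " + conf.get("fs.default.name"))
    println("io.file.buffer.size = " + conf.get("io.file.buffer.size"))
    // FileSystem.get resolves the default filesystem from the same setting.
    val fs = FileSystem.get(conf)
    println("Resolved filesystem = " + fs.getUri)
  }
}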
diff --git a/conf/slaves b/conf/slaves
index da0a01343d20aff07b9143a3c2698867ffcf6770..728d22ac2ebb6b6385e0a4e250b43cdfb5cb5187 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,2 +1,10 @@
-# A Spark Worker will be started on each of the machines listed below.
-localhost
\ No newline at end of file
+ec2-23-20-12-62.compute-1.amazonaws.com
+ec2-54-205-173-19.compute-1.amazonaws.com
+ec2-54-225-4-124.compute-1.amazonaws.com
+ec2-23-22-209-112.compute-1.amazonaws.com
+ec2-50-16-69-88.compute-1.amazonaws.com
+ec2-54-205-163-126.compute-1.amazonaws.com
+ec2-54-242-235-95.compute-1.amazonaws.com
+ec2-54-211-169-232.compute-1.amazonaws.com
+ec2-54-237-31-30.compute-1.amazonaws.com
+ec2-54-235-15-124.compute-1.amazonaws.com
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 0a35ee7c79b12da7d4ab0f3036964dcd31764344..b8936314ecce282f835201c56dac6db6e99bbe1f 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -1,21 +1,19 @@
 #!/usr/bin/env bash
 
 # This file contains environment variables required to run Spark. Copy it as
-# spark-env.sh and edit that to configure Spark for your site.
-#
-# The following variables can be set in this file:
-# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# spark-env.sh and edit that to configure Spark for your site. At a minimum,
+# the following two variables should be set:
+# - SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to
+#   point to the directory for Scala library JARs (if you install Scala as a
+#   Debian or RPM package, these are in a separate path, often /usr/share/java)
 # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
-# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that
-#   we recommend setting app-wide options in the application's driver program.
-#     Examples of node-specific options : -Dspark.local.dir, GC options
-#     Examples of app-wide options : -Dspark.serializer
 #
-# If using the standalone deploy mode, you can also set variables for it here:
-# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
+# If using the standalone deploy mode, you can also set variables for it:
+# - SPARK_MASTER_IP, to bind the master to a different IP address
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
-# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes
+#   to be spawned on every slave machine
 
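Note: the template now walks through the worker-sizing variables (cores, memory, worker instances per slave). As a sanity check, the hypothetical snippet below (not Spark's own worker code) just prints what a JVM launched by these scripts would see, assuming spark-env.sh exported the variables before the process started:

object ShowWorkerEnv {
  def main(args: Array[String]) {
    // Each variable is only visible here if spark-env.sh exported it first.
    val names = Seq("SPARK_WORKER_CORES", "SPARK_WORKER_MEMORY",
                    "SPARK_WORKER_INSTANCES", "SPARK_MASTER_PORT")
    names.foreach { name =>
      val value = Option(System.getenv(name)).getOrElse("(not set; Spark default applies)")
      println(name + " = " + value)
    }
  }
}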
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 29968c273c31d59e5b45e1b8237f860c9eb21bdf..9b8384bcbb58ccbd4f66bac5a24dfd34845e934f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,6 +156,7 @@ object SparkEnv extends Logging {
 
     val serializer = serializerManager.setDefault(
       System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+    logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
 
     val closureSerializer = serializerManager.get(
       System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 24ef204aa19909dc20bcc5b169a09c0ace56856e..3feafde8b6ff4a219b34c6597d0fa1cdb1dc9d14 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -61,14 +61,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
 
     // Allow the user to register their own classes by setting spark.kryo.registrator
-    try {
-      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      }
-    } catch {
-      case _: Exception => println("Failed to register spark.kryo.registrator")
+    Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+      logDebug("Running user registrator: " + regCls)
+      val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+      reg.registerClasses(kryo)
     }
 
     kryo.setClassLoader(classLoader)
@@ -116,7 +112,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
   val kryo = ks.newKryo()
   val output = ks.newKryoOutput()
   val input = ks.newKryoInput()
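Note: with the try/catch removed, a bad spark.kryo.registrator now fails loudly instead of being swallowed by a println, so the registrator class has to exist on the worker classpath under exactly the configured name. For reference, a sketch of the kind of user registrator this hook instantiates; the class name and registered types are illustrative, while the interface matches the KryoRegistrator used elsewhere in this patch:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Enable with: System.setProperty("spark.kryo.registrator", "myapp.MyKryoRegistrator")
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register the application's hot classes so Kryo writes compact class ids
    // instead of fully qualified class names.
    kryo.register(classOf[Array[Double]])
    kryo.register(classOf[(Long, Double)])
  }
}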
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 2955986feced55d1a2f3ec00f174b5b793560f8c..5082730ae3fa3127e3e26ac23dbdac7ad1713500 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
  * instance of the serializer object has been created, the get method returns that instead of
  * creating a new one.
  */
-private[spark] class SerializerManager {
+private[spark] class SerializerManager extends org.apache.spark.Logging {
 
   private val serializers = new ConcurrentHashMap[String, Serializer]
   private var _default: Serializer = _
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index b411c60cee15eba54eeb7efec83405dad5953d54..454df22a0805beebecb6d3994bc6bf99973e0776 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -6,37 +6,6 @@ import org.apache.spark._
 
 object Analytics extends Logging {
 
-//  def main(args: Array[String]) {
-//    //pregelPagerank()
-//  }
-
-  // /**
-  //  * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
-  //  */
-  // // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
-  // //   // Compute the out degree of each vertex
-  // //   val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
-  // //     (vertex, deg) => (deg.getOrElse(0), 1.0F)
-  // //   )
-  // //   GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
-  // //     (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
-  // //     (a: Float, b: Float) => a + b, // merge
-  // //     (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
-  // //     numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
-  // // }
-  // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
-  //   // Compute the out degree of each vertex
-  //   val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees,
-  //     (vertex, deg) => (deg.getOrElse(0), 1.0)
-  //   )
-  //   GraphLab.iterateGA2[(Int, Double), ED, Double](pagerankGraph)(
-  //     (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
-  //     (a: Double, b: Double) => a + b, // merge
-  //     0.0, // default
-  //     (vertex, a: Double) => (vertex.data._1, (0.15 + 0.85 * a)), // apply
-  //     numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
-  // }
-
   /**
    * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
    */
@@ -106,335 +75,220 @@ object Analytics extends Logging {
       gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
     )
   }
-
-  //   /**
-  //    * Compute the shortest path to a set of markers
-  //    */
-  //   def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = {
-  //     val sourceSet = sources.toSet
-  //     val spGraph = graph.mapVertices {
-  //       case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
-  //     }
-  //     GraphLab.iterateGA[Float, Float, Float](spGraph)(
-  //       (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
-  //       (a: Float, b: Float) => math.min(a, b), // merge
-  //       (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
-  //       numIter,
-  //       gatherDirection = EdgeDirection.In)
-  //   }
-
-  //   // /**
-  //   //  * Compute the connected component membership of each vertex
-  //   //  * and return an RDD with the vertex value containing the
-  //   //  * lowest vertex id in the connected component containing
-  //   //  * that vertex.
-  //   //  */
-  //   // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
-  //   //   numIter: Int = Int.MaxValue) = {
-
-  //   //   val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
-  //   //   val edges = graph.edges // .mapValues(v => None)
-  //   //   val ccGraph = new Graph(vertices, edges)
-
-  //   //   ccGraph.iterateDynamic(
-  //   //     (me_id, edge) => edge.otherVertex(me_id).data, // gather
-  //   //     (a: Int, b: Int) => math.min(a, b), // merge
-  //   //     Integer.MAX_VALUE,
-  //   //     (v, a: Int) => math.min(v.data, a), // apply
-  //   //     (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
-  //   //     numIter,
-  //   //     gatherEdges = EdgeDirection.Both,
-  //   //     scatterEdges = EdgeDirection.Both).vertices
-  //   //   //
-  //   //   //    graph_ret.vertices.collect.foreach(println)
-  //   //   //    graph_ret.edges.take(10).foreach(println)
-  //   // }
-
-
-  //   // /**
-  //   //  * Compute the shortest path to a set of markers
-  //   //  */
-  //   //  def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float],
-  //   //   sources: List[Int], numIter: Int) = {
-  //   //   val sourceSet = sources.toSet
-  //   //   val vertices = graph.vertices.mapPartitions(
-  //   //     iter => iter.map {
-  //   //       case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) )
-  //   //       });
-
-  //   //   val edges = graph.edges // .mapValues(v => None)
-  //   //   val spGraph = new Graph(vertices, edges)
-
-  //   //   val niterations = Int.MaxValue
-  //   //   spGraph.iterateDynamic(
-  //   //     (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
-  //   //     (a: Float, b: Float) => math.min(a, b), // merge
-  //   //     Float.MaxValue,
-  //   //     (v, a: Float) => math.min(v.data, a), // apply
-  //   //     (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
-  //   //     numIter,
-  //   //     gatherEdges = EdgeDirection.In,
-  //   //     scatterEdges = EdgeDirection.Out).vertices
-  //   // }
-
-
-  //   // /**
-  //   //  *
-  //   //  */
-  //   // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
-  //   //   latentK: Int, lambda: Double, numIter: Int) = {
-  //   //   val vertices = graph.vertices.mapPartitions( _.map {
-  //   //       case (vid, _) => (vid,  Array.fill(latentK){ scala.util.Random.nextDouble() } )
-  //   //       }).cache
-  //   //   val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
-  //   //   val edges = graph.edges // .mapValues(v => None)
-  //   //   val alsGraph = new Graph(vertices, edges)
-  //   //   alsGraph.numVPart = graph.numVPart
-  //   //   alsGraph.numEPart = graph.numEPart
-
-  //   //   val niterations = Int.MaxValue
-  //   //   alsGraph.iterateDynamic[(Array[Double], Array[Double])](
-  //   //     (me_id, edge) => { // gather
-  //   //       val X = edge.otherVertex(me_id).data
-  //   //       val y = edge.data
-  //   //       val Xy = X.map(_ * y)
-  //   //       val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
-  //   //       (Xy, XtX)
-  //   //     },
-  //   //     (a, b) => {
-  //   //     // The difference between the while loop and the zip is a FACTOR OF TWO in overall
-  //   //     //  runtime
-  //   //       var i = 0
-  //   //       while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
-  //   //       i = 0
-  //   //       while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
-  //   //       a
-  //   //       // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
-  //   //     },
-  //   //     (Array.empty[Double], Array.empty[Double]), // default value is empty
-  //   //     (vertex, accum) => { // apply
-  //   //       val XyArray  = accum._1
-  //   //       val XtXArray = accum._2
-  //   //       if(XyArray.isEmpty) vertex.data // no neighbors
-  //   //       else {
-  //   //         val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
-  //   //           (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
-  //   //           (if(i == j) lambda else 1.0F) //regularization
-  //   //         }
-  //   //         val Xy = DenseMatrix.create(latentK,1,XyArray)
-  //   //         val w = XtX \ Xy
-  //   //         w.data
-  //   //       }
-  //   //     },
-  //   //     (me_id, edge) => true,
-  //   //     numIter,
-  //   //     gatherEdges = EdgeDirection.Both,
-  //   //     scatterEdges = EdgeDirection.Both,
-  //   //     vertex => vertex.id < maxUser).vertices
-  //   // }
-
-  //   def main(args: Array[String]) = {
-  //     val host = args(0)
-  //     val taskType = args(1)
-  //     val fname = args(2)
-  //     val options =  args.drop(3).map { arg =>
-  //       arg.dropWhile(_ == '-').split('=') match {
-  //         case Array(opt, v) => (opt -> v)
-  //         case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
-  //       }
-  //     }
-
-  //     System.setProperty("spark.serializer", "spark.KryoSerializer")
-  //     //System.setProperty("spark.shuffle.compress", "false")
-  //     System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
-
-  //     taskType match {
-  //       case "pagerank" => {
-
-  //         var numIter = Int.MaxValue
-  //         var isDynamic = false
-  //         var tol:Float = 0.001F
-  //         var outFname = ""
-  //         var numVPart = 4
-  //         var numEPart = 4
-
-  //         options.foreach{
-  //           case ("numIter", v) => numIter = v.toInt
-  //           case ("dynamic", v) => isDynamic = v.toBoolean
-  //           case ("tol", v) => tol = v.toFloat
-  //           case ("output", v) => outFname = v
-  //           case ("numVPart", v) => numVPart = v.toInt
-  //           case ("numEPart", v) => numEPart = v.toInt
-  //           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-  //         }
-
-  //         if(!isDynamic && numIter == Int.MaxValue) {
-  //           println("Set number of iterations!")
-  //           sys.exit(1)
-  //         }
-  //         println("======================================")
-  //         println("|             PageRank               |")
-  //         println("--------------------------------------")
-  //         println(" Using parameters:")
-  //         println(" \tDynamic:  " + isDynamic)
-  //         if(isDynamic) println(" \t  |-> Tolerance: " + tol)
-  //         println(" \tNumIter:  " + numIter)
-  //         println("======================================")
-
-  //         val sc = new SparkContext(host, "PageRank(" + fname + ")")
-
-  //         val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
-
-  //         val startTime = System.currentTimeMillis
-  //         logInfo("GRAPHX: starting tasks")
-  //         logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
-  //         logInfo("GRAPHX: Number of edges " + graph.edges.count)
-
-  //         val pr = Analytics.pagerank(graph, numIter)
-  //         // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
-  //         //   else  Analytics.pagerank(graph, numIter)
-  //         logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
-  //         if (!outFname.isEmpty) {
-  //           println("Saving pageranks of pages to " + outFname)
-  //           pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
-  //         }
-  //         logInfo("GRAPHX: Runtime:    " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
-  //         sc.stop()
-  //       }
-
-  //      case "cc" => {
-
-  //         var numIter = Int.MaxValue
-  //         var isDynamic = false
-
-  //         options.foreach{
-  //           case ("numIter", v) => numIter = v.toInt
-  //           case ("dynamic", v) => isDynamic = v.toBoolean
-  //           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-  //         }
-
-  //         if(!isDynamic && numIter == Int.MaxValue) {
-  //           println("Set number of iterations!")
-  //           sys.exit(1)
-  //         }
-  //         println("======================================")
-  //         println("|      Connected Components          |")
-  //         println("--------------------------------------")
-  //         println(" Using parameters:")
-  //         println(" \tDynamic:  " + isDynamic)
-  //         println(" \tNumIter:  " + numIter)
-  //         println("======================================")
-
-  //         val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
-  //         val graph = Graph.textFile(sc, fname, a => 1.0F)
-  //         val cc = Analytics.connectedComponents(graph, numIter)
-  //         // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
-  //         //   else  Analytics.connectedComponents(graph, numIter)
-  //         println("Components: " + cc.vertices.map(_.data).distinct())
-
-  //         sc.stop()
-  //       }
-
-  //      case "shortestpath" => {
-
-  //         var numIter = Int.MaxValue
-  //         var isDynamic = true
-  //         var sources: List[Int] = List.empty
-
-  //         options.foreach{
-  //           case ("numIter", v) => numIter = v.toInt
-  //           case ("dynamic", v) => isDynamic = v.toBoolean
-  //           case ("source", v) => sources ++= List(v.toInt)
-  //           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-  //         }
-
-
-  //         if(!isDynamic && numIter == Int.MaxValue) {
-  //           println("Set number of iterations!")
-  //           sys.exit(1)
-  //         }
-
-  //         if(sources.isEmpty) {
-  //           println("No sources provided!")
-  //           sys.exit(1)
-  //         }
-
-  //         println("======================================")
-  //         println("|          Shortest Path             |")
-  //         println("--------------------------------------")
-  //         println(" Using parameters:")
-  //         println(" \tDynamic:  " + isDynamic)
-  //         println(" \tNumIter:  " + numIter)
-  //         println(" \tSources:  [" + sources.mkString(", ") + "]")
-  //         println("======================================")
-
-  //         val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
-  //         val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
-  //         val sp = Analytics.shortestPath(graph, sources, numIter)
-  //         // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
-  //         //   else  Analytics.shortestPath(graph, sources, numIter)
-  //         println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
-
-  //         sc.stop()
-  //       }
-
-
-  //      //  case "als" => {
-
-  //      //    var numIter = 5
-  //      //    var lambda = 0.01
-  //      //    var latentK = 10
-  //      //    var usersFname = "usersFactors.tsv"
-  //      //    var moviesFname = "moviesFname.tsv"
-  //      //    var numVPart = 4
-  //      //    var numEPart = 4
-
-  //      //    options.foreach{
-  //      //      case ("numIter", v) => numIter = v.toInt
-  //      //      case ("lambda", v) => lambda = v.toDouble
-  //      //      case ("latentK", v) => latentK = v.toInt
-  //      //      case ("usersFname", v) => usersFname = v
-  //      //      case ("moviesFname", v) => moviesFname = v
-  //      //      case ("numVPart", v) => numVPart = v.toInt
-  //      //      case ("numEPart", v) => numEPart = v.toInt
-  //      //      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-  //      //    }
-
-  //      //    println("======================================")
-  //      //    println("|       Alternating Least Squares    |")
-  //      //    println("--------------------------------------")
-  //      //    println(" Using parameters:")
-  //      //    println(" \tNumIter:     " + numIter)
-  //      //    println(" \tLambda:      " + lambda)
-  //      //    println(" \tLatentK:     " + latentK)
-  //      //    println(" \tusersFname:  " + usersFname)
-  //      //    println(" \tmoviesFname: " + moviesFname)
-  //      //    println("======================================")
-
-  //      //    val sc = new SparkContext(host, "ALS(" + fname + ")")
-  //      //    val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
-  //      //    graph.numVPart = numVPart
-  //      //    graph.numEPart = numEPart
-
-  //      //    val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
-  //      //    val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
-  //      //    assert(maxUser < minMovie)
-
-  //      //    val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
-  //      //    factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
-  //      //      .saveAsTextFile(usersFname)
-  //      //    factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
-  //      //      .saveAsTextFile(moviesFname)
-
-  //      //    sc.stop()
-  //      //  }
-
-
-  //       case _ => {
-  //         println("Invalid task type.")
-  //       }
-  //     }
-  //   }
+     def main(args: Array[String]) = {
+       val host = args(0)
+       val taskType = args(1)
+       val fname = args(2)
+       val options =  args.drop(3).map { arg =>
+         arg.dropWhile(_ == '-').split('=') match {
+           case Array(opt, v) => (opt -> v)
+           case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+         }
+       }
+
+       def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
+         loggers.map { loggerName =>
+           val logger = org.apache.log4j.Logger.getLogger(loggerName)
+           val prevLevel = logger.getLevel()
+           logger.setLevel(level)
+           loggerName -> prevLevel
+         }.toMap
+       }
+//       setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark"))
+
+       val serializer = "org.apache.spark.serializer.KryoSerializer"
+       System.setProperty("spark.serializer", serializer)
+       //System.setProperty("spark.shuffle.compress", "false")
+       System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+       taskType match {
+         case "pagerank" => {
+
+           var numIter = Int.MaxValue
+           var isDynamic = false
+           var tol:Float = 0.001F
+           var outFname = ""
+           var numVPart = 4
+           var numEPart = 4
+
+           options.foreach{
+             case ("numIter", v) => numIter = v.toInt
+             case ("dynamic", v) => isDynamic = v.toBoolean
+             case ("tol", v) => tol = v.toFloat
+             case ("output", v) => outFname = v
+             case ("numVPart", v) => numVPart = v.toInt
+             case ("numEPart", v) => numEPart = v.toInt
+             case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+           }
+
+           if(!isDynamic && numIter == Int.MaxValue) {
+             println("Set number of iterations!")
+             sys.exit(1)
+           }
+           println("======================================")
+           println("|             PageRank               |")
+           println("--------------------------------------")
+           println(" Using parameters:")
+           println(" \tDynamic:  " + isDynamic)
+           if(isDynamic) println(" \t  |-> Tolerance: " + tol)
+           println(" \tNumIter:  " + numIter)
+           println("======================================")
+
+           val sc = new SparkContext(host, "PageRank(" + fname + ")")
+
+           val graph = GraphLoader.textFile(sc, fname, a => 1.0F, numEPart).withPartitioner(numVPart, numEPart).cache()
+
+           val startTime = System.currentTimeMillis
+           logInfo("GRAPHX: starting tasks")
+           logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
+           logInfo("GRAPHX: Number of edges " + graph.edges.count)
+
+           val pr = Analytics.pagerank(graph, numIter)
+           // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
+           //   else  Analytics.pagerank(graph, numIter)
+           logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
+           if (!outFname.isEmpty) {
+             println("Saving pageranks of pages to " + outFname)
+             pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+           }
+           logInfo("GRAPHX: Runtime:    " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+           sc.stop()
+         }
+
+//        case "cc" => {
+//
+//           var numIter = Int.MaxValue
+//           var isDynamic = false
+//
+//           options.foreach{
+//             case ("numIter", v) => numIter = v.toInt
+//             case ("dynamic", v) => isDynamic = v.toBoolean
+//             case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+//           }
+//
+//           if(!isDynamic && numIter == Int.MaxValue) {
+//             println("Set number of iterations!")
+//             sys.exit(1)
+//           }
+//           println("======================================")
+//           println("|      Connected Components          |")
+//           println("--------------------------------------")
+//           println(" Using parameters:")
+//           println(" \tDynamic:  " + isDynamic)
+//           println(" \tNumIter:  " + numIter)
+//           println("======================================")
+//
+//           val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+//           val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
+//           //val cc = Analytics.connectedComponents(graph, numIter)
+//           // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
+//           //   else  Analytics.connectedComponents(graph, numIter)
+//           println("Components: " + cc.vertices.map(_.data).distinct())
+//
+//           sc.stop()
+//         }
+//
+//        case "shortestpath" => {
+//
+//           var numIter = Int.MaxValue
+//           var isDynamic = true
+//           var sources: List[Int] = List.empty
+//
+//           options.foreach{
+//             case ("numIter", v) => numIter = v.toInt
+//             case ("dynamic", v) => isDynamic = v.toBoolean
+//             case ("source", v) => sources ++= List(v.toInt)
+//             case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+//           }
+//
+//
+//           if(!isDynamic && numIter == Int.MaxValue) {
+//             println("Set number of iterations!")
+//             sys.exit(1)
+//           }
+//
+//           if(sources.isEmpty) {
+//             println("No sources provided!")
+//             sys.exit(1)
+//           }
+//
+//           println("======================================")
+//           println("|          Shortest Path             |")
+//           println("--------------------------------------")
+//           println(" Using parameters:")
+//           println(" \tDynamic:  " + isDynamic)
+//           println(" \tNumIter:  " + numIter)
+//           println(" \tSources:  [" + sources.mkString(", ") + "]")
+//           println("======================================")
+//
+//           val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
+//           val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
+//           //val sp = Analytics.shortestPath(graph, sources, numIter)
+//           // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
+//           //   else  Analytics.shortestPath(graph, sources, numIter)
+//           println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
+//
+//           sc.stop()
+//         }
+
+
+        //  case "als" => {
+
+        //    var numIter = 5
+        //    var lambda = 0.01
+        //    var latentK = 10
+        //    var usersFname = "usersFactors.tsv"
+        //    var moviesFname = "moviesFname.tsv"
+        //    var numVPart = 4
+        //    var numEPart = 4
+
+        //    options.foreach{
+        //      case ("numIter", v) => numIter = v.toInt
+        //      case ("lambda", v) => lambda = v.toDouble
+        //      case ("latentK", v) => latentK = v.toInt
+        //      case ("usersFname", v) => usersFname = v
+        //      case ("moviesFname", v) => moviesFname = v
+        //      case ("numVPart", v) => numVPart = v.toInt
+        //      case ("numEPart", v) => numEPart = v.toInt
+        //      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+        //    }
+
+        //    println("======================================")
+        //    println("|       Alternating Least Squares    |")
+        //    println("--------------------------------------")
+        //    println(" Using parameters:")
+        //    println(" \tNumIter:     " + numIter)
+        //    println(" \tLambda:      " + lambda)
+        //    println(" \tLatentK:     " + latentK)
+        //    println(" \tusersFname:  " + usersFname)
+        //    println(" \tmoviesFname: " + moviesFname)
+        //    println("======================================")
+
+        //    val sc = new SparkContext(host, "ALS(" + fname + ")")
+        //    val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
+        //    graph.numVPart = numVPart
+        //    graph.numEPart = numEPart
+
+        //    val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+        //    val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
+        //    assert(maxUser < minMovie)
+
+        //    val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
+        //    factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
+        //      .saveAsTextFile(usersFname)
+        //    factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
+        //      .saveAsTextFile(moviesFname)
+
+        //    sc.stop()
+        //  }
+
+
+         case _ => {
+           println("Invalid task type.")
+         }
+       }
+     }
 
   // /**
   //  * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
@@ -648,7 +502,7 @@ object Analytics extends Logging {
 
   //       val sc = new SparkContext(host, "PageRank(" + fname + ")")
 
-  //       val graph = Graph.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache()
+  //       val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache()
 
   //       val startTime = System.currentTimeMillis
   //       logInfo("GRAPHX: starting tasks")
@@ -691,7 +545,7 @@ object Analytics extends Logging {
   //       println("======================================")
 
   //       val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
-  //       val graph = Graph.textFile(sc, fname, a => 1.0)
+  //       val graph = GraphLoader.textFile(sc, fname, a => 1.0)
   //       val cc = Analytics.connectedComponents(graph, numIter)
   //       // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
   //       //   else  Analytics.connectedComponents(graph, numIter)
@@ -734,7 +588,7 @@ object Analytics extends Logging {
   //       println("======================================")
 
   //       val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
-  //       val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
+  //       val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
   //       val sp = Analytics.shortestPath(graph, sources, numIter)
   //       // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
   //       //   else  Analytics.shortestPath(graph, sources, numIter)
@@ -777,7 +631,7 @@ object Analytics extends Logging {
   //    println("======================================")
 
   //    val sc = new SparkContext(host, "ALS(" + fname + ")")
-  //    val graph = Graph.textFile(sc, fname, a => a(0).toDouble )
+  //    val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
   //    graph.numVPart = numVPart
   //    graph.numEPart = numEPart
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 297506e1e351edbab13c425a9ccd8097fd1bd71f..2d72789878c1aabd7593e6e2a1209f0cfacebb35 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -4,7 +4,7 @@ import com.esotericsoftware.kryo.Kryo
 
 import org.apache.spark.graph.impl.MessageToPartition
 import org.apache.spark.serializer.KryoRegistrator
-
+import org.apache.spark.graph.impl._
 
 class GraphKryoRegistrator extends KryoRegistrator {
 
@@ -13,6 +13,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MutableTuple2[Object, Object]])
     kryo.register(classOf[MessageToPartition[Object]])
+    kryo.register(classOf[(Vid, Object)])
+    kryo.register(classOf[EdgePartition[Object]])
 
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 4d7ca1268d36ef1a08ccdd0afd6797170b096693..bcc73691df0ff8881e4969cd1146c8d8853bff54 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -20,7 +20,7 @@ object GraphLoader {
     : GraphImpl[Int, ED] = {
 
     // Parse the edge data table
-    val edges = sc.textFile(path).flatMap { line =>
+    val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
       if (!line.isEmpty && line(0) != '#') {
         val lineArray = line.split("\\s+")
         if(lineArray.length < 2) {
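Note: threading minEdgePartitions through to sc.textFile lets callers force more input splits than the file's block count would give, which is what Analytics.main now does with numEPart. A small usage sketch against the same API (paths and partition counts are illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.graph.GraphLoader

object LoadEdgesSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "LoadEdgesSketch")
    val numVPart = 128   // illustrative partition counts
    val numEPart = 128
    // Every edge gets weight 1.0F, as in Analytics.pagerank above; the edge file
    // is split into at least numEPart input partitions.
    val graph = GraphLoader.textFile(sc, args(1), a => 1.0F, numEPart)
      .withPartitioner(numVPart, numEPart)
      .cache()
    println("edges: " + graph.edges.count + ", vertices: " + graph.vertices.count)
    sc.stop()
  }
}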
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e178df3841531f30b893af81a68720c25d0e0262..9f5002208962f5de9ec225f2726fa744c69df5d1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -516,7 +516,7 @@ object GraphImpl {
       .map { e =>
         // Random partitioning based on the source vertex id.
         // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
-        val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
+        val part: Pid = randomVertexCut(e.src, e.dst, numPartitions)
         //val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
 
         // Should we be using 3-tuple or an optimized class
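Note: the switch from edgePartitionFunction2D to randomVertexCut assigns each edge by hashing its (src, dst) pair, spreading edges uniformly across partitions at the cost of replicating high-degree vertices. A self-contained sketch of that scheme; the local type aliases and hash details below are illustrative, not the GraphImpl internals:

object RandomVertexCutSketch {
  type Vid = Long
  type Pid = Int

  // Hash the (src, dst) pair; masking with Int.MaxValue keeps the result
  // non-negative before taking the modulus.
  def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid =
    ((src, dst).hashCode() & Int.MaxValue) % numParts

  def main(args: Array[String]) {
    val edges = Seq((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))
    edges.foreach { case (s, d) =>
      println("edge " + s + "->" + d + " goes to partition " + randomVertexCut(s, d, 4))
    }
  }
}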
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index d0583c48a85970694e6278f2b8f0277d7c5d5c5c..a393cb504e377014e2497dafb99234a9e91a9cfb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -63,9 +63,6 @@ object GraphGenerators {
 
   }
 
-  // For now just writes graph to a file. Eventually
-  // it will return a spark.graph.Graph
-
 
   // Right now it just generates a bunch of edges where
   // the edge data is the weight (default 1)