Commit 543a54df authored by Dan Crankshaw

Tried to fix some indenting

parent c4a23f95
@@ -75,87 +75,88 @@ object Analytics extends Logging {
      gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
    )
  }

  def main(args: Array[String]) = {
    val host = args(0)
    val taskType = args(1)
    val fname = args(2)
    val options = args.drop(3).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
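
    // Sketch of how the arguments above are consumed (host, file name, and flag
    // values here are hypothetical): given
    //   args = Array("local[4]", "pagerank", "edges.tsv", "--numIter=20", "--numEPart=64")
    // the loop strips the leading dashes and splits each flag on '=', yielding
    //   options = Array("numIter" -> "20", "numEPart" -> "64")
    // while any argument without an '=' raises an IllegalArgumentException.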

    def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
      loggers.map { loggerName =>
        val logger = org.apache.log4j.Logger.getLogger(loggerName)
        val prevLevel = logger.getLevel()
        logger.setLevel(level)
        loggerName -> prevLevel
      }.toMap
    }
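
    // Usage sketch (logger names are illustrative): since setLogLevels returns a map
    // of each logger's previous level, verbose framework logging can be muted for a
    // run and restored afterwards, e.g.
    //   val saved = setLogLevels(org.apache.log4j.Level.WARN, Seq("org.apache.spark"))
    //   // ... run the job ...
    //   saved.foreach { case (name, lvl) => org.apache.log4j.Logger.getLogger(name).setLevel(lvl) }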

    // setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark"))

    val serializer = "org.apache.spark.serializer.KryoSerializer"
    System.setProperty("spark.serializer", serializer)
    // System.setProperty("spark.shuffle.compress", "false")
    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

    taskType match {
      case "pagerank" => {

        var numIter = Int.MaxValue
        var isDynamic = false
        var tol: Float = 0.001F
        var outFname = ""
        var numVPart = 4
        var numEPart = 4

        options.foreach {
          case ("numIter", v) => numIter = v.toInt
          case ("dynamic", v) => isDynamic = v.toBoolean
          case ("tol", v) => tol = v.toFloat
          case ("output", v) => outFname = v
          case ("numVPart", v) => numVPart = v.toInt
          case ("numEPart", v) => numEPart = v.toInt
          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
        }

        if (!isDynamic && numIter == Int.MaxValue) {
          println("Set number of iterations!")
          sys.exit(1)
        }

        println("======================================")
        println("| PageRank |")
        println("--------------------------------------")
        println(" Using parameters:")
        println(" \tDynamic: " + isDynamic)
        if (isDynamic) println(" \t |-> Tolerance: " + tol)
        println(" \tNumIter: " + numIter)
        println("======================================")

        val sc = new SparkContext(host, "PageRank(" + fname + ")")

        val graph = GraphLoader.textFile(sc, fname, a => 1.0F, numEPart)
          .withPartitioner(numVPart, numEPart).cache()

        val startTime = System.currentTimeMillis
        logInfo("GRAPHX: starting tasks")
        logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
        logInfo("GRAPHX: Number of edges " + graph.edges.count)

        val pr = Analytics.pagerank(graph, numIter)
        // val pr = if (isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
        //          else Analytics.pagerank(graph, numIter)
        logInfo("GRAPHX: Total rank: " + pr.vertices.map { case Vertex(id, r) => r }.reduce(_ + _))

        if (!outFname.isEmpty) {
          println("Saving pageranks of pages to " + outFname)
          pr.vertices.map { case Vertex(id, r) => id + "\t" + r }.saveAsTextFile(outFname)
        }
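        // The resulting text file contains one "<vertexId>\t<rank>" line per vertex,
        // mirroring the map expression above.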
logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop()
      }
// case "cc" => {
//
// var numIter = Int.MaxValue
@@ -233,62 +234,62 @@ object Analytics extends Logging {
      // }
      // case "als" => {
      //   var numIter = 5
      //   var lambda = 0.01
      //   var latentK = 10
      //   var usersFname = "usersFactors.tsv"
      //   var moviesFname = "moviesFname.tsv"
      //   var numVPart = 4
      //   var numEPart = 4
      //   options.foreach {
      //     case ("numIter", v) => numIter = v.toInt
      //     case ("lambda", v) => lambda = v.toDouble
      //     case ("latentK", v) => latentK = v.toInt
      //     case ("usersFname", v) => usersFname = v
      //     case ("moviesFname", v) => moviesFname = v
      //     case ("numVPart", v) => numVPart = v.toInt
      //     case ("numEPart", v) => numEPart = v.toInt
      //     case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
      //   }
      //   println("======================================")
      //   println("| Alternating Least Squares |")
      //   println("--------------------------------------")
      //   println(" Using parameters:")
      //   println(" \tNumIter: " + numIter)
      //   println(" \tLambda: " + lambda)
      //   println(" \tLatentK: " + latentK)
      //   println(" \tusersFname: " + usersFname)
      //   println(" \tmoviesFname: " + moviesFname)
      //   println("======================================")
      //   val sc = new SparkContext(host, "ALS(" + fname + ")")
      //   val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble)
      //   graph.numVPart = numVPart
      //   graph.numEPart = numEPart
      //   val maxUser = graph.edges.map(_._1).reduce(math.max(_, _))
      //   val minMovie = graph.edges.map(_._2).reduce(math.min(_, _))
      //   assert(maxUser < minMovie)
      //   val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
      //   factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
      //     .saveAsTextFile(usersFname)
      //   factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
      //     .saveAsTextFile(moviesFname)
      //   sc.stop()
      // }

      case _ => {
        println("Invalid task type.")
      }
// case "als" => {
// var numIter = 5
// var lambda = 0.01
// var latentK = 10
// var usersFname = "usersFactors.tsv"
// var moviesFname = "moviesFname.tsv"
// var numVPart = 4
// var numEPart = 4
// options.foreach{
// case ("numIter", v) => numIter = v.toInt
// case ("lambda", v) => lambda = v.toDouble
// case ("latentK", v) => latentK = v.toInt
// case ("usersFname", v) => usersFname = v
// case ("moviesFname", v) => moviesFname = v
// case ("numVPart", v) => numVPart = v.toInt
// case ("numEPart", v) => numEPart = v.toInt
// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
// }
// println("======================================")
// println("| Alternating Least Squares |")
// println("--------------------------------------")
// println(" Using parameters:")
// println(" \tNumIter: " + numIter)
// println(" \tLambda: " + lambda)
// println(" \tLatentK: " + latentK)
// println(" \tusersFname: " + usersFname)
// println(" \tmoviesFname: " + moviesFname)
// println("======================================")
// val sc = new SparkContext(host, "ALS(" + fname + ")")
// val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
// graph.numVPart = numVPart
// graph.numEPart = numEPart
// val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
// val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
// assert(maxUser < minMovie)
// val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
// factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
// .saveAsTextFile(usersFname)
// factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
// .saveAsTextFile(moviesFname)
// sc.stop()
// }
case _ => {
println("Invalid task type.")
}
    }
  }
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD