Commit 2881d14c authored by Reynold Xin

[SPARK-7929] Remove Bagel examples & whitespace fix for examples.

Author: Reynold Xin <rxin@databricks.com>

Closes #6480 from rxin/whitespace-example and squashes the following commits:

8a4a3d4 [Reynold Xin] [SPARK-7929] Remove Bagel examples & whitespace fix for examples.
parent ff44c711
Showing changed files with 31 additions and 485 deletions
@@ -104,8 +104,8 @@ object CassandraCQLTest {
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
- classOf[java.util.Map[String,ByteBuffer]],
- classOf[java.util.Map[String,ByteBuffer]])
+ classOf[java.util.Map[String, ByteBuffer]],
+ classOf[java.util.Map[String, ByteBuffer]])
println("Count: " + casRdd.count)
val productSaleRDD = casRdd.map {
@@ -118,7 +118,7 @@ object CassandraCQLTest {
case (productId, saleCount) => println(productId + ":" + saleCount)
}
- val casoutputCF = aggregatedRDD.map {
+ val casoutputCF = aggregatedRDD.map {
case (productId, saleCount) => {
val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
@@ -39,7 +39,7 @@ object LocalLR {
def generateData: Array[DataPoint] = {
def generatePoint(i: Int): DataPoint = {
- val y = if(i % 2 == 0) -1 else 1
+ val y = if (i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
@@ -117,7 +117,7 @@ object SparkALS {
var us = Array.fill(U)(randomVector(F))
// Iteratively update movies then users
- val Rc = sc.broadcast(R)
+ val Rc = sc.broadcast(R)
var msb = sc.broadcast(ms)
var usb = sc.broadcast(us)
for (iter <- 1 to ITERATIONS) {
@@ -44,7 +44,7 @@ object SparkLR {
def generateData: Array[DataPoint] = {
def generatePoint(i: Int): DataPoint = {
- val y = if(i % 2 == 0) -1 else 1
+ val y = if (i % 2 == 0) -1 else 1
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.bagel

import org.apache.spark._
import org.apache.spark.bagel._

class PageRankUtils extends Serializable {
  def computeWithCombiner(numVertices: Long, epsilon: Double)(
    self: PRVertex, messageSum: Option[Double], superstep: Int
  ): (PRVertex, Array[PRMessage]) = {
    val newValue = messageSum match {
      case Some(msgSum) if msgSum != 0 =>
        0.15 / numVertices + 0.85 * msgSum
      case _ => self.value
    }

    val terminate = superstep >= 10

    val outbox: Array[PRMessage] =
      if (!terminate) {
        self.outEdges.map(targetId => new PRMessage(targetId, newValue / self.outEdges.size))
      } else {
        Array[PRMessage]()
      }

    (new PRVertex(newValue, self.outEdges, !terminate), outbox)
  }

  def computeNoCombiner(numVertices: Long, epsilon: Double)
    (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int)
  : (PRVertex, Array[PRMessage]) =
    computeWithCombiner(numVertices, epsilon)(self, messages match {
      case Some(msgs) => Some(msgs.map(_.value).sum)
      case None => None
    }, superstep)
}

class PRCombiner extends Combiner[PRMessage, Double] with Serializable {
  def createCombiner(msg: PRMessage): Double =
    msg.value
  def mergeMsg(combiner: Double, msg: PRMessage): Double =
    combiner + msg.value
  def mergeCombiners(a: Double, b: Double): Double =
    a + b
}

class PRVertex() extends Vertex with Serializable {
  var value: Double = _
  var outEdges: Array[String] = _
  var active: Boolean = _

  def this(value: Double, outEdges: Array[String], active: Boolean = true) {
    this()
    this.value = value
    this.outEdges = outEdges
    this.active = active
  }

  override def toString(): String = {
    "PRVertex(value=%f, outEdges.length=%d, active=%s)"
      .format(value, outEdges.length, active.toString)
  }
}

class PRMessage() extends Message[String] with Serializable {
  var targetId: String = _
  var value: Double = _

  def this(targetId: String, value: Double) {
    this()
    this.targetId = targetId
    this.value = value
  }
}
class CustomPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val hash = key match {
      case k: Long => (k & 0x00000000FFFFFFFFL).toInt
      case _ => key.hashCode
    }
    // Map the computed hash into the range [0, partitions).
    val mod = hash % partitions
    if (mod < 0) mod + partitions else mod
  }

  override def equals(other: Any): Boolean = other match {
    case c: CustomPartitioner =>
      c.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
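Bagel's vertex-program style above maps fairly directly onto GraphX's Pregel operator, which is the recommended replacement for Bagel. The sketch below is illustrative only and not part of this commit; the object name and the assumption that each edge attribute holds 1.0 / srcOutDegree are hypothetical.

import org.apache.spark.graphx._

// Illustrative sketch (not part of this commit): the same vertex program as
// computeWithCombiner, expressed with GraphX's Pregel operator. Assumes the
// edge attribute was pre-set to 1.0 / srcOutDegree, so each message carries
// rank / outDegree.
object PageRankPregelSketch {
  def run(graph: Graph[Double, Double], numVertices: Long, numIter: Int): Graph[Double, Double] =
    Pregel(graph, initialMsg = 0.0, maxIterations = numIter)(
      // Vertex program: same update rule as computeWithCombiner.
      (id, rank, msgSum) => 0.15 / numVertices + 0.85 * msgSum,
      // Send each out-neighbor this vertex's rank scaled by the edge weight.
      triplet => Iterator((triplet.dstId, triplet.srcAttr * triplet.attr)),
      // Combine messages by summing them, as PRCombiner does.
      (a, b) => a + b)
}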
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.bagel

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.bagel._

import scala.xml.{XML, NodeSeq}
/**
 * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles"
 * files from there, which contain one line per wiki article in a tab-separated format
 * (http://wiki.freebase.com/wiki/WEX/Documentation#articles).
 */
object WikipediaPageRank {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println(
        "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <usePartitioner>")
      System.exit(-1)
    }
    val sparkConf = new SparkConf()
    sparkConf.setAppName("WikipediaPageRank")
    sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage]))

    val inputFile = args(0)
    val threshold = args(1).toDouble
    val numPartitions = args(2).toInt
    val usePartitioner = args(3).toBoolean

    val sc = new SparkContext(sparkConf)
    // Parse the Wikipedia page data into a graph
    val input = sc.textFile(inputFile)

    println("Counting vertices...")
    val numVertices = input.count()
    println("Done counting vertices.")

    println("Parsing input file...")
    var vertices = input.map(line => {
      val fields = line.split("\t")
      val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
      val links =
        if (body == "\\N") {
          NodeSeq.Empty
        } else {
          try {
            XML.loadString(body) \\ "link" \ "target"
          } catch {
            case e: org.xml.sax.SAXParseException =>
              System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
              NodeSeq.Empty
          }
        }
      val outEdges = links.map(link => new String(link.text)).toArray
      val id = new String(title)
      (id, new PRVertex(1.0 / numVertices, outEdges))
    })
    if (usePartitioner) {
      vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache()
    } else {
      vertices = vertices.cache()
    }
    println("Done parsing input file.")

    // Do the computation
    val epsilon = 0.01 / numVertices
    val messages = sc.parallelize(Array[(String, PRMessage)]())
    val utils = new PageRankUtils
    val result =
      Bagel.run(
        sc, vertices, messages, combiner = new PRCombiner(),
        numPartitions = numPartitions)(
        utils.computeWithCombiner(numVertices, epsilon))

    // Print the result
    System.err.println("Articles with PageRank >= " + threshold + ":")
    val top =
      (result
        .filter { case (id, vertex) => vertex.value >= threshold }
        .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
        .collect().mkString)
    println(top)

    sc.stop()
  }
}
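For comparison, the whole job above can be expressed with GraphX's built-in PageRank. This is a rough sketch, not part of this commit: the object name is made up, and it assumes the Wikipedia link structure has already been exported as an edge list of "srcId dstId" pairs rather than the WEX XML parsed above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

// Rough sketch (not part of this commit): GraphX's built-in PageRank over a
// pre-extracted "srcId dstId" edge list.
object GraphXWikipediaPageRankSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("GraphXWikipediaPageRankSketch"))
    val edgeListFile = args(0)
    val threshold = args(1).toDouble
    val graph = GraphLoader.edgeListFile(sc, edgeListFile)
    // Iterate until the per-vertex rank change falls below the tolerance.
    val ranks = graph.pageRank(0.0001).vertices
    ranks.filter { case (_, rank) => rank >= threshold }
      .collect()
      .foreach { case (id, rank) => println(id + "\t" + rank) }
    sc.stop()
  }
}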
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.bagel

import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.xml.{XML, NodeSeq}

import org.apache.spark._
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object WikipediaPageRankStandalone {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " +
        "<numIterations> <usePartitioner>")
      System.exit(-1)
    }
    val sparkConf = new SparkConf()
    // Use the custom serializer defined below (WPRSerializer lives in this package).
    sparkConf.set("spark.serializer", "org.apache.spark.examples.bagel.WPRSerializer")

    val inputFile = args(0)
    val threshold = args(1).toDouble
    val numIterations = args(2).toInt
    val usePartitioner = args(3).toBoolean

    sparkConf.setAppName("WikipediaPageRankStandalone")
    val sc = new SparkContext(sparkConf)
    val input = sc.textFile(inputFile)
    val partitioner = new HashPartitioner(sc.defaultParallelism)
    val links =
      if (usePartitioner) {
        input.map(parseArticle _).partitionBy(partitioner).cache()
      } else {
        input.map(parseArticle _).cache()
      }
    val n = links.count()
    val defaultRank = 1.0 / n
    val a = 0.15

    // Do the computation
    val startTime = System.currentTimeMillis
    val ranks =
      pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner,
        sc.defaultParallelism)

    // Print the result
    System.err.println("Articles with PageRank >= " + threshold + ":")
    val top =
      (ranks
        .filter { case (id, rank) => rank >= threshold }
        .map { case (id, rank) => "%s\t%s\n".format(id, rank) }
        .collect().mkString)
    println(top)

    val time = (System.currentTimeMillis - startTime) / 1000.0
    println("Completed %d iterations in %f seconds: %f seconds per iteration"
      .format(numIterations, time, time / numIterations))

    sc.stop()
  }
  def parseArticle(line: String): (String, Array[String]) = {
    val fields = line.split("\t")
    val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
    val id = new String(title)
    val links =
      if (body == "\\N") {
        NodeSeq.Empty
      } else {
        try {
          XML.loadString(body) \\ "link" \ "target"
        } catch {
          case e: org.xml.sax.SAXParseException =>
            System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
            NodeSeq.Empty
        }
      }
    val outEdges = links.map(link => new String(link.text)).toArray
    (id, outEdges)
  }
  def pageRank(
    links: RDD[(String, Array[String])],
    numIterations: Int,
    defaultRank: Double,
    a: Double,
    n: Long,
    partitioner: Partitioner,
    usePartitioner: Boolean,
    numPartitions: Int
  ): RDD[(String, Double)] = {
    var ranks = links.mapValues { edges => defaultRank }
    for (i <- 1 to numIterations) {
      // Each page contributes rank / outDegree to every page it links to.
      val contribs = links.groupWith(ranks).flatMap {
        case (id, (linksWrapperIterable, rankWrapperIterable)) =>
          val linksWrapper = linksWrapperIterable.iterator
          val rankWrapper = rankWrapperIterable.iterator
          if (linksWrapper.hasNext) {
            val linksWrapperHead = linksWrapper.next
            if (rankWrapper.hasNext) {
              val rankWrapperHead = rankWrapper.next
              linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size))
            } else {
              linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size))
            }
          } else {
            Array[(String, Double)]()
          }
      }
      // Sum the contributions per page and apply the PageRank update:
      // newRank = a / n + (1 - a) * contributionSum.
      ranks = (contribs.combineByKey((x: Double) => x,
        (x: Double, y: Double) => x + y,
        (x: Double, y: Double) => x + y,
        partitioner)
        .mapValues(sum => a / n + (1 - a) * sum))
    }
    ranks
  }
}
class WPRSerializer extends org.apache.spark.serializer.Serializer {
  def newInstance(): SerializerInstance = new WPRSerializerInstance()
}

class WPRSerializerInstance extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer = {
    throw new UnsupportedOperationException()
  }

  def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    throw new UnsupportedOperationException()
  }

  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    throw new UnsupportedOperationException()
  }

  def serializeStream(s: OutputStream): SerializationStream = {
    new WPRSerializationStream(s)
  }

  def deserializeStream(s: InputStream): DeserializationStream = {
    new WPRDeserializationStream(s)
  }
}

class WPRSerializationStream(os: OutputStream) extends SerializationStream {
  val dos = new DataOutputStream(os)

  def writeObject[T: ClassTag](t: T): SerializationStream = t match {
    case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match {
      case links: Array[String] => {
        dos.writeInt(0) // links
        dos.writeUTF(id)
        dos.writeInt(links.length)
        for (link <- links) {
          dos.writeUTF(link)
        }
        this
      }
      case rank: Double => {
        dos.writeInt(1) // rank
        dos.writeUTF(id)
        dos.writeDouble(rank)
        this
      }
    }
    case (id: String, rank: Double) => {
      dos.writeInt(2) // rank without wrapper
      dos.writeUTF(id)
      dos.writeDouble(rank)
      this
    }
  }

  def flush() { dos.flush() }
  def close() { dos.close() }
}

class WPRDeserializationStream(is: InputStream) extends DeserializationStream {
  val dis = new DataInputStream(is)

  def readObject[T: ClassTag](): T = {
    val typeId = dis.readInt()
    typeId match {
      case 0 => {
        val id = dis.readUTF()
        val numLinks = dis.readInt()
        val links = new Array[String](numLinks)
        for (i <- 0 until numLinks) {
          val link = dis.readUTF()
          links(i) = link
        }
        (id, ArrayBuffer(links)).asInstanceOf[T]
      }
      case 1 => {
        val id = dis.readUTF()
        val rank = dis.readDouble()
        (id, ArrayBuffer(rank)).asInstanceOf[T]
      }
      case 2 => {
        val id = dis.readUTF()
        val rank = dis.readDouble()
        (id, rank).asInstanceOf[T]
      }
    }
  }

  def close() { dis.close() }
}
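The update inside pageRank above is the standard PageRank recurrence, newRank = a / n + (1 - a) * contributionSum, where every page splits its current rank evenly across its out-links. Below is a tiny self-contained illustration on plain Scala collections, using a hypothetical three-page graph; it is not part of this commit.

// Minimal illustration (not part of this commit) of the rank update used in
// WikipediaPageRankStandalone.pageRank, on a hypothetical three-page graph.
object PageRankUpdateSketch {
  def main(args: Array[String]) {
    val links = Map("a" -> Seq("b", "c"), "b" -> Seq("c"), "c" -> Seq("a"))
    val a = 0.15
    val n = links.size.toDouble
    var ranks = links.keys.map(_ -> 1.0 / n).toMap
    for (_ <- 1 to 10) {
      // Each page contributes rank / outDegree to every page it links to.
      val contribs = links.toSeq.flatMap { case (src, dests) =>
        dests.map(dest => dest -> ranks(src) / dests.size)
      }
      val sums = contribs.groupBy(_._1).mapValues(_.map(_._2).sum)
      // Same formula as the mapValues step above: a / n + (1 - a) * sum.
      ranks = ranks.map { case (id, _) => id -> (a / n + (1 - a) * sums.getOrElse(id, 0.0)) }
    }
    ranks.toSeq.sortBy(-_._2).foreach { case (id, rank) => println(f"$id%s\t$rank%.4f") }
  }
}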
@@ -73,7 +73,7 @@ object OneVsRestExample {
.action((x, c) => c.copy(fracTest = x))
opt[String]("testInput")
.text("input path to test dataset. If given, option fracTest is ignored")
- .action((x,c) => c.copy(testInput = Some(x)))
+ .action((x, c) => c.copy(testInput = Some(x)))
opt[Int]("maxIter")
.text(s"maximum number of iterations for Logistic Regression." +
s" default: ${defaultParams.maxIter}")
@@ -88,10 +88,10 @@ object OneVsRestExample {
.action((x, c) => c.copy(fitIntercept = x))
opt[Double]("regParam")
.text(s"the regularization parameter for Logistic Regression.")
- .action((x,c) => c.copy(regParam = Some(x)))
+ .action((x, c) => c.copy(regParam = Some(x)))
opt[Double]("elasticNetParam")
.text(s"the ElasticNet mixing parameter for Logistic Regression.")
- .action((x,c) => c.copy(elasticNetParam = Some(x)))
+ .action((x, c) => c.copy(elasticNetParam = Some(x)))
checkConfig { params =>
if (params.fracTest < 0 || params.fracTest >= 1) {
failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1).")
@@ -40,7 +40,7 @@ object DenseGaussianMixture {
private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) {
val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
- val ctx = new SparkContext(conf)
+ val ctx = new SparkContext(conf)
val data = ctx.textFile(inputFile).map { line =>
Vectors.dense(line.trim.split(' ').map(_.toDouble))
@@ -36,22 +36,21 @@ object AvroConversionUtil extends Serializable {
return null
}
schema.getType match {
- case UNION => unpackUnion(obj, schema)
- case ARRAY => unpackArray(obj, schema)
- case FIXED => unpackFixed(obj, schema)
- case MAP => unpackMap(obj, schema)
- case BYTES => unpackBytes(obj)
- case RECORD => unpackRecord(obj)
- case STRING => obj.toString
- case ENUM => obj.toString
- case NULL => obj
+ case UNION => unpackUnion(obj, schema)
+ case ARRAY => unpackArray(obj, schema)
+ case FIXED => unpackFixed(obj, schema)
+ case MAP => unpackMap(obj, schema)
+ case BYTES => unpackBytes(obj)
+ case RECORD => unpackRecord(obj)
+ case STRING => obj.toString
+ case ENUM => obj.toString
+ case NULL => obj
case BOOLEAN => obj
- case DOUBLE => obj
- case FLOAT => obj
- case INT => obj
- case LONG => obj
- case other => throw new SparkException(
- s"Unknown Avro schema type ${other.getName}")
+ case DOUBLE => obj
+ case FLOAT => obj
+ case INT => obj
+ case LONG => obj
+ case other => throw new SparkException(s"Unknown Avro schema type ${other.getName}")
}
}
@@ -104,10 +104,8 @@ extends Actor with ActorHelper {
object FeederActor {
def main(args: Array[String]) {
- if(args.length < 2){
- System.err.println(
- "Usage: FeederActor <hostname> <port>\n"
- )
+ if (args.length < 2){
+ System.err.println("Usage: FeederActor <hostname> <port>\n")
System.exit(1)
}
val Seq(host, port) = args.toSeq
@@ -51,7 +51,7 @@ object DirectKafkaWordCount {
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
@@ -49,10 +49,10 @@ object KafkaWordCount {
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
- val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
+ val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
@@ -49,7 +49,7 @@ object MQTTPublisher {
client.connect()
- val msgtopic = client.getTopic(topic)
+ val msgtopic = client.getTopic(topic)
val msgContent = "hello mqtt demo for spark streaming"
val message = new MqttMessage(msgContent.getBytes("utf-8"))
@@ -57,8 +57,7 @@ object PageViewGenerator
404 -> .05)
val userZipCode = Map(94709 -> .5,
94117 -> .5)
- val userID = Map((1 to 100).map(_ -> .01):_*)
+ val userID = Map((1 to 100).map(_ -> .01) : _*)
def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
val rand = new Random().nextDouble()