Skip to content
Snippets Groups Projects
Commit 8494b3a4 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

- Added log messages for benchmarking.

- Added GroupByTest.scala for benchmarking.
parent f8ea98d9
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,3 @@ work
.DS_Store
third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/spark-env.sh
conf/log4j.properties
-Dspark.shuffle.class=spark.LocalFileShuffle
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
#!/usr/bin/env bash
# Set Spark environment variables for your site in this file. Some useful
# variables to set are:
# - MESOS_HOME, to point to your Mesos installation
# - SCALA_HOME, to point to your Scala installation
# - SPARK_CLASSPATH, to add elements to Spark's classpath
# - SPARK_JAVA_OPTS, to add JVM options
# - SPARK_MEM, to change the amount of memory used per node (this should
# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).
# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.
MESOS_HOME=/home/mosharaf/Work/mesos
import spark.SparkContext
import spark.SparkContext._
import java.util.Random
/**
 * Micro-benchmark for groupByKey: builds `numSlices` partitions of random
 * (Int, Array[Byte]) pairs, caches them, then times a groupByKey via count.
 *
 * Args: <host> [numSlices] [numKVPairs] [valSize]
 *   - numSlices:  number of partitions (default 2)
 *   - numKVPairs: key/value pairs generated per partition (default 1000)
 *   - valSize:    size in bytes of each random value (default 1000)
 */
object GroupByTest {
  def main(args: Array[String]) {
    if (args.length == 0) {
      // NOTE: third optional arg controls the value size, so advertise it
      // as [valSize] (was misleadingly labelled [KeySize]).
      System.err.println("Usage: GroupByTest <host> [numSlices] [numKVPairs] [valSize]")
      System.exit(1)
    }

    // Immutable after parsing — no reason for these to be vars.
    val numSlices = if (args.length > 1) args(1).toInt else 2
    val numKVPairs = if (args.length > 2) args(2).toInt else 1000
    val valSize = if (args.length > 3) args(3).toInt else 1000

    val sc = new SparkContext(args(0), "GroupBy Test")

    // NOTE(review): this Random lives on the driver and is captured by the
    // flatMap closure; it must be serializable and is shared across tasks.
    // Behavior kept as-is — confirm this is intended for the benchmark.
    val ranGen = new Random

    val pairs1 = sc.parallelize(0 until numSlices, numSlices).flatMap { p =>
      val arr1 = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        // Random keys spread pairs across the full Int range.
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache

    // Force computation so the data is materialized in the cache before
    // the timed groupByKey below.
    pairs1.count

    println(pairs1.groupByKey(numSlices).count)
  }
}
......@@ -73,7 +73,7 @@ extends BroadcastRecipe with Logging {
} else {
// Only a single worker (the first one) in the same node can ever be
// here. The rest will always get the value ready.
val start = System.nanoTime
val start = System.nanoTime
val retByteArray = BroadcastCS.receiveBroadcast (uuid)
// If does not succeed, then get from HDFS copy
......
......@@ -48,9 +48,14 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
for (i <- 0 until numOutputSplits) {
val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
val writeStartTime = System.currentTimeMillis
logInfo ("BEGIN WRITE: " + file)
val out = new ObjectOutputStream(new FileOutputStream(file))
buckets(i).foreach(pair => out.writeObject(pair))
out.close()
logInfo ("END WRITE: " + file)
val writeTime = (System.currentTimeMillis - writeStartTime)
logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
}
(myIndex, LocalFileShuffle.serverUri)
}).collect()
......@@ -71,6 +76,8 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
for (i <- inputIds) {
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId)
val readStartTime = System.currentTimeMillis
logInfo ("BEGIN READ: " + url)
val inputStream = new ObjectInputStream(new URL(url).openStream())
try {
while (true) {
......@@ -84,6 +91,9 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
case e: EOFException => {}
}
inputStream.close()
logInfo ("END READ: " + url)
val readTime = (System.currentTimeMillis - readStartTime)
logInfo ("Reading " + url + " took " + readTime + " millis.")
}
}
combiners
......@@ -149,6 +159,7 @@ object LocalFileShuffle extends Logging {
serverUri = server.uri
}
initialized = true
logInfo ("Local URI: " + serverUri)
}
}
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment