Skip to content
Snippets Groups Projects
Commit fb51df0b authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

Added a skewed shuffle test example.

Output per mapper is distributed from 1/numMappers to 1 of numKVPairs.
parent b23d337c
No related branches found
No related tags found
No related merge requests found
...@@ -16,13 +16,12 @@ object GroupByTest { ...@@ -16,13 +16,12 @@ object GroupByTest {
val sc = new SparkContext(args(0), "GroupBy Test") val sc = new SparkContext(args(0), "GroupBy Test")
val ranGen = new Random
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs) var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) { for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte] (valSize) val byteArr = new Array[Byte](valSize)
ranGen.nextBytes (byteArr) ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
} }
arr1 arr1
......
import spark.SparkContext
import spark.SparkContext._
import java.util.Random
object SkewedGroupByTest {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test")
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
// map output sizes lineraly increase from the 1st to the last
numKVPairs = (1. * (p + 1) / numMappers * numKVPairs).toInt
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache
// Enforce that everything has been calculated and in cache
pairs1.count
println(pairs1.groupByKey(numReducers).count)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment