diff --git a/src/examples/GroupByTest.scala b/src/examples/GroupByTest.scala
index 2c62f28b4fa4c6dd2581a4c59088ad7aa790fd1a..b3f8b4396a91cb29cc61bc0d552128603261d562 100644
--- a/src/examples/GroupByTest.scala
+++ b/src/examples/GroupByTest.scala
@@ -5,25 +5,20 @@
 import java.util.Random
 object GroupByTest {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: GroupByTest <host> [numSlices] [numKVPairs] [KeySize]")
+      System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [KeySize] [numReducers]")
       System.exit(1)
     }
 
-    var numSlices = if (args.length > 1) args(1).toInt else 2
+    var numMappers = if (args.length > 1) args(1).toInt else 2
     var numKVPairs = if (args.length > 2) args(2).toInt else 1000
     var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
 
     val sc = new SparkContext(args(0), "GroupBy Test")
 
-// import java.util.Random
-// 
-// var numSlices = 6
-// var numKVPairs = 1000
-// var valSize = 500000
-
     val ranGen = new Random
-    val pairs1 = sc.parallelize(0 until numSlices, numSlices).flatMap { p =>
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
       for (i <- 0 until numKVPairs) {
         val byteArr = new Array[Byte] (valSize)
@@ -35,21 +30,7 @@ object GroupByTest {
     // Enforce that everything has been calculated and in cache
     pairs1.count
 
-//    val pairs2 = sc.parallelize(0 until numSlices, numSlices).flatMap { p =>
-//      var arr2 = new Array[(Int, Array[Byte])](numKVPairs)
-//      for (i <- 0 until numKVPairs) {
-//        val byteArr = new Array[Byte] (valSize)
-//        ranGen.nextBytes (byteArr)
-//        arr2(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
-//      }
-//      arr2
-//    }.cache
-//    // Enforce that everything has been calculated and in cache
-//    pairs2.count
-
-    println(pairs1.groupByKey(numSlices).count)
-//    pairs2.groupByKey(numSlices).count
-
-//    pairs1.join(pairs2)
+    println(pairs1.groupByKey(numReducers).count)
   }
 }
+