diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
index 66d94d6016050fa430bc2ff31f553c7c6e38f4fb..cd925e630e7bdc630491ccf7050419c841be5de6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
@@ -34,11 +33,13 @@ object DatasetBenchmark {
   def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val rdd = spark.sparkContext.range(0, numRows)
+    val ds = spark.range(0, numRows)
+    val df = ds.toDF("l")
+    val func = (l: Long) => l + 1
+
     val benchmark = new Benchmark("back-to-back map", numRows)
-    val func = (d: Data) => Data(d.l + 1, d.s)
-    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
 
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -53,14 +54,14 @@ object DatasetBenchmark {
       var res = df
       var i = 0
       while (i < numChains) {
-        res = res.select($"l" + 1 as "l", $"s")
+        res = res.select($"l" + 1 as "l")
         i += 1
       }
       res.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = df.as[Data]
+      var res = ds.as[Long]
       var i = 0
       while (i < numChains) {
         res = res.map(func)
@@ -75,14 +76,14 @@ object DatasetBenchmark {
   def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val rdd = spark.sparkContext.range(0, numRows)
+    val ds = spark.range(0, numRows)
+    val df = ds.toDF("l")
+    val func = (l: Long, i: Int) => l % (100L + i) == 0L
+    val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }
+
     val benchmark = new Benchmark("back-to-back filter", numRows)
-    val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
-    val funcs = 0.until(numChains).map { i =>
-      (d: Data) => func(d, i)
-    }
-    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
 
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -104,7 +105,7 @@ object DatasetBenchmark {
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = df.as[Data]
+      var res = ds.as[Long]
       var i = 0
       while (i < numChains) {
         res = res.filter(funcs(i))
@@ -133,24 +134,29 @@ object DatasetBenchmark {
   def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
     import spark.implicits._
 
-    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val rdd = spark.sparkContext.range(0, numRows)
+    val ds = spark.range(0, numRows)
+    val df = ds.toDF("l")
+
     val benchmark = new Benchmark("aggregate", numRows)
-    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
 
     benchmark.addCase("RDD sum") { iter =>
-      rdd.aggregate(0L)(_ + _.l, _ + _)
+      rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
     }
 
     benchmark.addCase("DataFrame sum") { iter =>
-      df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
+      df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset sum using Aggregator") { iter =>
-      df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
+      val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
+      result.queryExecution.toRdd.foreach(_ => Unit)
     }
 
+    val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
     benchmark.addCase("Dataset complex Aggregator") { iter =>
-      df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
+      val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
+      result.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark
@@ -170,36 +176,39 @@ object DatasetBenchmark {
     val benchmark3 = aggregate(spark, numRows)
 
     /*
-    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
-    Intel Xeon E3-12xx v2 (Ivy Bridge)
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
     back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                           3448 / 3646         29.0          34.5       1.0X
-    DataFrame                                     2647 / 3116         37.8          26.5       1.3X
-    Dataset                                       4781 / 5155         20.9          47.8       0.7X
+    RDD                                           3963 / 3976         25.2          39.6       1.0X
+    DataFrame                                      826 /  834        121.1           8.3       4.8X
+    Dataset                                       5178 / 5198         19.3          51.8       0.8X
     */
     benchmark.run()
 
     /*
-    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
-    Intel Xeon E3-12xx v2 (Ivy Bridge)
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
     back-to-back filter:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                           1346 / 1618         74.3          13.5       1.0X
-    DataFrame                                       59 /   72       1695.4           0.6      22.8X
-    Dataset                                       2777 / 2805         36.0          27.8       0.5X
+    RDD                                            533 /  587        187.6           5.3       1.0X
+    DataFrame                                       79 /   91       1269.0           0.8       6.8X
+    Dataset                                        550 /  559        181.7           5.5       1.0X
     */
     benchmark2.run()
 
     /*
     Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
     aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD sum                                       1913 / 1942         52.3          19.1       1.0X
-    DataFrame sum                                   46 /   61       2157.7           0.5      41.3X
-    Dataset sum using Aggregator                  4656 / 4758         21.5          46.6       0.4X
-    Dataset complex Aggregator                    6636 / 7039         15.1          66.4       0.3X
+    RDD sum                                       2297 / 2440         43.5          23.0       1.0X
+    DataFrame sum                                  630 /  637        158.7           6.3       3.6X
+    Dataset sum using Aggregator                  3129 / 3247         32.0          31.3       0.7X
+    Dataset complex Aggregator                  12109 / 12142          8.3         121.1       0.2X
     */
     benchmark3.run()
   }
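For reference, below is a minimal standalone sketch (not part of the patch) of the typed-aggregation pattern that the new "Dataset sum using Aggregator" case measures: a Dataset[Long] is bucketed with groupByKey and each bucket is summed with typed.sumLong, exactly as in the new benchmark case. It assumes a Spark 2.x build where org.apache.spark.sql.expressions.scalalang.typed is still available (it was deprecated in later releases); the object name, appName, master setting, and row count are illustrative only. The benchmark itself forces execution with result.queryExecution.toRdd.foreach(_ => Unit) rather than show(), so that no rows are collected to the driver.

// Illustrative sketch only, not part of the patch above.
// Assumes Spark 2.x, where scalalang.typed still exists.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.scalalang.typed

object TypedSumSketch {                              // hypothetical object name
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                            // assumption: local test run
      .appName("typed-sum-sketch")
      .getOrCreate()
    import spark.implicits._

    // Dataset[Long], mirroring spark.range(0, numRows).as[Long] in the benchmark.
    val ds = spark.range(0, 1000000L).as[Long]

    // Bucket the values by key (l % 10) and sum each bucket with the typed aggregator,
    // the same shape as ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity)).
    val result = ds.groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
    result.show()

    spark.stop()
  }
}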