From 2404d8e54b6b2cfc78d892e7ebb31578457518a3 Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Tue, 27 Dec 2016 10:03:52 -0800
Subject: [PATCH] Revert "[SPARK-18990][SQL] make DatasetBenchmark fairer for
 Dataset"

This reverts commit a05cc425a0a7d18570b99883993a04ad175aa071.
---
 .../apache/spark/sql/DatasetBenchmark.scala   | 75 ++++++++-----------
 1 file changed, 33 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
index cd925e630e..66d94d6016 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
@@ -33,13 +34,11 @@ object DatasetBenchmark {
   def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-    val func = (l: Long) => l + 1
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("back-to-back map", numRows)
+    val func = (d: Data) => Data(d.l + 1, d.s)
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -54,14 +53,14 @@ object DatasetBenchmark {
       var res = df
       var i = 0
       while (i < numChains) {
-        res = res.select($"l" + 1 as "l")
+        res = res.select($"l" + 1 as "l", $"s")
         i += 1
       }
       res.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = ds.as[Long]
+      var res = df.as[Data]
       var i = 0
       while (i < numChains) {
         res = res.map(func)
@@ -76,14 +75,14 @@ object DatasetBenchmark {
   def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-    val func = (l: Long, i: Int) => l % (100L + i) == 0L
-    val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("back-to-back filter", numRows)
+    val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
+    val funcs = 0.until(numChains).map { i =>
+      (d: Data) => func(d, i)
+    }
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -105,7 +104,7 @@ object DatasetBenchmark {
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = ds.as[Long]
+      var res = df.as[Data]
       var i = 0
       while (i < numChains) {
         res = res.filter(funcs(i))
@@ -134,29 +133,24 @@ object DatasetBenchmark {
   def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("aggregate", numRows)
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD sum") { iter =>
-      rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
+      rdd.aggregate(0L)(_ + _.l, _ + _)
     }
 
     benchmark.addCase("DataFrame sum") { iter =>
-      df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
+      df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset sum using Aggregator") { iter =>
-      val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
-      result.queryExecution.toRdd.foreach(_ => Unit)
+      df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
     }
 
-    val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
     benchmark.addCase("Dataset complex Aggregator") { iter =>
-      val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
-      result.queryExecution.toRdd.foreach(_ => Unit)
+      df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark
@@ -176,39 +170,36 @@ object DatasetBenchmark {
     val benchmark3 = aggregate(spark, numRows)
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                           3963 / 3976         25.2          39.6       1.0X
-    DataFrame                                      826 /  834        121.1           8.3       4.8X
-    Dataset                                       5178 / 5198         19.3          51.8       0.8X
+    RDD                                           3448 / 3646         29.0          34.5       1.0X
+    DataFrame                                     2647 / 3116         37.8          26.5       1.3X
+    Dataset                                       4781 / 5155         20.9          47.8       0.7X
     */
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     back-to-back filter:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                            533 /  587        187.6           5.3       1.0X
-    DataFrame                                       79 /   91       1269.0           0.8       6.8X
-    Dataset                                        550 /  559        181.7           5.5       1.0X
+    RDD                                           1346 / 1618         74.3          13.5       1.0X
+    DataFrame                                       59 /   72       1695.4           0.6      22.8X
+    Dataset                                       2777 / 2805         36.0          27.8       0.5X
     */
     benchmark2.run()
 
     /*
     Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
     aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD sum                                       2297 / 2440         43.5          23.0       1.0X
-    DataFrame sum                                  630 /  637        158.7           6.3       3.6X
-    Dataset sum using Aggregator                  3129 / 3247         32.0          31.3       0.7X
-    Dataset complex Aggregator                   12109 / 12142          8.3         121.1       0.2X
+    RDD sum                                       1913 / 1942         52.3          19.1       1.0X
+    DataFrame sum                                   46 /   61       2157.7           0.5      41.3X
+    Dataset sum using Aggregator                  4656 / 4758         21.5          46.6       0.4X
+    Dataset complex Aggregator                    6636 / 7039         15.1          66.4       0.3X
     */
     benchmark3.run()
   }
-- 
GitLab