Commit 2404d8e5 authored by Yin Huai

Revert "[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset"

This reverts commit a05cc425.
parent a05cc425
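
Context for the diff below: commit a05cc425 (SPARK-18990) had rewritten the benchmark so that the RDD and typed Dataset cases operated on bare Longs, leaving the heavier two-column schema only to the DataFrame case; this revert restores the earlier setup in which every variant processes Data(l: Long, s: String) rows. A minimal, illustrative sketch of the two setups (names other than Data are hypothetical; Data mirrors the case class defined in DatasetBenchmark.scala):

    import org.apache.spark.sql.SparkSession

    object BenchmarkShapesSketch {
      case class Data(l: Long, s: String)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]")
          .appName("BenchmarkShapesSketch").getOrCreate()
        import spark.implicits._

        val numRows = 1000L

        // Restored setup: two columns, so every typed operation pays
        // full case-class encode/decode per row.
        val df = spark.range(1, numRows)
          .select($"id".as("l"), $"id".cast("string").as("s"))
        val heavy = df.as[Data].map(d => Data(d.l + 1, d.s))

        // Reverted SPARK-18990 setup: typed operations on bare Longs.
        val light = spark.range(0, numRows).as[Long].map(_ + 1)

        println(s"${heavy.count()} ${light.count()}")
        spark.stop()
      }
    }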
@@ -17,6 +17,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.{SparkConf, SparkContext}
+
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
@@ -33,13 +34,11 @@ object DatasetBenchmark {
   def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-    val func = (l: Long) => l + 1
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("back-to-back map", numRows)
+    val func = (d: Data) => Data(d.l + 1, d.s)
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -54,14 +53,14 @@ object DatasetBenchmark {
       var res = df
       var i = 0
       while (i < numChains) {
-        res = res.select($"l" + 1 as "l")
+        res = res.select($"l" + 1 as "l", $"s")
         i += 1
       }
       res.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = ds.as[Long]
+      var res = df.as[Data]
       var i = 0
       while (i < numChains) {
         res = res.map(func)
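
The spread these map cases measure (see the result tables at the end of the diff) comes from how the two APIs chain: Column arithmetic such as $"l" + 1 stays inside Catalyst, so back-to-back selects collapse into a single projection, while each typed map is an opaque JVM lambda that rows must first be deserialized into objects for. A quick, illustrative way to compare the plans (a sketch, assuming df, func, and Data as in the restored code above):

    // Chained Column projections: the optimized plan fuses them into
    // a single Project over the scan.
    df.select($"l" + 1 as "l", $"s").select($"l" + 1 as "l", $"s").explain(true)

    // Chained typed maps: the plan routes rows through DeserializeToObject,
    // MapElements, and SerializeFromObject, and the lambda bodies stay
    // opaque to the optimizer.
    df.as[Data].map(func).map(func).explain(true)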
@@ -76,14 +75,14 @@ object DatasetBenchmark {
   def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-    val func = (l: Long, i: Int) => l % (100L + i) == 0L
-    val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("back-to-back filter", numRows)
+    val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
+    val funcs = 0.until(numChains).map { i =>
+      (d: Data) => func(d, i)
+    }
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -105,7 +104,7 @@ object DatasetBenchmark {
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = ds.as[Long]
+      var res = df.as[Data]
       var i = 0
       while (i < numChains) {
         res = res.filter(funcs(i))
@@ -134,29 +133,24 @@ object DatasetBenchmark {
   def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("aggregate", numRows)
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD sum") { iter =>
-      rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
+      rdd.aggregate(0L)(_ + _.l, _ + _)
     }
 
     benchmark.addCase("DataFrame sum") { iter =>
-      df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
+      df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset sum using Aggregator") { iter =>
-      val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
-      result.queryExecution.toRdd.foreach(_ => Unit)
+      df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
     }
 
-    val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
     benchmark.addCase("Dataset complex Aggregator") { iter =>
-      val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
-      result.queryExecution.toRdd.foreach(_ => Unit)
+      df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark
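
ComplexAggregator, referenced in the last case above, is defined elsewhere in DatasetBenchmark.scala and does not appear in this diff. For readers unfamiliar with the API, the hypothetical stand-in below shows the Aggregator contract such an object satisfies (SumOfL merely sums Data.l and is not the benchmark's actual aggregator):

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    // Hypothetical stand-in illustrating the Aggregator[-IN, BUF, OUT] contract.
    object SumOfL extends Aggregator[Data, Long, Long] {
      def zero: Long = 0L                              // initial buffer value
      def reduce(b: Long, d: Data): Long = b + d.l     // fold in one input row
      def merge(b1: Long, b2: Long): Long = b1 + b2    // combine partial buffers
      def finish(b: Long): Long = b                    // buffer to final result
      def bufferEncoder: Encoder[Long] = Encoders.scalaLong
      def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }

    // Used the same way as the benchmark's case:
    // df.as[Data].select(SumOfL.toColumn)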
@@ -176,39 +170,36 @@ object DatasetBenchmark {
     val benchmark3 = aggregate(spark, numRows)
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                           3963 / 3976         25.2          39.6       1.0X
-    DataFrame                                      826 /  834        121.1           8.3       4.8X
-    Dataset                                       5178 / 5198         19.3          51.8       0.8X
+    RDD                                           3448 / 3646         29.0          34.5       1.0X
+    DataFrame                                     2647 / 3116         37.8          26.5       1.3X
+    Dataset                                       4781 / 5155         20.9          47.8       0.7X
     */
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     back-to-back filter:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                            533 /  587        187.6           5.3       1.0X
-    DataFrame                                       79 /   91       1269.0           0.8       6.8X
-    Dataset                                        550 /  559        181.7           5.5       1.0X
+    RDD                                           1346 / 1618         74.3          13.5       1.0X
+    DataFrame                                       59 /   72       1695.4           0.6      22.8X
+    Dataset                                       2777 / 2805         36.0          27.8       0.5X
     */
     benchmark2.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD sum                                       2297 / 2440         43.5          23.0       1.0X
-    DataFrame sum                                  630 /  637        158.7           6.3       3.6X
-    Dataset sum using Aggregator                  3129 / 3247         32.0          31.3       0.7X
-    Dataset complex Aggregator                   12109 / 12142         8.3         121.1       0.2X
+    RDD sum                                       1913 / 1942         52.3          19.1       1.0X
+    DataFrame sum                                   46 /   61       2157.7           0.5      41.3X
+    Dataset sum using Aggregator                  4656 / 4758         21.5          46.6       0.4X
+    Dataset complex Aggregator                    6636 / 7039         15.1          66.4       0.3X
     */
     benchmark3.run()
   }
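
A note on methodology visible throughout the diff: the DataFrame and Dataset cases force execution with queryExecution.toRdd.foreach(_ => Unit), which runs the physical plan over internal rows without collecting results to the driver or decoding output objects, so the timings isolate plan execution. The same idiom in isolation (a sketch, assuming spark and df as above; _ => () is the more conventional way to write the discarding function than the benchmark's _ => Unit):

    // Runs the physical plan and discards every InternalRow; nothing is
    // brought back to the driver and no output decoding happens.
    df.queryExecution.toRdd.foreach(_ => ())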