From e2780ce8252ded93a695125c0a745d8b93193cca Mon Sep 17 00:00:00 2001
From: Nong Li <nong@databricks.com>
Date: Wed, 2 Mar 2016 15:03:19 -0800
Subject: [PATCH] [SPARK-13574] [SQL] Add benchmark to measure string
 dictionary decode.

## What changes were proposed in this pull request?

Adds a benchmark that measures decoding of dictionary-encoded string columns. Also updates the other benchmarks, since the default has been flipped to use the vectorized decode path.
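For anyone who wants to poke at the new case outside the harness, here is a minimal, hypothetical standalone sketch of what it measures; the object name and output path are illustrative, while the SQL expressions are lifted from the benchmark itself. `(id % 200) + 10000` yields only 200 distinct strings, so parquet-mr writes the column with dictionary encoding, and the final query exercises the string dictionary decode path:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical runner mirroring stringDictionaryScanBenchmark below.
object StringDictionarySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[1]", "dict-sketch", new SparkConf())
    val sqlContext = new SQLContext(sc)

    // 200 distinct values over ~10M rows => a dictionary-encoded Parquet column.
    sqlContext.range(1024 * 1024 * 10).registerTempTable("t1")
    sqlContext.sql("select cast((id % 200) + 10000 as STRING) as c1 from t1")
      .write.parquet("/tmp/dict-sketch") // illustrative path

    sqlContext.read.parquet("/tmp/dict-sketch").registerTempTable("tempTable")
    // Summing the lengths forces every string value to be decoded.
    sqlContext.sql("select sum(length(c1)) from tempTable").collect()
  }
}
```

Wrapping the query in `withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")`, as the other cases do, would give the non-vectorized baseline for comparison.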
-> "true") { + benchmark.addCase("SQL Parquet Non-vectorized") { iter => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect } } - val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray - benchmark.addCase("ParquetReader") { num => + benchmark.addCase("ParquetReader Non-vectorized") { num => var sum1 = 0L var sum2 = 0L files.map(_.asInstanceOf[String]).foreach { p => @@ -219,11 +220,43 @@ object ParquetReadBenchmark { /* Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------- - SQL Parquet Reader 1381 / 1679 7.6 131.7 1.0X - SQL Parquet MR 2005 / 2177 5.2 191.2 0.7X - SQL Parquet Vectorized 919 / 1044 11.4 87.6 1.5X - ParquetReader 1035 / 1163 10.1 98.7 1.3X + ------------------------------------------------------------------------------------------- + SQL Parquet Vectorized 1025 / 1180 10.2 97.8 1.0X + SQL Parquet MR 2157 / 2222 4.9 205.7 0.5X + SQL Parquet Non-vectorized 1450 / 1466 7.2 138.3 0.7X + ParquetReader Non-vectorized 1005 / 1022 10.4 95.9 1.0X + */ + benchmark.run() + } + } + } + + def stringDictionaryScanBenchmark(values: Int): Unit = { + withTempPath { dir => + withTempTable("t1", "tempTable") { + sqlContext.range(values).registerTempTable("t1") + sqlContext.sql("select cast((id % 200) + 10000 as STRING) as c1 from t1") + .write.parquet(dir.getCanonicalPath) + sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable") + + val benchmark = new Benchmark("String Dictionary", values) + + benchmark.addCase("SQL Parquet Vectorized") { iter => + sqlContext.sql("select sum(length(c1)) from tempTable").collect + } + + benchmark.addCase("SQL Parquet MR") { iter => + withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + sqlContext.sql("select sum(length(c1)) from tempTable").collect + } + } + + /* + Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz + String Dictionary: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + SQL Parquet Vectorized 578 / 593 18.1 55.1 1.0X + SQL Parquet MR 1021 / 1032 10.3 97.4 0.6X */ benchmark.run() } @@ -233,5 +266,6 @@ object ParquetReadBenchmark { def main(args: Array[String]): Unit = { intScanBenchmark(1024 * 1024 * 15) intStringScanBenchmark(1024 * 1024 * 10) + stringDictionaryScanBenchmark(1024 * 1024 * 10) } } -- GitLab