Commit 4df65184 authored by Sameer Agarwal, committed by Yin Huai

[SPARK-14620][SQL] Use/benchmark a better hash in VectorizedHashMap

## What changes were proposed in this pull request?

This PR uses a better hashing algorithm while probing the AggregateHashMap:

```java
long h = 0;
h = (h ^ (0x9e3779b9)) + key_1 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_2 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_3 + (h << 6) + (h >>> 2);
...
h = (h ^ (0x9e3779b9)) + key_n + (h << 6) + (h >>> 2);
return h;
```
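
This combine step closely resembles the one used by `boost::hash_combine`, built around the golden-ratio constant `0x9e3779b9`. Below is a minimal, self-contained Java sketch of the combiner (not Spark code); the power-of-two bucket mask in `main` is an assumption about how the hash is consumed when probing:

```java
// Self-contained sketch of the combiner above (not Spark code).
// Note: 0x9e3779b9 is an int literal, so it sign-extends when XORed with
// the long h -- kept as-is to match the generated code verbatim.
public class HashDemo {
  static long hash(long... keys) {
    long h = 0;
    for (long key : keys) {
      h = (h ^ (0x9e3779b9)) + key + (h << 6) + (h >>> 2);
    }
    return h;
  }

  public static void main(String[] args) {
    // Hypothetical power-of-two table size; masking replaces a modulo.
    int numBuckets = 1 << 17;
    long h = hash(42L, 7L);
    System.out.println("h = " + h + ", bucket = " + (int) (h & (numBuckets - 1)));
  }
}
```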

Depends on: https://github.com/apache/spark/pull/12345

## How was this patch tested?

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    codegen = F                              2417 / 2457          8.7         115.2       1.0X
    codegen = T hashmap = F                  1554 / 1581         13.5          74.1       1.6X
    codegen = T hashmap = T                   877 /  929         23.9          41.8       2.8X

Author: Sameer Agarwal <sameer@databricks.com>

Closes #12379 from sameeragarwal/hash.
parent 8028a288
@@ -86,28 +86,21 @@ class VectorizedHashMapGenerator(
     |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
     |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
     |  private int[] buckets;
-    |  private int numBuckets;
-    |  private int maxSteps;
+    |  private int capacity = 1 << 16;
+    |  private double loadFactor = 0.5;
+    |  private int numBuckets = (int) (capacity / loadFactor);
+    |  private int maxSteps = 2;
     |  private int numRows = 0;
     |  private org.apache.spark.sql.types.StructType schema = $generatedSchema
     |  private org.apache.spark.sql.types.StructType aggregateBufferSchema =
     |    $generatedAggBufferSchema
     |
     |  public $generatedClassName() {
-    |    // TODO: These should be generated based on the schema
-    |    int DEFAULT_CAPACITY = 1 << 16;
-    |    double DEFAULT_LOAD_FACTOR = 0.25;
-    |    int DEFAULT_MAX_STEPS = 2;
-    |    assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
-    |    this.maxSteps = DEFAULT_MAX_STEPS;
-    |    numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
-    |
     |    batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-    |      org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
-    |
+    |      org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
     |    // TODO: Possibly generate this projection in TungstenAggregate directly
     |    aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-    |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
+    |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
     |    for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
     |      aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
     |    }
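
The constructor change above promotes the hard-coded `DEFAULT_*` locals to fields and raises the load factor from 0.25 to 0.5. A quick illustrative check of the resulting sizing (not Spark code); the power-of-two property the removed assert used to verify still holds:

```java
// Illustrative check of the new sizing fields (not Spark code).
public class SizingCheck {
  public static void main(String[] args) {
    int capacity = 1 << 16;                         // 65536 rows in the batch
    double loadFactor = 0.5;
    int numBuckets = (int) (capacity / loadFactor); // 131072 = 1 << 17
    System.out.println(numBuckets);                           // 131072
    // Still a power of two, so (h & (numBuckets - 1)) can index buckets cheaply.
    System.out.println((numBuckets & (numBuckets - 1)) == 0); // true
  }
}
```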
@@ -130,9 +123,11 @@ class VectorizedHashMapGenerator(
    */
  private def generateHashFunction(): String = {
    s"""
-      |// TODO: Improve this hash function
      |private long hash($groupingKeySignature) {
-      |  return ${groupingKeys.map(_._2).mkString(" | ")};
+      |  long h = 0;
+      |  ${groupingKeys.map(key => s"h = (h ^ (0x9e3779b9)) + ${key._2} + (h << 6) + (h >>> 2);")
+        .mkString("\n")}
+      |  return h;
      |}
    """.stripMargin
  }
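
For two long grouping keys, the template above would emit roughly the following Java (illustrative only; the actual parameter names come from `$groupingKeySignature`):

```java
// Roughly what generateHashFunction() now emits for two long keys
// (parameter names k1, k2 are hypothetical).
private long hash(long k1, long k2) {
  long h = 0;
  h = (h ^ (0x9e3779b9)) + k1 + (h << 6) + (h >>> 2);
  h = (h ^ (0x9e3779b9)) + k2 + (h << 6) + (h >>> 2);
  return h;
}
```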
@@ -201,15 +196,20 @@ class VectorizedHashMapGenerator(
     |    while (step < maxSteps) {
     |      // Return bucket index if it's either an empty slot or already contains the key
     |      if (buckets[idx] == -1) {
-    |        ${groupingKeys.zipWithIndex.map(k =>
-                s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
-    |        ${bufferValues.zipWithIndex.map(k =>
-                s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
-                .mkString("\n")}
-    |        buckets[idx] = numRows++;
-    |        batch.setNumRows(numRows);
-    |        aggregateBufferBatch.setNumRows(numRows);
-    |        return aggregateBufferBatch.getRow(buckets[idx]);
+    |        if (numRows < capacity) {
+    |          ${groupingKeys.zipWithIndex.map(k =>
+                  s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
+    |          ${bufferValues.zipWithIndex.map(k =>
+                  s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
+                  .mkString("\n")}
+    |          buckets[idx] = numRows++;
+    |          batch.setNumRows(numRows);
+    |          aggregateBufferBatch.setNumRows(numRows);
+    |          return aggregateBufferBatch.getRow(buckets[idx]);
+    |        } else {
+    |          // No more space
+    |          return null;
+    |        }
     |      } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) {
     |        return aggregateBufferBatch.getRow(buckets[idx]);
     |      }
......
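
With the new guard, `findOrInsert` can report failure instead of writing past the end of the batch. Below is a simplified, self-contained model of that behavior for a single long key (illustrative only; the linear probe step and the raw key array are assumptions, since the real generated code advances `idx` elsewhere in the template and stores rows in a `ColumnarBatch`):

```java
// Simplified model of the new findOrInsert behavior (not Spark code).
import java.util.Arrays;

public class ProbingMapModel {
  private final int capacity = 1 << 16;
  private final int numBuckets = 1 << 17;   // capacity / loadFactor (0.5)
  private final int maxSteps = 2;
  private final int[] buckets = new int[numBuckets];
  private final long[] keys = new long[capacity];
  private int numRows = 0;

  public ProbingMapModel() {
    Arrays.fill(buckets, -1);               // -1 marks an empty bucket
  }

  private long hash(long key) {
    long h = 0;
    h = (h ^ (0x9e3779b9)) + key + (h << 6) + (h >>> 2);
    return h;
  }

  /** Returns a row id for key, or -1 if the map is full or probing gives up. */
  public int findOrInsert(long key) {
    long h = hash(key);
    int step = 0;
    int idx = (int) (h & (numBuckets - 1));
    while (step < maxSteps) {
      if (buckets[idx] == -1) {             // empty slot
        if (numRows < capacity) {           // new guard: insert only if there is room
          keys[numRows] = key;
          buckets[idx] = numRows++;
          return buckets[idx];
        } else {
          return -1;                        // no more space
        }
      } else if (keys[buckets[idx]] == key) {
        return buckets[idx];                // key already present
      }
      idx = (idx + 1) & (numBuckets - 1);   // probe the next bucket (assumed linear)
      step++;
    }
    return -1;                              // gave up after maxSteps probes
  }

  public static void main(String[] args) {
    ProbingMapModel m = new ProbingMapModel();
    System.out.println(m.findOrInsert(42)); // inserts: row 0
    System.out.println(m.findOrInsert(42)); // finds:   row 0
  }
}
```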
@@ -150,7 +150,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
    */
  }

-  ignore("aggregate with keys") {
+  ignore("aggregate with linear keys") {
    val N = 20 << 20
    val benchmark = new Benchmark("Aggregate w keys", N)
@@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
-    codegen = F                              2219 / 2392          9.4         105.8       1.0X
-    codegen = T hashmap = F                  1330 / 1466         15.8          63.4       1.7X
-    codegen = T hashmap = T                   384 /  518         54.7          18.3       5.8X
+    codegen = F                              2067 / 2166         10.1          98.6       1.0X
+    codegen = T hashmap = F                  1149 / 1321         18.3          54.8       1.8X
+    codegen = T hashmap = T                   388 /  475         54.0          18.5       5.3X
    */
  }
+
+  ignore("aggregate with randomized keys") {
+    val N = 20 << 20
+
+    val benchmark = new Benchmark("Aggregate w keys", N)
+    sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")
+
+    def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
+
+    benchmark.addCase(s"codegen = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f()
+    }
+
+    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      f()
+    }
+
+    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      f()
+    }
+
+    benchmark.run()
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    codegen = F                              2517 / 2608          8.3         120.0       1.0X
+    codegen = T hashmap = F                  1484 / 1560         14.1          70.8       1.7X
+    codegen = T hashmap = T                   794 /  908         26.4          37.9       3.2X
+    */
+  }
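
The randomized-keys case stresses hash quality: the old generated hash simply OR-ed the key columns together (see the removed `mkString(" | ")` above), which concentrates values in a narrow, skewed range, while the new mixing hash spreads them across the table. An illustrative (non-Spark) comparison of bucket usage on random key pairs:

```java
// Illustrative bucket-spread comparison (not Spark code): old OR-combine
// vs. the new mixing hash, on 10k random (k1, k2) pairs.
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class BucketSpreadDemo {
  public static void main(String[] args) {
    Random rng = new Random(0);
    int numBuckets = 1 << 17;
    Set<Integer> orBuckets = new HashSet<>();
    Set<Integer> mixBuckets = new HashSet<>();
    for (int i = 0; i < 10_000; i++) {
      long k1 = rng.nextInt(10_000);        // key range mirrors floor(rand() * 10000)
      long k2 = rng.nextInt(10_000);
      long or = k1 | k2;                    // old combine: bitwise OR of the keys
      long h = 0;                           // new combine: golden-ratio mixing
      h = (h ^ (0x9e3779b9)) + k1 + (h << 6) + (h >>> 2);
      h = (h ^ (0x9e3779b9)) + k2 + (h << 6) + (h >>> 2);
      orBuckets.add((int) (or & (numBuckets - 1)));
      mixBuckets.add((int) (h & (numBuckets - 1)));
    }
    // The OR-combine lands everything in [0, 16384) with a bias toward
    // ones-heavy values; the mixing hash uses far more of the 131072 buckets.
    System.out.println("distinct buckets, OR combine : " + orBuckets.size());
    System.out.println("distinct buckets, mixing hash: " + mixBuckets.size());
  }
}
```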
......