Skip to content
Snippets Groups Projects
Commit 235d2833 authored by caoxuewen's avatar caoxuewen Committed by gatorsmile
Browse files

[MINOR][SQL][TEST] Shuffle hash join test does not run the expected join operator

## What changes were proposed in this pull request?

ignore("shuffle hash join") is meant to exercise a shuffle hash join, i.e. to test _case class ShuffledHashJoinExec_.
But when you change 'ignore' to 'test', the join that is actually executed is _case class BroadcastHashJoinExec_.

Before the change, this happens because canBroadcast is true.
Values printed inside _canBroadcast(plan: LogicalPlan)_:
```
canBroadcast plan.stats.sizeInBytes:6710880
canBroadcast conf.autoBroadcastJoinThreshold:10000000
```

After the change, plan.stats.sizeInBytes is 11184808.
Values printed inside _canBuildLocalHashMap(plan: LogicalPlan)_
and _muchSmaller(a: LogicalPlan, b: LogicalPlan)_:

```
canBuildLocalHashMap plan.stats.sizeInBytes:11184808
canBuildLocalHashMap conf.autoBroadcastJoinThreshold:10000000
canBuildLocalHashMap conf.numShufflePartitions:2
```
```
muchSmaller a.stats.sizeInBytes * 3:33554424
muchSmaller b.stats.sizeInBytes:33554432
```
## How was this patch tested?

existing test case.

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #19069 from heary-cao/shuffle_hash_join.
parent 32d6d9d7
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmark
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
......@@ -35,7 +36,9 @@ class JoinBenchmark extends BenchmarkBase {
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("Join w long", N) {
sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -55,7 +58,9 @@ class JoinBenchmark extends BenchmarkBase {
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("Join w long duplicated", N) {
val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -75,9 +80,11 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v"))
runBenchmark("Join w 2 ints", N) {
sparkSession.range(N).join(dim2,
val df = sparkSession.range(N).join(dim2,
(col("id") % M).cast(IntegerType) === col("k1")
&& (col("id") % M).cast(IntegerType) === col("k2")).count()
&& (col("id") % M).cast(IntegerType) === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -97,9 +104,10 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
runBenchmark("Join w 2 longs", N) {
sparkSession.range(N).join(dim3,
val df = sparkSession.range(N).join(dim3,
(col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
.count()
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -119,9 +127,10 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2"))
runBenchmark("Join w 2 longs duplicated", N) {
sparkSession.range(N).join(dim4,
val df = sparkSession.range(N).join(dim4,
(col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === col("k2"))
.count()
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -138,7 +147,9 @@ class JoinBenchmark extends BenchmarkBase {
val M = 1 << 16
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("outer join w long", N) {
sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left").count()
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -156,7 +167,9 @@ class JoinBenchmark extends BenchmarkBase {
val M = 1 << 16
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("semi join w long", N) {
sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi").count()
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}
/*
......@@ -174,7 +187,9 @@ class JoinBenchmark extends BenchmarkBase {
runBenchmark("merge join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.count()
}
/*
......@@ -193,7 +208,9 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr(s"(id * 15485863) % ${N*10} as k1")
val df2 = sparkSession.range(N)
.selectExpr(s"(id * 15485867) % ${N*10} as k2")
df1.join(df2, col("k1") === col("k2")).count()
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.count()
}
/*
......@@ -212,18 +229,19 @@ class JoinBenchmark extends BenchmarkBase {
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
runBenchmark("shuffle hash join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
df.count()
}
/*
*Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
*Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
*shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X
*shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X
*shuffle hash join codegen=false 2005 / 2010 2.1 478.0 1.0X
*shuffle hash join codegen=true 1773 / 1792 2.4 422.7 1.1X
*/
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment