Commit 6662ee21 authored by Reynold Xin

[SPARK-9418][SQL] Use sort-merge join as the default shuffle join.

Sort-merge join is more robust in Spark since the sorting can be done using the Tungsten sort operator.

Author: Reynold Xin <rxin@databricks.com>

Closes #7733 from rxin/smj and squashes the following commits:

61e4d34 [Reynold Xin] Fixed test case.
5ffd731 [Reynold Xin] Fixed JoinSuite.
a137dc0 [Reynold Xin] [SPARK-9418][SQL] Use sort-merge join as the default shuffle join.
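
For context, a minimal sketch of how this default can be toggled at runtime. This is not part of the patch; it assumes the Spark 1.5-era SQLContext API, and the app name and master are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical setup; in spark-shell, sc and sqlContext already exist.
    val sc = new SparkContext(new SparkConf().setAppName("smj-default-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // With this patch spark.sql.planner.sortMergeJoin defaults to true.
    // Opt out of the new default (falls back to shuffled hash join):
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
    // Restore the new default:
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
    // Equivalent SQL form: sqlContext.sql("SET spark.sql.planner.sortMergeJoin=true")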
parent b7f54119
@@ -322,7 +322,7 @@ private[spark] object SQLConf {
       " memory.")
 
   val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
-    defaultValue = Some(false),
+    defaultValue = Some(true),
     doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
 
   // This is only used for the thriftserver
...
@@ -79,9 +79,9 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2", classOf[CartesianProduct]),
       ("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
       ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
-      ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
-      ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[ShuffledHashJoin]),
-      ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[ShuffledHashJoin]),
+      ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
+      ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
+      ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
       ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
         classOf[ShuffledHashOuterJoin]),
...
@@ -23,16 +23,16 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
- * Runs the test cases that are included in the hive distribution with sort merge join is true.
+ * Runs the test cases that are included in the hive distribution with hash joins.
  */
-class SortMergeCompatibilitySuite extends HiveCompatibilitySuite {
+class HashJoinCompatibilitySuite extends HiveCompatibilitySuite {
   override def beforeAll() {
     super.beforeAll()
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
+    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
   }
 
   override def afterAll() {
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
+    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
     super.afterAll()
   }
...
@@ -172,7 +172,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
       assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
 
-      val shj = df.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
+      val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j }
       assert(shj.size === 1,
         "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
...
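
For illustration, a hedged sketch of how the planned join operator can be inspected, mirroring the plan checks in JoinSuite and StatisticsSuite above. It continues from the sqlContext created in the earlier snippet; the table names and data are placeholders:

    import sqlContext.implicits._

    // Register two small temporary tables (placeholder data).
    val left = sc.parallelize(1 to 100).map(i => (i, s"l$i")).toDF("key", "value")
    val right = sc.parallelize(1 to 100).map(i => (i, s"r$i")).toDF("a", "b")
    left.registerTempTable("testData")
    right.registerTempTable("testData2")

    // With spark.sql.planner.sortMergeJoin=true, the physical plan for this
    // equi-join should show SortMergeJoin rather than ShuffledHashJoin.
    val df = sqlContext.sql("SELECT * FROM testData JOIN testData2 ON key = a")
    df.explain()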