Skip to content
Snippets Groups Projects
Commit 945d8bcb authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Josh Rosen
Browse files

[SPARK-9306] [SQL] Don't use SortMergeJoin when joining on unsortable columns

JIRA: https://issues.apache.org/jira/browse/SPARK-9306

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7645 from viirya/smj_unsortable and squashes the following commits:

a240707 [Liang-Chi Hsieh] Use forall instead of exists for readability.
55221fa [Liang-Chi Hsieh] Shouldn't use SortMergeJoin when joining on unsortable columns.
parent 1efe97dc
No related branches found
No related tags found
No related merge requests found
......@@ -184,7 +184,7 @@ object PartialAggregation {
* A pattern that finds joins with equality conditions that can be evaluated using equi-join.
*/
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
/** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
......
......@@ -35,9 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
case ExtractEquiJoinKeys(
LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
joins.BroadcastLeftSemiJoinHash(
leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
// Find left semi joins where at least some predicates can be evaluated by matching join keys
......@@ -90,6 +89,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}
private[this] def isValidSort(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression]): Boolean = {
leftKeys.zip(rightKeys).forall { keys =>
(keys._1.dataType, keys._2.dataType) match {
case (l: AtomicType, r: AtomicType) => true
case (NullType, NullType) => true
case _ => false
}
}
}
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
......@@ -100,7 +111,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// If the sort merge join option is set, we want to use sort merge join prior to hashjoin
// for now let's support inner join first, then add outer join
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.sortMergeJoinEnabled =>
if sqlContext.conf.sortMergeJoinEnabled && isValidSort(leftKeys, rightKeys) =>
val mergeJoin =
joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil
......
......@@ -108,6 +108,18 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}
}
test("SortMergeJoin shouldn't work on unsortable columns") {
val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
try {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
Seq(
("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}
}
test("broadcasted hash join operator selection") {
ctx.cacheManager.clearCache()
ctx.sql("CACHE TABLE testData")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment