From e49e723392b8a64d30bd90944a748eb6f5ef3a8a Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Wed, 11 Nov 2015 19:32:52 -0800
Subject: [PATCH] [SPARK-11675][SQL] Remove shuffle hash joins.

Author: Reynold Xin <rxin@databricks.com>

Closes #9645 from rxin/SPARK-11675.
---
 .../scala/org/apache/spark/sql/SQLConf.scala  |   9 +-
 .../spark/sql/execution/SparkStrategies.scala |  24 +-
 .../apache/spark/sql/execution/commands.scala |  11 +
 .../execution/joins/ShuffledHashJoin.scala    |  62 ---
 .../joins/ShuffledHashOuterJoin.scala         | 109 ----
 .../org/apache/spark/sql/JoinSuite.scala      | 523 ++++++++----------
 .../org/apache/spark/sql/SQLQuerySuite.scala  |   3 +-
 .../spark/sql/execution/PlannerSuite.scala    |  10 +-
 .../sql/execution/joins/InnerJoinSuite.scala  |  38 --
 .../sql/execution/joins/OuterJoinSuite.scala  |  12 -
 .../execution/metric/SQLMetricsSuite.scala    | 271 ++++-----
 .../spark/sql/hive/StatisticsSuite.scala      |   2 +-
 12 files changed, 357 insertions(+), 717 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 41d28d448c..f40e603cd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -353,12 +353,6 @@ private[spark] object SQLConf {
     defaultValue = Some(5 * 60),
     doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
 
-  // Options that control which operators can be chosen by the query planner.  These should be
-  // considered hints and may be ignored by future versions of Spark SQL.
-  val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
-    defaultValue = Some(true),
-    doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
-
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
     doc = "Set a Fair Scheduler pool for a JDBC client session")
@@ -469,6 +463,7 @@ private[spark] object SQLConf {
     val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
     val CODEGEN_ENABLED = "spark.sql.codegen"
     val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
+    val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
   }
 }
 
@@ -533,8 +528,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def nativeView: Boolean = getConf(NATIVE_VIEW)
 
-  private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   private[spark] def subexpressionEliminationEnabled: Boolean =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 96242f160a..90989f2cee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -73,10 +73,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
    *     of the join will be broadcasted and the other side will be streamed, with no shuffling
    *     performed. If both sides of the join are eligible to be broadcasted then the
-   * - Sort merge: if the matching join keys are sortable and
-   *     [[org.apache.spark.sql.SQLConf.SORTMERGE_JOIN]] is enabled (default), then sort merge join
-   *     will be used.
-   * - Hash: will be chosen if neither of the above optimizations apply to this join.
+   * - Sort merge: if the matching join keys are sortable.
    */
   object EquiJoinSelection extends Strategy with PredicateHelper {
 
@@ -103,22 +100,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
+        if RowOrdering.isOrderable(leftKeys) =>
         val mergeJoin =
           joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
         condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil
 
-      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
-        val buildSide =
-          if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
-            joins.BuildRight
-          } else {
-            joins.BuildLeft
-          }
-        val hashJoin = joins.ShuffledHashJoin(
-          leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
-        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
-
       // --- Outer joins --------------------------------------------------------------------------
 
       case ExtractEquiJoinKeys(
@@ -132,14 +118,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil
 
       case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
+        if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeOuterJoin(
           leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
 
-      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
-        joins.ShuffledHashOuterJoin(
-          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
-
       // --- Cases where this strategy does not apply ---------------------------------------------
 
       case _ => Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index e29c281b95..24a79f289a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -148,6 +148,17 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+      (keyValueOutput, runFunc)
+
+    case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " +
+            s"will be ignored. Sort merge join will continue to be used.")
+        Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true"))
+      }
+      (keyValueOutput, runFunc)
+
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
deleted file mode 100644
index 755986af8b..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Performs an inner hash join of two child relations by first shuffling the data using the join
- * keys.
- */
-case class ShuffledHashJoin(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
-    buildSide: BuildSide,
-    left: SparkPlan,
-    right: SparkPlan)
-  extends BinaryNode with HashJoin {
-
-  override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  override def outputPartitioning: Partitioning =
-    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val (numBuildRows, numStreamedRows) = buildSide match {
-      case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
-      case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
-    }
-    val numOutputRows = longMetric("numOutputRows")
-
-    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
-      val hashed = HashedRelation(buildIter, numBuildRows, buildSideKeyGenerator)
-      hashJoin(streamIter, numStreamedRows, hashed, numOutputRows)
-    }
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
deleted file mode 100644
index 6b2cb9d8f6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Performs a hash based outer join for two child relations by shuffling the data using
- * the join keys. This operator requires loading the associated partition in both side into memory.
- */
-case class ShuffledHashOuterJoin(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
-    joinType: JoinType,
-    condition: Option[Expression],
-    left: SparkPlan,
-    right: SparkPlan) extends BinaryNode with HashOuterJoin {
-
-  override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
-
-  override def outputPartitioning: Partitioning = joinType match {
-    case LeftOuter => left.outputPartitioning
-    case RightOuter => right.outputPartitioning
-    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
-    case x =>
-      throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType")
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
-    val numOutputRows = longMetric("numOutputRows")
-
-    val joinedRow = new JoinedRow()
-    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
-      // TODO this probably can be replaced by external sort (sort merged join?)
-      joinType match {
-        case LeftOuter =>
-          val hashed = HashedRelation(rightIter, numRightRows, buildKeyGenerator)
-          val keyGenerator = streamedKeyGenerator
-          val resultProj = resultProjection
-          leftIter.flatMap( currentRow => {
-            numLeftRows += 1
-            val rowKey = keyGenerator(currentRow)
-            joinedRow.withLeft(currentRow)
-            leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj, numOutputRows)
-          })
-
-        case RightOuter =>
-          val hashed = HashedRelation(leftIter, numLeftRows, buildKeyGenerator)
-          val keyGenerator = streamedKeyGenerator
-          val resultProj = resultProjection
-          rightIter.flatMap ( currentRow => {
-            numRightRows += 1
-            val rowKey = keyGenerator(currentRow)
-            joinedRow.withRight(currentRow)
-            rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj, numOutputRows)
-          })
-
-        case FullOuter =>
-          // TODO(davies): use UnsafeRow
-          val leftHashTable =
-            buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output)).asScala
-          val rightHashTable =
-            buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output)).asScala
-          (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
-            fullOuterIterator(key,
-              leftHashTable.getOrElse(key, EMPTY_LIST),
-              rightHashTable.getOrElse(key, EMPTY_LIST),
-              joinedRow,
-              numOutputRows)
-          }
-
-        case x =>
-          throw new IllegalArgumentException(
-            s"ShuffledHashOuterJoin should not take $x as the JoinType")
-      }
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 3f3b837f75..9a3c262e94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -44,8 +44,6 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     val df = sql(sqlString)
     val physical = df.queryExecution.sparkPlan
     val operators = physical.collect {
-      case j: ShuffledHashJoin => j
-      case j: ShuffledHashOuterJoin => j
       case j: LeftSemiJoinHash => j
       case j: BroadcastHashJoin => j
       case j: BroadcastHashOuterJoin => j
@@ -96,75 +94,39 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       ("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
         classOf[BroadcastNestedLoopJoin])
     ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      Seq(
-        ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
-        ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2",
-          classOf[ShuffledHashJoin]),
-        ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2",
-          classOf[ShuffledHashJoin]),
-        ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
-        ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-          classOf[ShuffledHashOuterJoin]),
-        ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
-          classOf[ShuffledHashOuterJoin]),
-        ("SELECT * FROM testData full outer join testData2 ON key = a",
-          classOf[ShuffledHashOuterJoin])
-      ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
-    }
   }
 
-  test("SortMergeJoin shouldn't work on unsortable columns") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
-      Seq(
-        ("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
-      ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
-    }
-  }
+//  ignore("SortMergeJoin shouldn't work on unsortable columns") {
+//    Seq(
+//      ("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
+//    ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+//  }
 
   test("broadcasted hash join operator selection") {
     sqlContext.cacheManager.clearCache()
     sql("CACHE TABLE testData")
-    for (sortMergeJoinEnabled <- Seq(true, false)) {
-      withClue(s"sortMergeJoinEnabled=$sortMergeJoinEnabled") {
-        withSQLConf(SQLConf.SORTMERGE_JOIN.key -> s"$sortMergeJoinEnabled") {
-          Seq(
-            ("SELECT * FROM testData join testData2 ON key = a",
-              classOf[BroadcastHashJoin]),
-            ("SELECT * FROM testData join testData2 ON key = a and key = 2",
-              classOf[BroadcastHashJoin]),
-            ("SELECT * FROM testData join testData2 ON key = a where key = 2",
-              classOf[BroadcastHashJoin])
-          ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
-        }
-      }
-    }
+    Seq(
+      ("SELECT * FROM testData join testData2 ON key = a",
+        classOf[BroadcastHashJoin]),
+      ("SELECT * FROM testData join testData2 ON key = a and key = 2",
+        classOf[BroadcastHashJoin]),
+      ("SELECT * FROM testData join testData2 ON key = a where key = 2",
+        classOf[BroadcastHashJoin])
+    ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     sql("UNCACHE TABLE testData")
   }
 
   test("broadcasted hash outer join operator selection") {
     sqlContext.cacheManager.clearCache()
     sql("CACHE TABLE testData")
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
-      Seq(
-        ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
-          classOf[SortMergeOuterJoin]),
-        ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-          classOf[BroadcastHashOuterJoin]),
-        ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
-          classOf[BroadcastHashOuterJoin])
-      ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
-    }
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      Seq(
-        ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
-          classOf[ShuffledHashOuterJoin]),
-        ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-          classOf[BroadcastHashOuterJoin]),
-        ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
-          classOf[BroadcastHashOuterJoin])
-      ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
-    }
+    Seq(
+      ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
+        classOf[SortMergeOuterJoin]),
+      ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
+        classOf[BroadcastHashOuterJoin]),
+      ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
+        classOf[BroadcastHashOuterJoin])
+    ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     sql("UNCACHE TABLE testData")
   }
 
@@ -237,241 +199,222 @@ class JoinSuite extends QueryTest with SharedSQLContext {
         Row(2, 2, 2, 2) :: Nil)
   }
 
-  def test_outer_join(useSMJ: Boolean): Unit = {
-
-    val algo = if (useSMJ) "SortMergeOuterJoin" else "ShuffledHashOuterJoin"
-
-    test("left outer join: " + algo) {
-      withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) {
-
-        checkAnswer(
-          upperCaseData.join(lowerCaseData, $"n" === $"N", "left"),
-          Row(1, "A", 1, "a") ::
-            Row(2, "B", 2, "b") ::
-            Row(3, "C", 3, "c") ::
-            Row(4, "D", 4, "d") ::
-            Row(5, "E", null, null) ::
-            Row(6, "F", null, null) :: Nil)
-
-        checkAnswer(
-          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left"),
-          Row(1, "A", null, null) ::
-            Row(2, "B", 2, "b") ::
-            Row(3, "C", 3, "c") ::
-            Row(4, "D", 4, "d") ::
-            Row(5, "E", null, null) ::
-            Row(6, "F", null, null) :: Nil)
-
-        checkAnswer(
-          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left"),
-          Row(1, "A", null, null) ::
-            Row(2, "B", 2, "b") ::
-            Row(3, "C", 3, "c") ::
-            Row(4, "D", 4, "d") ::
-            Row(5, "E", null, null) ::
-            Row(6, "F", null, null) :: Nil)
-
-        checkAnswer(
-          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left"),
-          Row(1, "A", 1, "a") ::
-            Row(2, "B", 2, "b") ::
-            Row(3, "C", 3, "c") ::
-            Row(4, "D", 4, "d") ::
-            Row(5, "E", null, null) ::
-            Row(6, "F", null, null) :: Nil)
-
-        // Make sure we are choosing left.outputPartitioning as the
-        // outputPartitioning for the outer join operator.
-        checkAnswer(
-          sql(
-            """
-            |SELECT l.N, count(*)
-            |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
-            |GROUP BY l.N
-          """.
-              stripMargin),
-        Row(1, 1) ::
-          Row(2, 1) ::
-          Row(3, 1) ::
-          Row(4, 1) ::
-          Row(5, 1) ::
-          Row(6, 1) :: Nil)
-
-        checkAnswer(
-          sql(
-            """
-              |SELECT r.a, count(*)
-              |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
-              |GROUP BY r.a
-            """.stripMargin),
-          Row(null, 6) :: Nil)
-      }
-    }
+  test("left outer join") {
+    checkAnswer(
+      upperCaseData.join(lowerCaseData, $"n" === $"N", "left"),
+      Row(1, "A", 1, "a") ::
+        Row(2, "B", 2, "b") ::
+        Row(3, "C", 3, "c") ::
+        Row(4, "D", 4, "d") ::
+        Row(5, "E", null, null) ::
+        Row(6, "F", null, null) :: Nil)
 
-    test("right outer join: " + algo) {
-      withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) {
-        checkAnswer(
-          lowerCaseData.join(upperCaseData, $"n" === $"N", "right"),
-          Row(1, "a", 1, "A") ::
-            Row(2, "b", 2, "B") ::
-            Row(3, "c", 3, "C") ::
-            Row(4, "d", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-        checkAnswer(
-          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right"),
-          Row(null, null, 1, "A") ::
-            Row(2, "b", 2, "B") ::
-            Row(3, "c", 3, "C") ::
-            Row(4, "d", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-        checkAnswer(
-          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right"),
-          Row(null, null, 1, "A") ::
-            Row(2, "b", 2, "B") ::
-            Row(3, "c", 3, "C") ::
-            Row(4, "d", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-        checkAnswer(
-          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right"),
-          Row(1, "a", 1, "A") ::
-            Row(2, "b", 2, "B") ::
-            Row(3, "c", 3, "C") ::
-            Row(4, "d", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-
-        // Make sure we are choosing right.outputPartitioning as the
-        // outputPartitioning for the outer join operator.
-        checkAnswer(
-          sql(
-            """
-              |SELECT l.a, count(*)
-              |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
-              |GROUP BY l.a
-            """.stripMargin),
-          Row(null,
-            6))
-
-        checkAnswer(
-          sql(
-            """
-              |SELECT r.N, count(*)
-              |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
-              |GROUP BY r.N
-            """.stripMargin),
-          Row(1
-            , 1) ::
-            Row(2, 1) ::
-            Row(3, 1) ::
-            Row(4, 1) ::
-            Row(5, 1) ::
-            Row(6, 1) :: Nil)
-        }
-      }
+    checkAnswer(
+      upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left"),
+      Row(1, "A", null, null) ::
+        Row(2, "B", 2, "b") ::
+        Row(3, "C", 3, "c") ::
+        Row(4, "D", 4, "d") ::
+        Row(5, "E", null, null) ::
+        Row(6, "F", null, null) :: Nil)
 
-    test("full outer join: " + algo) {
-      withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) {
-
-        upperCaseData.where('N <= 4).registerTempTable("left")
-        upperCaseData.where('N >= 3).registerTempTable("right")
-
-        val left = UnresolvedRelation(TableIdentifier("left"), None)
-        val right = UnresolvedRelation(TableIdentifier("right"), None)
-
-        checkAnswer(
-          left.join(right, $"left.N" === $"right.N", "full"),
-          Row(1, "A", null, null) ::
-            Row(2, "B", null, null) ::
-            Row(3, "C", 3, "C") ::
-            Row(4, "D", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-
-        checkAnswer(
-          left.join(right, ($"left.N" === $"right.N") && ($"left.N" !== 3), "full"),
-          Row(1, "A", null, null) ::
-            Row(2, "B", null, null) ::
-            Row(3, "C", null, null) ::
-            Row(null, null, 3, "C") ::
-            Row(4, "D", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-
-        checkAnswer(
-          left.join(right, ($"left.N" === $"right.N") && ($"right.N" !== 3), "full"),
-          Row(1, "A", null, null) ::
-            Row(2, "B", null, null) ::
-            Row(3, "C", null, null) ::
-            Row(null, null, 3, "C") ::
-            Row(4, "D", 4, "D") ::
-            Row(null, null, 5, "E") ::
-            Row(null, null, 6, "F") :: Nil)
-
-        // Make sure we are UnknownPartitioning as the outputPartitioning for the outer join
-        // operator.
-        checkAnswer(
-          sql(
-            """
-            |SELECT l.a, count(*)
-            |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
-            |GROUP BY l.a
-          """.
-              stripMargin),
-        Row(
-          null, 10))
-
-        checkAnswer(
-          sql(
-            """
-              |SELECT r.N, count(*)
-              |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
-              |GROUP BY r.N
-            """.stripMargin),
-          Row
-            (1, 1) ::
-            Row(2, 1) ::
-            Row(3, 1) ::
-            Row(4, 1) ::
-            Row(5, 1) ::
-            Row(6, 1) ::
-            Row(null, 4) :: Nil)
-
-        checkAnswer(
-          sql(
-            """
-              |SELECT l.N, count(*)
-              |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
-              |GROUP BY l.N
-            """.stripMargin),
-          Row(1
-            , 1) ::
-            Row(2, 1) ::
-            Row(3, 1) ::
-            Row(4, 1) ::
-            Row(5, 1) ::
-            Row(6, 1) ::
-            Row(null, 4) :: Nil)
-
-        checkAnswer(
-          sql(
-            """
-            |SELECT r.a, count(*)
-            |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
-            |GROUP BY r.a
-          """.
-              stripMargin),
-          Row(null, 10))
-      }
-    }
+    checkAnswer(
+      upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left"),
+      Row(1, "A", null, null) ::
+        Row(2, "B", 2, "b") ::
+        Row(3, "C", 3, "c") ::
+        Row(4, "D", 4, "d") ::
+        Row(5, "E", null, null) ::
+        Row(6, "F", null, null) :: Nil)
+
+    checkAnswer(
+      upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left"),
+      Row(1, "A", 1, "a") ::
+        Row(2, "B", 2, "b") ::
+        Row(3, "C", 3, "c") ::
+        Row(4, "D", 4, "d") ::
+        Row(5, "E", null, null) ::
+        Row(6, "F", null, null) :: Nil)
+
+    // Make sure we are choosing left.outputPartitioning as the
+    // outputPartitioning for the outer join operator.
+    checkAnswer(
+      sql(
+        """
+        |SELECT l.N, count(*)
+        |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+        |GROUP BY l.N
+      """.
+          stripMargin),
+    Row(1, 1) ::
+      Row(2, 1) ::
+      Row(3, 1) ::
+      Row(4, 1) ::
+      Row(5, 1) ::
+      Row(6, 1) :: Nil)
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT r.a, count(*)
+          |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+          |GROUP BY r.a
+        """.stripMargin),
+      Row(null, 6) :: Nil)
+  }
+
+  test("right outer join") {
+    checkAnswer(
+      lowerCaseData.join(upperCaseData, $"n" === $"N", "right"),
+      Row(1, "a", 1, "A") ::
+        Row(2, "b", 2, "B") ::
+        Row(3, "c", 3, "C") ::
+        Row(4, "d", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+    checkAnswer(
+      lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right"),
+      Row(null, null, 1, "A") ::
+        Row(2, "b", 2, "B") ::
+        Row(3, "c", 3, "C") ::
+        Row(4, "d", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+    checkAnswer(
+      lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right"),
+      Row(null, null, 1, "A") ::
+        Row(2, "b", 2, "B") ::
+        Row(3, "c", 3, "C") ::
+        Row(4, "d", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+    checkAnswer(
+      lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right"),
+      Row(1, "a", 1, "A") ::
+        Row(2, "b", 2, "B") ::
+        Row(3, "c", 3, "C") ::
+        Row(4, "d", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+
+    // Make sure we are choosing right.outputPartitioning as the
+    // outputPartitioning for the outer join operator.
+    checkAnswer(
+      sql(
+        """
+          |SELECT l.a, count(*)
+          |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+          |GROUP BY l.a
+        """.stripMargin),
+      Row(null,
+        6))
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT r.N, count(*)
+          |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+          |GROUP BY r.N
+        """.stripMargin),
+      Row(1
+        , 1) ::
+        Row(2, 1) ::
+        Row(3, 1) ::
+        Row(4, 1) ::
+        Row(5, 1) ::
+        Row(6, 1) :: Nil)
   }
 
-  // test SortMergeOuterJoin
-  test_outer_join(true)
-  // test ShuffledHashOuterJoin
-  test_outer_join(false)
+  test("full outer join") {
+    upperCaseData.where('N <= 4).registerTempTable("left")
+    upperCaseData.where('N >= 3).registerTempTable("right")
+
+    val left = UnresolvedRelation(TableIdentifier("left"), None)
+    val right = UnresolvedRelation(TableIdentifier("right"), None)
+
+    checkAnswer(
+      left.join(right, $"left.N" === $"right.N", "full"),
+      Row(1, "A", null, null) ::
+        Row(2, "B", null, null) ::
+        Row(3, "C", 3, "C") ::
+        Row(4, "D", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+
+    checkAnswer(
+      left.join(right, ($"left.N" === $"right.N") && ($"left.N" !== 3), "full"),
+      Row(1, "A", null, null) ::
+        Row(2, "B", null, null) ::
+        Row(3, "C", null, null) ::
+        Row(null, null, 3, "C") ::
+        Row(4, "D", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+
+    checkAnswer(
+      left.join(right, ($"left.N" === $"right.N") && ($"right.N" !== 3), "full"),
+      Row(1, "A", null, null) ::
+        Row(2, "B", null, null) ::
+        Row(3, "C", null, null) ::
+        Row(null, null, 3, "C") ::
+        Row(4, "D", 4, "D") ::
+        Row(null, null, 5, "E") ::
+        Row(null, null, 6, "F") :: Nil)
+
+    // Make sure we are UnknownPartitioning as the outputPartitioning for the outer join
+    // operator.
+    checkAnswer(
+      sql(
+        """
+        |SELECT l.a, count(*)
+        |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
+        |GROUP BY l.a
+      """.
+          stripMargin),
+      Row(null, 10))
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT r.N, count(*)
+          |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
+          |GROUP BY r.N
+        """.stripMargin),
+      Row
+        (1, 1) ::
+        Row(2, 1) ::
+        Row(3, 1) ::
+        Row(4, 1) ::
+        Row(5, 1) ::
+        Row(6, 1) ::
+        Row(null, 4) :: Nil)
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT l.N, count(*)
+          |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
+          |GROUP BY l.N
+        """.stripMargin),
+      Row(1
+        , 1) ::
+        Row(2, 1) ::
+        Row(3, 1) ::
+        Row(4, 1) ::
+        Row(5, 1) ::
+        Row(6, 1) ::
+        Row(null, 4) :: Nil)
+
+    checkAnswer(
+      sql(
+        """
+        |SELECT r.a, count(*)
+        |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
+        |GROUP BY r.a
+      """.
+          stripMargin),
+      Row(null, 10))
+  }
 
   test("broadcasted left semi join operator selection") {
     sqlContext.cacheManager.clearCache()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index acabe32c67..52a561d2e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1771,8 +1771,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     // This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
     // This bug will be triggered when Tungsten is enabled and there are multiple
     // SortMergeJoin operators executed in the same task.
-    val confs =
-      SQLConf.SORTMERGE_JOIN.key -> "true" :: SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil
+    val confs = SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil
     withSQLConf(confs: _*) {
       val df1 = (1 to 50).map(i => (s"str_$i", i)).toDF("i", "j")
       val df2 =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 44634dacbd..8c41d79dae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal,
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.execution.joins.{SortMergeJoin, BroadcastHashJoin}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -97,10 +97,10 @@ class PlannerSuite extends SharedSQLContext {
           """.stripMargin).queryExecution.executedPlan
 
         val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
-        val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+        val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
 
         assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
-        assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+        assert(sortMergeJoins.isEmpty, "Should not use sort merge join")
       }
     }
 
@@ -150,10 +150,10 @@ class PlannerSuite extends SharedSQLContext {
         val planned = a.join(b, $"a.key" === $"b.key").queryExecution.executedPlan
 
         val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
-        val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+        val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
 
         assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
-        assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+        assert(sortMergeJoins.isEmpty, "Should not use sort merge join")
 
         sqlContext.clearCache()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 066c16e535..2ec1714647 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -93,20 +93,6 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
       boundCondition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin)
     }
 
-    def makeShuffledHashJoin(
-        leftKeys: Seq[Expression],
-        rightKeys: Seq[Expression],
-        boundCondition: Option[Expression],
-        leftPlan: SparkPlan,
-        rightPlan: SparkPlan,
-        side: BuildSide) = {
-      val shuffledHashJoin =
-        execution.joins.ShuffledHashJoin(leftKeys, rightKeys, side, leftPlan, rightPlan)
-      val filteredJoin =
-        boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
-      EnsureRequirements(sqlContext).apply(filteredJoin)
-    }
-
     def makeSortMergeJoin(
         leftKeys: Seq[Expression],
         rightKeys: Seq[Expression],
@@ -143,30 +129,6 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
       }
     }
 
-    test(s"$testName using ShuffledHashJoin (build=left)") {
-      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
-        withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-          checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
-            makeShuffledHashJoin(
-              leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft),
-            expectedAnswer.map(Row.fromTuple),
-            sortAnswers = true)
-        }
-      }
-    }
-
-    test(s"$testName using ShuffledHashJoin (build=right)") {
-      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
-        withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-          checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
-            makeShuffledHashJoin(
-              leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildRight),
-            expectedAnswer.map(Row.fromTuple),
-            sortAnswers = true)
-        }
-      }
-    }
-
     test(s"$testName using SortMergeJoin") {
       extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
         withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 09e0237a7c..9c80714a9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -74,18 +74,6 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
       ExtractEquiJoinKeys.unapply(join)
     }
 
-    test(s"$testName using ShuffledHashOuterJoin") {
-      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
-        withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-          checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
-            EnsureRequirements(sqlContext).apply(
-              ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)),
-            expectedAnswer.map(Row.fromTuple),
-            sortAnswers = true)
-        }
-      }
-    }
-
     if (joinType != FullOuter) {
       test(s"$testName using BroadcastHashOuterJoin") {
         extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 544c1ef303..486bfbbd70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -169,188 +169,123 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   test("SortMergeJoin metrics") {
     // Because SortMergeJoin may skip different rows if the number of partitions is different, this
     // test should use the deterministic number of partitions.
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
-      val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
-      testDataForJoin.registerTempTable("testDataForJoin")
-      withTempTable("testDataForJoin") {
-        // Assume the execution plan is
-        // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-        val df = sqlContext.sql(
-          "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
-        testSparkPlanMetrics(df, 1, Map(
-          1L -> ("SortMergeJoin", Map(
-            // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-            "number of left rows" -> 4L,
-            "number of right rows" -> 2L,
-            "number of output rows" -> 4L)))
-        )
-      }
+    val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+    testDataForJoin.registerTempTable("testDataForJoin")
+    withTempTable("testDataForJoin") {
+      // Assume the execution plan is
+      // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = sqlContext.sql(
+        "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+      testSparkPlanMetrics(df, 1, Map(
+        1L -> ("SortMergeJoin", Map(
+          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          "number of left rows" -> 4L,
+          "number of right rows" -> 2L,
+          "number of output rows" -> 4L)))
+      )
     }
   }
 
   test("SortMergeOuterJoin metrics") {
     // Because SortMergeOuterJoin may skip different rows if the number of partitions is different,
     // this test should use the deterministic number of partitions.
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
-      val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
-      testDataForJoin.registerTempTable("testDataForJoin")
-      withTempTable("testDataForJoin") {
-        // Assume the execution plan is
-        // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-        val df = sqlContext.sql(
-          "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
-        testSparkPlanMetrics(df, 1, Map(
-          1L -> ("SortMergeOuterJoin", Map(
-            // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-            "number of left rows" -> 6L,
-            "number of right rows" -> 2L,
-            "number of output rows" -> 8L)))
-        )
-
-        val df2 = sqlContext.sql(
-          "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
-        testSparkPlanMetrics(df2, 1, Map(
-          1L -> ("SortMergeOuterJoin", Map(
-            // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-            "number of left rows" -> 2L,
-            "number of right rows" -> 6L,
-            "number of output rows" -> 8L)))
-        )
-      }
-    }
-  }
-
-  test("BroadcastHashJoin metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
-      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
-      // Assume the execution plan is
-      // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-      val df = df1.join(broadcast(df2), "key")
-      testSparkPlanMetrics(df, 2, Map(
-        1L -> ("BroadcastHashJoin", Map(
-          "number of left rows" -> 2L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 2L)))
-      )
-    }
-  }
-
-  test("ShuffledHashJoin metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
-      testDataForJoin.registerTempTable("testDataForJoin")
-      withTempTable("testDataForJoin") {
-        // Assume the execution plan is
-        // ... -> ShuffledHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-        val df = sqlContext.sql(
-          "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
-        testSparkPlanMetrics(df, 1, Map(
-          1L -> ("ShuffledHashJoin", Map(
-            "number of left rows" -> 6L,
-            "number of right rows" -> 2L,
-            "number of output rows" -> 4L)))
-        )
-      }
-    }
-  }
-
-  test("ShuffledHashOuterJoin metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
-      val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
-      val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+    val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+    testDataForJoin.registerTempTable("testDataForJoin")
+    withTempTable("testDataForJoin") {
       // Assume the execution plan is
-      // ... -> ShuffledHashOuterJoin(nodeId = 0)
-      val df = df1.join(df2, $"key" === $"key2", "left_outer")
+      // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = sqlContext.sql(
+        "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
-        0L -> ("ShuffledHashOuterJoin", Map(
-          "number of left rows" -> 3L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 5L)))
+        1L -> ("SortMergeOuterJoin", Map(
+          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          "number of left rows" -> 6L,
+          "number of right rows" -> 2L,
+          "number of output rows" -> 8L)))
       )
 
-      val df3 = df1.join(df2, $"key" === $"key2", "right_outer")
-      testSparkPlanMetrics(df3, 1, Map(
-        0L -> ("ShuffledHashOuterJoin", Map(
-          "number of left rows" -> 3L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 6L)))
-      )
-
-      val df4 = df1.join(df2, $"key" === $"key2", "outer")
-      testSparkPlanMetrics(df4, 1, Map(
-        0L -> ("ShuffledHashOuterJoin", Map(
-          "number of left rows" -> 3L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 7L)))
+      val df2 = sqlContext.sql(
+        "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
+      testSparkPlanMetrics(df2, 1, Map(
+        1L -> ("SortMergeOuterJoin", Map(
+          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          "number of left rows" -> 2L,
+          "number of right rows" -> 6L,
+          "number of output rows" -> 8L)))
       )
     }
   }
 
+  test("BroadcastHashJoin metrics") {
+    val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
+    // Assume the execution plan is
+    // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+    val df = df1.join(broadcast(df2), "key")
+    testSparkPlanMetrics(df, 2, Map(
+      1L -> ("BroadcastHashJoin", Map(
+        "number of left rows" -> 2L,
+        "number of right rows" -> 4L,
+        "number of output rows" -> 2L)))
+    )
+  }
+
   test("BroadcastHashOuterJoin metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
-      val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
-      // Assume the execution plan is
-      // ... -> BroadcastHashOuterJoin(nodeId = 0)
-      val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
-      testSparkPlanMetrics(df, 2, Map(
-        0L -> ("BroadcastHashOuterJoin", Map(
-          "number of left rows" -> 3L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 5L)))
-      )
+    val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+    val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+    // Assume the execution plan is
+    // ... -> BroadcastHashOuterJoin(nodeId = 0)
+    val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
+    testSparkPlanMetrics(df, 2, Map(
+      0L -> ("BroadcastHashOuterJoin", Map(
+        "number of left rows" -> 3L,
+        "number of right rows" -> 4L,
+        "number of output rows" -> 5L)))
+    )
 
-      val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
-      testSparkPlanMetrics(df3, 2, Map(
-        0L -> ("BroadcastHashOuterJoin", Map(
-          "number of left rows" -> 3L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 6L)))
-      )
-    }
+    val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
+    testSparkPlanMetrics(df3, 2, Map(
+      0L -> ("BroadcastHashOuterJoin", Map(
+        "number of left rows" -> 3L,
+        "number of right rows" -> 4L,
+        "number of output rows" -> 6L)))
+    )
   }
 
   test("BroadcastNestedLoopJoin metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
-      val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
-      testDataForJoin.registerTempTable("testDataForJoin")
-      withTempTable("testDataForJoin") {
-        // Assume the execution plan is
-        // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-        val df = sqlContext.sql(
-          "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
-            "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
-        testSparkPlanMetrics(df, 3, Map(
-          1L -> ("BroadcastNestedLoopJoin", Map(
-            "number of left rows" -> 12L, // left needs to be scanned twice
-            "number of right rows" -> 2L,
-            "number of output rows" -> 12L)))
-        )
-      }
+    val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+    testDataForJoin.registerTempTable("testDataForJoin")
+    withTempTable("testDataForJoin") {
+      // Assume the execution plan is
+      // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = sqlContext.sql(
+        "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+          "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
+      testSparkPlanMetrics(df, 3, Map(
+        1L -> ("BroadcastNestedLoopJoin", Map(
+          "number of left rows" -> 12L, // left needs to be scanned twice
+          "number of right rows" -> 2L,
+          "number of output rows" -> 12L)))
+      )
     }
   }
 
   test("BroadcastLeftSemiJoinHash metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
-      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
-      // Assume the execution plan is
-      // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
-      val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
-      testSparkPlanMetrics(df, 2, Map(
-        0L -> ("BroadcastLeftSemiJoinHash", Map(
-          "number of left rows" -> 2L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 2L)))
-      )
-    }
+    val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+    // Assume the execution plan is
+    // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
+    val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
+    testSparkPlanMetrics(df, 2, Map(
+      0L -> ("BroadcastLeftSemiJoinHash", Map(
+        "number of left rows" -> 2L,
+        "number of right rows" -> 4L,
+        "number of output rows" -> 2L)))
+    )
   }
 
   test("LeftSemiJoinHash metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
       val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
       // Assume the execution plan is
@@ -366,19 +301,17 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("LeftSemiJoinBNL metrics") {
-    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
-      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
-      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
-      // Assume the execution plan is
-      // ... -> LeftSemiJoinBNL(nodeId = 0)
-      val df = df1.join(df2, $"key" < $"key2", "leftsemi")
-      testSparkPlanMetrics(df, 2, Map(
-        0L -> ("LeftSemiJoinBNL", Map(
-          "number of left rows" -> 2L,
-          "number of right rows" -> 4L,
-          "number of output rows" -> 2L)))
-      )
-    }
+    val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+    // Assume the execution plan is
+    // ... -> LeftSemiJoinBNL(nodeId = 0)
+    val df = df1.join(df2, $"key" < $"key2", "leftsemi")
+    testSparkPlanMetrics(df, 2, Map(
+      0L -> ("LeftSemiJoinBNL", Map(
+        "number of left rows" -> 2L,
+        "number of right rows" -> 4L,
+        "number of output rows" -> 2L)))
+    )
   }
 
   test("CartesianProduct metrics") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 9bb32f11b7..f775f1e955 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -166,7 +166,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
         val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j }
         assert(shj.size === 1,
-          "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
+          "SortMergeJoin should be planned when BroadcastHashJoin is turned off")
 
         sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=$tmp""")
       }
-- 
GitLab