diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a92450274e07786ba1656295e5b0cb5f7c19fbc0..d7686972d2eecfb4b5d33c04f6b3cac1a1358198 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2276,7 +2276,7 @@ setMethod("join",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
           function(x, y, joinExpr = NULL, joinType = NULL) {
             if (is.null(joinExpr)) {
-              sdf <- callJMethod(x@sdf, "join", y@sdf)
+              sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
             } else {
               if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
               if (is.null(joinType)) {
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a986092f5d634463c60926c2706aca23bd310155..e5eac918a93a0fbd9315471133c0a6fa90bb6a58 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -644,7 +644,7 @@ class DataFrame(object):
             on = [on]
 
         if on is None or len(on) == 0:
-            jdf = self._jdf.join(other._jdf)
+            jdf = self._jdf.crossJoin(other._jdf)
         elif isinstance(on[0], basestring):
             if how is None:
                 jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
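
Both language bindings now route unconditioned joins to the JVM-side `crossJoin`. A minimal Scala sketch of the equivalent dispatch (illustrative only; the R and Python wrappers actually call through `callJMethod` / Py4J handles rather than this API):

```scala
// Illustrative equivalent of the dispatch both wrappers now perform.
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}

def joinDispatch(left: Dataset[Row], right: Dataset[Row], on: Option[Column]): DataFrame =
  on match {
    case None       => left.crossJoin(right)            // explicit cartesian product
    case Some(cond) => left.join(right, cond, "inner")  // keyed join, unchanged
  }
```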
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a8af840c1e2a2824f625184e7d0d91a0632e0443..0447436ea79765b2299c22f9bcf3142f4b89d997 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -375,7 +375,7 @@ setQuantifier
 
 relation
     : left=relation
-      ((CROSS | joinType) JOIN right=relation joinCriteria?
+      (joinType JOIN right=relation joinCriteria?
       | NATURAL joinType JOIN right=relation
       )                                           #joinRelation
     | relationPrimary                             #relationDefault
@@ -383,6 +383,7 @@ relation
 
 joinType
     : INNER?
+    | CROSS
     | LEFT OUTER?
     | LEFT SEMI
     | RIGHT OUTER?
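
Moving `CROSS` out of the `relation` rule and into `joinType` means `CROSS JOIN` is parsed like any other join type, so `AstBuilder` (below) can map it to a dedicated logical join type. Roughly, assuming a SparkSession `spark` with temp views `r` and `s` that share a column `k`:

```scala
spark.sql("SELECT * FROM r CROSS JOIN s")         // parsed with joinType = CROSS
spark.sql("SELECT * FROM r JOIN s ON r.k = s.k")  // parsed with joinType = INNER
```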
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 4df100c2a8304c753ab9ade1fd3b55caa23715fb..75ae588c18ec6fd74d1c30c544bde942dcb6a92c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -36,6 +36,12 @@ trait CatalystConf {
 
   def warehousePath: String
 
+  /** If true, cartesian products between relations will be allowed for all
+   * join types (inner, (left|right|full) outer).
+   * If false, cartesian products will require explicit CROSS JOIN syntax.
+   */
+  def crossJoinEnabled: Boolean
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
@@ -55,5 +61,6 @@ case class SimpleCatalystConf(
     optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20,
     runSQLonFile: Boolean = true,
+    crossJoinEnabled: Boolean = false,
     warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf
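
Putting the flag on `CatalystConf` makes it visible to pure-Catalyst code such as the optimizer rule added below; `SimpleCatalystConf` picks it up as a constructor parameter defaulting to false. A sketch for Catalyst-level tests, assuming the remaining constructor defaults:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

// caseSensitiveAnalysis is the one required constructor argument.
val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, crossJoinEnabled = true)
assert(conf.crossJoinEnabled)
```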
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e559f235c5a387dc75a0cdecfe5198e3a5650ab5..18f814d6cdfd4d8de97d822e743beca3df639a6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1003,7 +1003,7 @@ class Analyzer(
           failOnOuterReference(j)
           failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
           j
-        case j @ Join(_, right, jt, _) if jt != Inner =>
+        case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] =>
           failOnOuterReference(j)
           failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
           j
@@ -1899,7 +1899,7 @@ class Analyzer(
         joinedCols ++
           lUniqueOutput.map(_.withNullability(true)) ++
           rUniqueOutput.map(_.withNullability(true))
-      case Inner =>
+      case _ : InnerLike =>
         leftKeys ++ lUniqueOutput ++ rUniqueOutput
       case _ =>
         sys.error("Unsupported natural join type " + joinType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f6e32e29ebca816f18b8c18e4843ea8de91b0dd0..e81370c504abb0c5ca60729c7f6a49efeb57995e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -94,7 +94,7 @@ object UnsupportedOperationChecker {
 
           joinType match {
 
-            case Inner =>
+            case _: InnerLike =>
               if (left.isStreaming && right.isStreaming) {
                 throwError("Inner join between two streaming DataFrames/Datasets is not supported")
               }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7617d342618071856f051c22a427eb68697c28db..d2f0c97989213fea58cf7996231cc443e5ea5da5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -22,6 +22,7 @@ import scala.collection.immutable.HashSet
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.api.java.function.FilterFunction
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -107,6 +108,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       RewriteCorrelatedScalarSubquery,
       EliminateSerialization,
       RemoveAliasOnlyProject) ::
+    Batch("Check Cartesian Products", Once,
+      CheckCartesianProducts(conf)) ::
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) ::
     Batch("Typed Filter Optimization", fixedPoint,
@@ -838,7 +841,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
       val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
         split(splitConjunctivePredicates(filterCondition), left, right)
       joinType match {
-        case Inner =>
+        case _: InnerLike =>
           // push down the single side `where` condition into respective sides
           val newLeft = leftFilterConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -848,7 +851,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             commonFilterCondition.partition(e => !SubqueryExpression.hasCorrelatedSubquery(e))
           val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)
 
-          val join = Join(newLeft, newRight, Inner, newJoinCond)
+          val join = Join(newLeft, newRight, joinType, newJoinCond)
           if (others.nonEmpty) {
             Filter(others.reduceLeft(And), join)
           } else {
@@ -885,7 +888,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
         split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
       joinType match {
-        case Inner | LeftExistence(_) =>
+        case _: InnerLike | LeftExistence(_) =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -932,6 +935,46 @@ object CombineLimits extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Check if there are any cartesian products between joins of any type in the optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join conditions for each join must be
+ * collected before checking if it is a cartesian product. For example, given
+ * SELECT * FROM R, S WHERE R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
+    extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Check if a join is a cartesian product. Returns true if
+   * there are no join conditions involving references from both left and right.
+   */
+  def isCartesianProduct(join: Join): Boolean = {
+    val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+    !conditions.map(_.references).exists(refs => refs.exists(join.left.outputSet.contains)
+        && refs.exists(join.right.outputSet.contains))
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan =
+    if (conf.crossJoinEnabled) {
+      plan
+    } else plan transform {
+      case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, condition)
+        if isCartesianProduct(j) =>
+          throw new AnalysisException(
+            s"""Detected cartesian product for ${j.joinType.sql} join between logical plans
+               |${left.treeString(false).trim}
+               |and
+               |${right.treeString(false).trim}
+               |Join condition is missing or trivial.
+               |Use the CROSS JOIN syntax to allow cartesian products between these relations."""
+            .stripMargin)
+    }
+}
+
 /**
  * Speeds up aggregates on fixed-precision decimals by executing them on unscaled Long values.
  *
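
The heart of the change: the detection runs once, after `ReorderJoin`, and fails fast at optimization time instead of at execution time. A self-contained sketch of the `isCartesianProduct` predicate over stand-in types (not Catalyst's):

```scala
// A join is flagged as a cartesian product iff no conjunct of its condition
// references attributes from both sides.
case class Cond(refs: Set[String])

def isCartesian(leftOut: Set[String], rightOut: Set[String], conds: Seq[Cond]): Boolean =
  !conds.exists(c => c.refs.exists(leftOut) && c.refs.exists(rightOut))

val left = Set("r.r")
val right = Set("s.s")
assert(!isCartesian(left, right, Seq(Cond(Set("r.r", "s.s")))))  // R.r = S.s ties both sides
assert(isCartesian(left, right, Seq(Cond(Set("r.r")))))          // R.r = 1 alone is trivial
```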
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 50076b1a41c02f254b0189abfab3596179e649f9..7400a01918c5223a2bea0436a2a315bd2a6a8f2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -50,7 +50,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
       empty(p)
 
     case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
-      case Inner => empty(p)
+      case _: InnerLike => empty(p)
       // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
       // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
       case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 158ad3d91fbadddbc11eb9e1e2a7c3969848378c..1621bffd619f216540e36ec73e34339919ed8612 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
-
 /**
  * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
  * one condition.
@@ -39,39 +38,46 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
    *
   * Joined plans are picked from left to right, preferring those with at least one join condition.
    *
-   * @param input a list of LogicalPlans to join.
+   * @param input a list of LogicalPlans to inner join, each paired with its inner join type.
   * @param conditions a list of conditions for the join.
    */
   @tailrec
-  def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
+  def createOrderedJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
+    : LogicalPlan = {
     assert(input.size >= 2)
     if (input.size == 2) {
       val (joinConditions, others) = conditions.partition(
         e => !SubqueryExpression.hasCorrelatedSubquery(e))
-      val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
+      val ((left, leftJoinType), (right, rightJoinType)) = (input(0), input(1))
+      val innerJoinType = (leftJoinType, rightJoinType) match {
+        case (Inner, Inner) => Inner
+        case (_, _) => Cross
+      }
+      val join = Join(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
       if (others.nonEmpty) {
         Filter(others.reduceLeft(And), join)
       } else {
         join
       }
     } else {
-      val left :: rest = input.toList
+      val (left, _) :: rest = input.toList
       // find the first join that has at least one join condition
-      val conditionalJoin = rest.find { plan =>
+      val conditionalJoin = rest.find { planJoinPair =>
+        val plan = planJoinPair._1
         val refs = left.outputSet ++ plan.outputSet
         conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
           .exists(_.references.subsetOf(refs))
       }
       // pick the next one if no condition left
-      val right = conditionalJoin.getOrElse(rest.head)
+      val (right, innerJoinType) = conditionalJoin.getOrElse(rest.head)
 
       val joinedRefs = left.outputSet ++ right.outputSet
       val (joinConditions, others) = conditions.partition(
         e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
-      val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
+      val joined = Join(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
 
       // should not have reference to same logical plan
-      createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
+      createOrderedJoin(Seq((joined, Inner)) ++ rest.filterNot(_._1 eq right), others)
     }
   }
 
@@ -82,7 +88,6 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
-
 /**
  * Elimination of outer joins, if the predicates can restrict the result sets so that
  * all null-supplying rows are eliminated
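
When `createOrderedJoin` reduces the flattened list pairwise, the join type of each intermediate join is derived from its two inputs: only two plain inner inputs yield `Inner`, so an explicitly requested `CROSS` marker survives reordering. The combination rule in isolation:

```scala
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike}

// The rule from createOrderedJoin: the intermediate join stays Inner only
// when both inputs were plain inner joins.
def combine(l: InnerLike, r: InnerLike): InnerLike = (l, r) match {
  case (Inner, Inner) => Inner
  case _              => Cross
}

assert(combine(Inner, Inner) == Inner)
assert(combine(Inner, Cross) == Cross)
```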
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 42fbc16d0396aeab313d4b868ab442a236e0e806..e4cb9f016133abf3ac3693dda6a0ca61b00776c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -539,6 +539,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     def join(ctx: JoinRelationContext, left: LogicalPlan, right: LogicalPlan): Join = {
       val baseJoinType = ctx.joinType match {
         case null => Inner
+        case jt if jt.CROSS != null => Cross
         case jt if jt.FULL != null => FullOuter
         case jt if jt.SEMI != null => LeftSemi
         case jt if jt.ANTI != null => LeftAnti
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 476c66af76b29af9ac0e8238a636c30f196855e8..41cabb8cb3390113899979dd4286b4894741de35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -159,23 +159,30 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  */
 object ExtractFiltersAndInnerJoins extends PredicateHelper {
 
-  // flatten all inner joins, which are next to each other
-  def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) = plan match {
-    case Join(left, right, Inner, cond) =>
-      val (plans, conditions) = flattenJoin(left)
-      (plans ++ Seq(right), conditions ++ cond.toSeq)
+  /**
+   * Flatten all inner joins, which are next to each other.
+ * Return a list of logical plans to be joined, each paired with the InnerLike join type
+ * indicating whether it came from an explicit cross join. Also returns the entire list of
+ * join conditions for the left-deep tree.
+   */
+  def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
+      : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
+    case Join(left, right, joinType: InnerLike, cond) =>
+      val (plans, conditions) = flattenJoin(left, joinType)
+      (plans ++ Seq((right, joinType)), conditions ++ cond.toSeq)
 
-    case Filter(filterCondition, j @ Join(left, right, Inner, joinCondition)) =>
+    case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
       val (plans, conditions) = flattenJoin(j)
       (plans, conditions ++ splitConjunctivePredicates(filterCondition))
 
-    case _ => (Seq(plan), Seq())
+    case _ => (Seq((plan, parentJoinType)), Seq())
   }
 
-  def unapply(plan: LogicalPlan): Option[(Seq[LogicalPlan], Seq[Expression])] = plan match {
-    case f @ Filter(filterCondition, j @ Join(_, _, Inner, _)) =>
+  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
+      = plan match {
+    case f @ Filter(_, Join(_, _, _: InnerLike, _)) =>
       Some(flattenJoin(f))
-    case j @ Join(_, _, Inner, _) =>
+    case j @ Join(_, _, _: InnerLike, _) =>
       Some(flattenJoin(j))
     case _ => None
   }
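
`flattenJoin` now threads the join type of each child through the flattening. A toy model over an invented plan ADT (names are illustrative, not Catalyst's) showing the pairs produced for `(x JOIN y ON cond) CROSS JOIN z`:

```scala
sealed trait JT
case object I extends JT  // stand-in for Inner
case object X extends JT  // stand-in for Cross

sealed trait P
case class Rel(name: String) extends P
case class Jn(l: P, r: P, jt: JT, cond: Option[String]) extends P

// Mirrors the Join case of flattenJoin: recurse left, append the right child
// tagged with its join type, and collect the conditions.
def flatten(p: P, parent: JT = I): (Seq[(P, JT)], Seq[String]) = p match {
  case Jn(l, r, jt, c) =>
    val (plans, conds) = flatten(l, jt)
    (plans :+ (r -> jt), conds ++ c.toSeq)
  case leaf => (Seq(leaf -> parent), Nil)
}

val (plans, conds) =
  flatten(Jn(Jn(Rel("x"), Rel("y"), I, Some("x.b = y.d")), Rel("z"), X, None))
// plans == Seq(Rel("x") -> I, Rel("y") -> I, Rel("z") -> X)
// conds == Seq("x.b = y.d")
```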
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 80674d9b4bc9c078aeef44c3695bca1c32d0d3fe..61e083e6fc2c32edfe3c5ee492b33bee98b464cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -28,6 +28,7 @@ object JoinType {
     case "rightouter" | "right" => RightOuter
     case "leftsemi" => LeftSemi
     case "leftanti" => LeftAnti
+    case "cross" => Cross
     case _ =>
       val supported = Seq(
         "inner",
@@ -35,7 +36,8 @@ object JoinType {
         "leftouter", "left",
         "rightouter", "right",
         "leftsemi",
-        "leftanti")
+        "leftanti",
+        "cross")
 
       throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
         "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
@@ -46,10 +48,24 @@ sealed abstract class JoinType {
   def sql: String
 }
 
-case object Inner extends JoinType {
+/**
+ * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
+ * indicating a cartesian product has been explicitly requested.
+ */
+sealed abstract class InnerLike extends JoinType {
+  def explicitCartesian: Boolean
+}
+
+case object Inner extends InnerLike {
+  override def explicitCartesian: Boolean = false
   override def sql: String = "INNER"
 }
 
+case object Cross extends InnerLike {
+  override def explicitCartesian: Boolean = true
+  override def sql: String = "CROSS"
+}
+
 case object LeftOuter extends JoinType {
   override def sql: String = "LEFT OUTER"
 }
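
With `Cross` in place, the string factory and the SQL rendering round-trip, and `explicitCartesian` is what distinguishes the two `InnerLike` members:

```scala
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, JoinType}

assert(JoinType("cross") == Cross)  // new string spelling accepted
assert(Cross.explicitCartesian && !Inner.explicitCartesian)
assert(Cross.sql == "CROSS")
```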
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 010aec7ba1a4241cc07a88bcce6a6ef2631801e7..d2d33e40a8c8f47eedf0d5d82ca5ba84c34dc56e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -293,7 +293,7 @@ case class Join(
 
   override protected def validConstraints: Set[Expression] = {
     joinType match {
-      case Inner if condition.isDefined =>
+      case _: InnerLike if condition.isDefined =>
         left.constraints
           .union(right.constraints)
           .union(splitConjunctivePredicates(condition.get).toSet)
@@ -302,7 +302,7 @@ case class Join(
           .union(splitConjunctivePredicates(condition.get).toSet)
       case j: ExistenceJoin =>
         left.constraints
-      case Inner =>
+      case _: InnerLike =>
         left.constraints.union(right.constraints)
       case LeftExistence(_) =>
         left.constraints
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 13bf034f831cf1851d819bd01dcc7861c0a01364..e7c8615bc5e04b19e6a3ec18fc4528e4d2242711 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max}
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
 import org.apache.spark.sql.types._
@@ -396,7 +396,7 @@ class AnalysisErrorSuite extends AnalysisTest {
   }
 
   test("error test for self-join") {
-    val join = Join(testRelation, testRelation, Inner, None)
+    val join = Join(testRelation, testRelation, Cross, None)
     val error = intercept[AnalysisException] {
       SimpleAnalyzer.checkAnalysis(join)
     }
@@ -475,7 +475,7 @@ class AnalysisErrorSuite extends AnalysisTest {
         LocalRelation(
           AttributeReference("c", BinaryType)(exprId = ExprId(4)),
           AttributeReference("d", IntegerType)(exprId = ExprId(3))),
-        Inner,
+        Cross,
         Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
           AttributeReference("c", BinaryType)(exprId = ExprId(4)))))
 
@@ -489,7 +489,7 @@ class AnalysisErrorSuite extends AnalysisTest {
         LocalRelation(
           AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
           AttributeReference("d", IntegerType)(exprId = ExprId(3))),
-        Inner,
+        Cross,
         Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
           AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8971edc7d3b9a569409ca8686af0bd727d4070e5..50ebad25cd258d59d107c0f95a348bb82cc55c73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
@@ -341,7 +341,7 @@ class AnalysisSuite extends AnalysisTest {
         Join(
           Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
           Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
-          Inner, None))
+          Cross, None))
 
     assertAnalysisSuccess(query)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index dbb3e6a5272ecd3accd9756b46fe56dbd7d27957..087718b3ecf1ad1a5835928453e76b57fa3634da 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
@@ -54,6 +54,18 @@ class JoinOptimizationSuite extends PlanTest {
     val z = testRelation.subquery('z)
 
     def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+      val expectedNoCross = expected map { case (plans, conditions) =>
+        (plans map { plan => (plan, Inner) }, conditions)
+      }
+      testExtractCheckCross(plan, expectedNoCross)
+    }
+
+    def testExtractCheckCross(
+        plan: LogicalPlan,
+        expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]): Unit = {
       assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
     }
 
@@ -70,6 +82,16 @@ class JoinOptimizationSuite extends PlanTest {
     testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
     testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
       Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+
+    testExtractCheckCross(x.join(y, Cross), Some(Seq((x, Cross), (y, Cross)), Seq()))
+    testExtractCheckCross(x.join(y, Cross).join(z, Cross),
+      Some(Seq((x, Cross), (y, Cross), (z, Cross)), Seq()))
+    testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross),
+      Some(Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === "y.d".attr)))
+    testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross),
+      Some(Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === "y.d".attr)))
+    testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner),
+      Some(Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === "y.d".attr)))
   }
 
   test("reorder inner joins") {
@@ -77,18 +99,28 @@ class JoinOptimizationSuite extends PlanTest {
     val y = testRelation1.subquery('y)
     val z = testRelation.subquery('z)
 
-    val originalQuery = {
-      x.join(y).join(z)
-        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+    val queryAnswers = Seq(
+      (
+        x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
+        x.join(z, condition = Some("x.b".attr === "z.b".attr))
+          .join(y, condition = Some("y.d".attr === "z.a".attr))
+      ),
+      (
+        x.join(y, Cross).join(z, Cross)
+          .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
+        x.join(z, Cross, Some("x.b".attr === "z.b".attr))
+          .join(y, Cross, Some("y.d".attr === "z.a".attr))
+      ),
+      (
+        x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr),
+        x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner)
+      )
+    )
+
+    queryAnswers foreach { case (query, expectedAnswer) =>
+      val optimized = Optimize.execute(query.analyze)
+      comparePlans(optimized, analysis.EliminateSubqueryAliases(expectedAnswer.analyze))
     }
-
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer =
-      x.join(z, condition = Some("x.b".attr === "z.b".attr))
-        .join(y, condition = Some("y.d".attr === "z.a".attr))
-        .analyze
-
-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
   }
 
   test("broadcasthint sets relation statistics to smallest value") {
@@ -98,7 +130,7 @@ class JoinOptimizationSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input, None),
-          BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze
+          BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze
 
     val optimized = Optimize.execute(query)
 
@@ -106,7 +138,7 @@ class JoinOptimizationSuite extends PlanTest {
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
         BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
-        Inner, None).analyze
+        Cross, None).analyze
 
     comparePlans(optimized, expected)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index c549832ef3eda8399570783905b10d67b272c6e7..908dde7a6698826f9930cea7995ba58150b8cec7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -67,6 +67,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
     // Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation.
     val testcases = Seq(
       (true, true, Inner, None),
+      (true, true, Cross, None),
       (true, true, LeftOuter, None),
       (true, true, RightOuter, None),
       (true, true, FullOuter, None),
@@ -74,6 +75,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
       (true, true, LeftSemi, None),
 
       (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+      (true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
       (true, false, LeftOuter, None),
       (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
       (true, false, FullOuter, None),
@@ -81,6 +83,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
       (true, false, LeftSemi, None),
 
       (false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
+      (false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
       (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
       (false, true, RightOuter, None),
       (false, true, FullOuter, None),
@@ -88,6 +91,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
       (false, true, LeftSemi, Some(LocalRelation('a.int))),
 
       (false, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+      (false, false, Cross, Some(LocalRelation('a.int, 'b.int))),
       (false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
       (false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
       (false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 2fcbfc7067a13b7b1ab63e31b11db6ee03cf291e..faaea17b64d2a5ddf52522fe8396393bd55d2770 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -346,7 +346,7 @@ class PlanParserSuite extends PlanTest {
     def test(sql: String, jt: JoinType, tests: Seq[(String, JoinType) => Unit]): Unit = {
       tests.foreach(_(sql, jt))
     }
-    test("cross join", Inner, Seq(testUnconditionalJoin))
+    test("cross join", Cross, Seq(testUnconditionalJoin))
     test(",", Inner, Seq(testUnconditionalJoin))
     test("join", Inner, testAll)
     test("inner join", Inner, testAll)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e7dcf0f51f4a5da9143fa2382627098b4101183b..3b3cb820788a2c644dddf167b9bfe557624a5752 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -589,9 +589,9 @@ class Dataset[T] private[sql](
   def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF())
 
   /**
-   * Cartesian join with another [[DataFrame]].
+   * Join with another [[DataFrame]].
    *
-   * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+   * Behaves as an INNER JOIN and requires a subsequent join predicate.
    *
    * @param right Right side of the join operation.
    *
@@ -763,6 +763,20 @@ class Dataset[T] private[sql](
     }
   }
 
+  /**
+   * Explicit cartesian join with another [[DataFrame]].
+   *
+   * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+   *
+   * @param right Right side of the join operation.
+   *
+   * @group untypedrel
+   * @since 2.1.0
+   */
+  def crossJoin(right: Dataset[_]): DataFrame = withPlan {
+    Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
+  }
+
   /**
    * :: Experimental ::
    * Joins this Dataset returning a [[Tuple2]] for each pair where `condition` evaluates to
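
A usage sketch for the new API, assuming a SparkSession `spark` with `spark.sql.crossJoin.enabled` at its default of false:

```scala
import spark.implicits._

val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v")
val df2 = Seq((10, "x")).toDF("id2", "v2")

df1.crossJoin(df2).show()  // 2 x 1 = 2 rows, no join condition required
// df1.join(df2) would now fail during optimization with
// "Detected cartesian product for INNER join ..." unless the flag is set.
```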
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b4899ad688f96b1ee427d494e555923fd2a40a84..c389593b4f767e4403c1b60adad710c6a917f64a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -140,13 +140,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
 
     private def canBuildRight(joinType: JoinType): Boolean = joinType match {
-      case Inner | LeftOuter | LeftSemi | LeftAnti => true
+      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
       case j: ExistenceJoin => true
       case _ => false
     }
 
     private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
-      case Inner | RightOuter => true
+      case _: InnerLike | RightOuter => true
       case _ => false
     }
 
@@ -200,7 +200,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil
 
       // Pick CartesianProduct for InnerJoin
-      case logical.Join(left, right, Inner, condition) =>
+      case logical.Join(left, right, _: InnerLike, condition) =>
         joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
 
       case logical.Join(left, right, joinType, condition) =>
@@ -212,8 +212,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           }
         // This join could be very slow or OOM
         joins.BroadcastNestedLoopJoinExec(
-          planLater(left), planLater(right), buildSide, joinType, condition,
-          withinBroadcastThreshold = false) :: Nil
+          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
 
       // --- Cases where this strategy does not apply ---------------------------------------------
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 0f24baacd18d6465e51fdfeb84f1e5d7abc44680..0bc261d593df4ba5e0e780969ace3fe058c026f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -79,7 +79,7 @@ case class BroadcastHashJoinExec(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
     joinType match {
-      case Inner => codegenInner(ctx, input)
+      case _: InnerLike => codegenInner(ctx, input)
       case LeftOuter | RightOuter => codegenOuter(ctx, input)
       case LeftSemi => codegenSemi(ctx, input)
       case LeftAnti => codegenAnti(ctx, input)
@@ -134,7 +134,7 @@ case class BroadcastHashJoinExec(
     ctx.INPUT_ROW = matched
     buildPlan.output.zipWithIndex.map { case (a, i) =>
       val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
-      if (joinType == Inner) {
+      if (joinType.isInstanceOf[InnerLike]) {
         ev
       } else {
         // the variables are needed even there is no matched rows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 6a9965f1a24cd6d0fd0136314249600d231d9710..43cdce7de8c7fb7731c6d199a474016c44a8e0fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -34,8 +34,7 @@ case class BroadcastNestedLoopJoinExec(
     right: SparkPlan,
     buildSide: BuildSide,
     joinType: JoinType,
-    condition: Option[Expression],
-    withinBroadcastThreshold: Boolean = true) extends BinaryExecNode {
+    condition: Option[Expression]) extends BinaryExecNode {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -65,7 +64,7 @@ case class BroadcastNestedLoopJoinExec(
 
   override def output: Seq[Attribute] = {
     joinType match {
-      case Inner =>
+      case _: InnerLike =>
         left.output ++ right.output
       case LeftOuter =>
         left.output ++ right.output.map(_.withNullability(true))
@@ -340,20 +339,11 @@ case class BroadcastNestedLoopJoinExec(
     )
   }
 
-  protected override def doPrepare(): Unit = {
-    if (!withinBroadcastThreshold && !sqlContext.conf.crossJoinEnabled) {
-      throw new AnalysisException("Both sides of this join are outside the broadcasting " +
-        "threshold and computing it could be prohibitively expensive. To explicitly enable it, " +
-        s"please set ${SQLConf.CROSS_JOINS_ENABLED.key} = true")
-    }
-    super.doPrepare()
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
     val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
 
     val resultRdd = (joinType, buildSide) match {
-      case (Inner, _) =>
+      case (_: InnerLike, _) =>
         innerJoin(broadcastedRelation)
       case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) =>
         outerJoin(broadcastedRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 57866df90d27dfb0a2104c1e58eae01295164cf2..15dc9b40662e2b32ddb59e64270d14d425db7bad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -91,15 +91,6 @@ case class CartesianProductExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  protected override def doPrepare(): Unit = {
-    if (!sqlContext.conf.crossJoinEnabled) {
-      throw new AnalysisException("Cartesian joins could be prohibitively expensive and are " +
-        "disabled by default. To explicitly enable them, please set " +
-        s"${SQLConf.CROSS_JOINS_ENABLED.key} = true")
-    }
-    super.doPrepare()
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index d46a80423fa35085ddf10c2846b68ac624a1a933..fb6bfa7b2735c53a564f9b9feb0bf92d199ad598 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -38,7 +38,7 @@ trait HashJoin {
 
   override def output: Seq[Attribute] = {
     joinType match {
-      case Inner =>
+      case _: InnerLike =>
         left.output ++ right.output
       case LeftOuter =>
         left.output ++ right.output.map(_.withNullability(true))
@@ -225,7 +225,7 @@ trait HashJoin {
       numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
     val joinedIter = joinType match {
-      case Inner =>
+      case _: InnerLike =>
         innerJoin(streamedIter, hashed)
       case LeftOuter | RightOuter =>
         outerJoin(streamedIter, hashed)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 5c9c1e6062f0d2eec8d1eabf44f0499d9abf8f20..b46af2a99a1e0ef7b83cb409274226d6cee32786 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -45,7 +45,7 @@ case class SortMergeJoinExec(
 
   override def output: Seq[Attribute] = {
     joinType match {
-      case Inner =>
+      case _: InnerLike =>
         left.output ++ right.output
       case LeftOuter =>
         left.output ++ right.output.map(_.withNullability(true))
@@ -64,7 +64,8 @@ case class SortMergeJoinExec(
   }
 
   override def outputPartitioning: Partitioning = joinType match {
-    case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    case _: InnerLike =>
+      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
     // For left and right outer joins, the output is partitioned by the streamed input's join keys.
     case LeftOuter => left.outputPartitioning
     case RightOuter => right.outputPartitioning
@@ -111,7 +112,7 @@ case class SortMergeJoinExec(
       val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)
 
       joinType match {
-        case Inner =>
+        case _: InnerLike =>
           new RowIterator {
             private[this] var currentLeftRow: InternalRow = _
             private[this] var currentRightMatches: ArrayBuffer[InternalRow] = _
@@ -318,7 +319,7 @@ case class SortMergeJoinExec(
   }
 
   override def supportCodegen: Boolean = {
-    joinType == Inner
+    joinType.isInstanceOf[InnerLike]
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a54342f82e249882cab6e9e576f8bf86dcd71110..1d6ca5a965cbfddbf00aa0d6454ea216e3af47ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -362,7 +362,8 @@ object SQLConf {
     .createWithDefault(true)
 
   val CROSS_JOINS_ENABLED = SQLConfigBuilder("spark.sql.crossJoin.enabled")
-    .doc("When false, we will throw an error if a query contains a cross join")
+    .doc("When false, we will throw an error if a query contains a cartesian product without " +
+        "explicit CROSS JOIN syntax.")
     .booleanConf
     .createWithDefault(false)
 
@@ -683,8 +684,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
-  def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
-
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
@@ -709,6 +708,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
+
+  override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql b/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql
new file mode 100644
index 0000000000000000000000000000000000000000..aa7312437487af4c74cffb43ebf210266f2606a1
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql
@@ -0,0 +1,35 @@
+-- Cross join detection and error checking are done in JoinSuite, since the explain output is
+-- used in the error message and the plan ids are not stable. Only positive cases are checked here.
+
+create temporary view nt1 as select * from values
+  ("one", 1),
+  ("two", 2),
+  ("three", 3)
+  as nt1(k, v1);
+
+create temporary view nt2 as select * from values
+  ("one", 1),
+  ("two", 22),
+  ("one", 5)
+  as nt2(k, v2);
+
+-- Cross joins with and without predicates
+SELECT * FROM nt1 cross join nt2;
+SELECT * FROM nt1 cross join nt2 where nt1.k = nt2.k;
+SELECT * FROM nt1 cross join nt2 on (nt1.k = nt2.k);
+SELECT * FROM nt1 cross join nt2 where nt1.v1 = 1 and nt2.v2 = 22;
+
+SELECT a.key, b.key FROM
+(SELECT k key FROM nt1 WHERE v1 < 2) a
+CROSS JOIN
+(SELECT k key FROM nt2 WHERE v2 = 22) b;
+
+-- Join reordering
+create temporary view A(a, va) as select * from nt1;
+create temporary view B(b, vb) as select * from nt1;
+create temporary view C(c, vc) as select * from nt1;
+create temporary view D(d, vd) as select * from nt1;
+
+-- Allowed since cross join with C is explicit
+select * from ((A join B on (a = b)) cross join C) join D on (a = d);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
index 10d34deff4ee3c0c321a947ba3ec31f44e3ed9fd..3914db26914b479de99c2ca71332c29021bb6f44 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
@@ -11,4 +11,4 @@ WITH t AS (SELECT 1 FROM t) SELECT * FROM t;
 WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
 
 -- WITH clause should reference the previous CTE
-WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2;
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
index f50f1ebad970e0dbe59b82e9d46903fc105acbc7..cdc6c81e10047ec4bed0dca9ee9fa421794a0b79 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
@@ -24,6 +24,9 @@ CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1)
 
 CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1);
 
+-- Set the cross join enabled flag for the LEFT JOIN test since there's no join condition.
+-- Ultimately the join should be optimized away.
+set spark.sql.crossJoin.enabled = true;
 SELECT *
 FROM (
 SELECT
@@ -31,6 +34,6 @@ SELECT
     FROM t1
     LEFT JOIN t2 ON false
 ) t where (t.int_col) is not null;
-
+set spark.sql.crossJoin.enabled = false;
 
 
diff --git a/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out b/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out
new file mode 100644
index 0000000000000000000000000000000000000000..562e174fc0bb26b0de4e845c5ed65591192823fb
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out
@@ -0,0 +1,129 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 12
+
+
+-- !query 0
+create temporary view nt1 as select * from values
+  ("one", 1),
+  ("two", 2),
+  ("three", 3)
+  as nt1(k, v1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view nt2 as select * from values
+  ("one", 1),
+  ("two", 22),
+  ("one", 5)
+  as nt2(k, v2)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT * FROM nt1 cross join nt2
+-- !query 2 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 2 output
+one	1	one	1
+one	1	one	5
+one	1	two	22
+three	3	one	1
+three	3	one	5
+three	3	two	22
+two	2	one	1
+two	2	one	5
+two	2	two	22
+
+
+-- !query 3
+SELECT * FROM nt1 cross join nt2 where nt1.k = nt2.k
+-- !query 3 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 3 output
+one	1	one	1
+one	1	one	5
+two	2	two	22
+
+
+-- !query 4
+SELECT * FROM nt1 cross join nt2 on (nt1.k = nt2.k)
+-- !query 4 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 4 output
+one	1	one	1
+one	1	one	5
+two	2	two	22
+
+
+-- !query 5
+SELECT * FROM nt1 cross join nt2 where nt1.v1 = 1 and nt2.v2 = 22
+-- !query 5 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 5 output
+one	1	two	22
+
+
+-- !query 6
+SELECT a.key, b.key FROM
+(SELECT k key FROM nt1 WHERE v1 < 2) a
+CROSS JOIN
+(SELECT k key FROM nt2 WHERE v2 = 22) b
+-- !query 6 schema
+struct<key:string,key:string>
+-- !query 6 output
+one	two
+
+
+-- !query 7
+create temporary view A(a, va) as select * from nt1
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+create temporary view B(b, vb) as select * from nt1
+-- !query 8 schema
+struct<>
+-- !query 8 output
+
+
+
+-- !query 9
+create temporary view C(c, vc) as select * from nt1
+-- !query 9 schema
+struct<>
+-- !query 9 output
+
+
+
+-- !query 10
+create temporary view D(d, vd) as select * from nt1
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+select * from ((A join B on (a = b)) cross join C) join D on (a = d)
+-- !query 11 schema
+struct<a:string,va:int,b:string,vb:int,c:string,vc:int,d:string,vd:int>
+-- !query 11 output
+one	1	one	1	one	1	one	1
+one	1	one	1	three	3	one	1
+one	1	one	1	two	2	one	1
+three	3	three	3	one	1	three	3
+three	3	three	3	three	3	three	3
+three	3	three	3	two	2	three	3
+two	2	two	2	one	1	two	2
+two	2	two	2	three	3	two	2
+two	2	two	2	two	2	two	2
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
index ddee5bf2d473b8a6f9cdd3477d524555666adb86..9fbad8f3800a9a0577bea53e7102d4a5d7abe493 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -47,7 +47,7 @@ Table or view not found: s2; line 1 pos 26
 
 
 -- !query 5
-WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2
 -- !query 5 schema
 struct<id:int,2:int>
 -- !query 5 output
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
index b39fdb0e5872085a62ed0d25776e014cb66cf70c..cc50b9444bb4bdba3ae0366c405ca04a4d575d88 100644
--- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 8
 
 
 -- !query 0
@@ -59,6 +59,14 @@ struct<>
 
 
 -- !query 5
+set spark.sql.crossJoin.enabled = true
+-- !query 5 schema
+struct<key:string,value:string>
+-- !query 5 output
+spark.sql.crossJoin.enabled
+
+
+-- !query 6
 SELECT *
 FROM (
 SELECT
@@ -66,7 +74,15 @@ SELECT
     FROM t1
     LEFT JOIN t2 ON false
 ) t where (t.int_col) is not null
--- !query 5 schema
+-- !query 6 schema
 struct<int_col:int>
--- !query 5 output
+-- !query 6 output
 97
+
+
+-- !query 7
+set spark.sql.crossJoin.enabled = false
+-- !query 7 schema
+struct<key:string,value:string>
+-- !query 7 output
+spark.sql.crossJoin.enabled
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4abf5e42b9c342ce7e7c4046ea72edd356ec903a..541ffb58e727fd4fe61661b0f7c8a17f244d3d71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -104,6 +104,21 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
         .collect().toSeq)
   }
 
+  test("join - cross join") {
+    val df1 = Seq((1, "1"), (3, "3")).toDF("int", "str")
+    val df2 = Seq((2, "2"), (4, "4")).toDF("int", "str")
+
+    checkAnswer(
+      df1.crossJoin(df2),
+      Row(1, "1", 2, "2") :: Row(1, "1", 4, "4") ::
+        Row(3, "3", 2, "2") :: Row(3, "3", 4, "4") :: Nil)
+
+    checkAnswer(
+      df2.crossJoin(df1),
+      Row(2, "2", 1, "1") :: Row(2, "2", 3, "3") ::
+        Row(4, "4", 1, "1") :: Row(4, "4", 3, "3") :: Nil)
+  }
+
   test("join - using aliases after self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
     checkAnswer(
@@ -145,7 +160,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size === 1)
 
     // no join key -- should not be a broadcast join
-    val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan
+    val plan2 = df1.crossJoin(broadcast(df2)).queryExecution.sparkPlan
     assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size === 0)
 
     // planner should not crash without a join
@@ -155,7 +170,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     withTempPath { path =>
       df1.write.parquet(path.getCanonicalPath)
       val pf1 = spark.read.parquet(path.getCanonicalPath)
-      assert(df1.join(broadcast(pf1)).count() === 4)
+      assert(df1.crossJoin(broadcast(pf1)).count() === 4)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f89951760f7d2217c284b753be58c99be29c22a1..c2d256bdd335b55232b010342d27742721a7f50f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -626,9 +626,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("drop(name: String) search and drop all top level columns that matchs the name") {
     val df1 = Seq((1, 2)).toDF("a", "b")
     val df2 = Seq((3, 4)).toDF("a", "b")
-    checkAnswer(df1.join(df2), Row(1, 2, 3, 4))
+    checkAnswer(df1.crossJoin(df2), Row(1, 2, 3, 4))
     // Finds and drops all columns that match the name (case insensitive).
-    checkAnswer(df1.join(df2).drop("A"), Row(2, 4))
+    checkAnswer(df1.crossJoin(df2).drop("A"), Row(2, 4))
   }
 
   test("withColumnRenamed") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 8ce6ea66b6bbf69e26cdd26b408a25e0bf6b7794..3243f352a5337f2500f32729a2a57741054fdbb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -466,7 +466,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("self join") {
     val ds = Seq("1", "2").toDS().as("a")
-    val joined = ds.joinWith(ds, lit(true))
+    val joined = ds.joinWith(ds, lit(true), "cross")
     checkDataset(joined, ("1", "1"), ("1", "2"), ("2", "1"), ("2", "2"))
   }
 
@@ -486,7 +486,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   test("Kryo encoder self join") {
     implicit val kryoEncoder = Encoders.kryo[KryoData]
     val ds = Seq(KryoData(1), KryoData(2)).toDS()
-    assert(ds.joinWith(ds, lit(true)).collect().toSet ==
+    assert(ds.joinWith(ds, lit(true), "cross").collect().toSet ==
       Set(
         (KryoData(1), KryoData(1)),
         (KryoData(1), KryoData(2)),
@@ -514,7 +514,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   test("Java encoder self join") {
     implicit val javaEncoder = Encoders.javaSerialization[JavaData]
     val ds = Seq(JavaData(1), JavaData(2)).toDS()
-    assert(ds.joinWith(ds, lit(true)).collect().toSet ==
+    assert(ds.joinWith(ds, lit(true), "cross").collect().toSet ==
       Set(
         (JavaData(1), JavaData(1)),
         (JavaData(1), JavaData(2)),
@@ -532,7 +532,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds2 = Seq((nullInt, "1"), (new java.lang.Integer(22), "2")).toDS()
 
     checkDataset(
-      ds1.joinWith(ds2, lit(true)),
+      ds1.joinWith(ds2, lit(true), "cross"),
       ((nullInt, "1"), (nullInt, "1")),
       ((nullInt, "1"), (new java.lang.Integer(22), "2")),
       ((new java.lang.Integer(22), "2"), (nullInt, "1")),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44889d92ee306a63d3dcc7209e3f87ca26821c47..913b2ae9762cc544b55ded84cc81687ff1745142 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -225,8 +225,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
             Row(2, 2, 1, null) ::
             Row(2, 2, 2, 2) :: Nil)
       }
-      assert(e.getMessage.contains("Cartesian joins could be prohibitively expensive and are " +
-        "disabled by default"))
+      assert(e.getMessage.contains("Detected cartesian product for INNER join " +
+        "between logical plans"))
     }
   }
 
@@ -482,7 +482,9 @@
 
     // we set the threshold to be greater than the statistics of the cached table testData
     withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString()) {
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString(),
+      SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
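+      // cross joins are enabled explicitly here, since cartesian products are rejected by default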
 
       assert(statisticSizeInByte(spark.table("testData2")) >
         spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
@@ -573,4 +575,36 @@
         Row(3, 1) ::
         Row(3, 2) :: Nil)
   }
+
+  test("cross join detection") {
+    testData.createOrReplaceTempView("A")
+    testData.createOrReplaceTempView("B")
+    testData2.createOrReplaceTempView("C")
+    testData3.createOrReplaceTempView("D")
+    upperCaseData.where('N >= 3).createOrReplaceTempView("`right`")
+    val cartesianQueries = Seq(
+      // The following should error out since there is no explicit cross join
+      "SELECT * FROM testData inner join testData2",
+      "SELECT * FROM testData left outer join testData2",
+      "SELECT * FROM testData right outer join testData2",
+      "SELECT * FROM testData full outer join testData2",
+      "SELECT * FROM testData, testData2",
+      "SELECT * FROM testData, testData2 where testData.key = 1 and testData2.a = 22",
+      // The following should fail because join reordering introduces cartesian products
+      "select * from (A join B on (A.key = B.key)) join D on (A.key=D.a) join C",
+      "select * from ((A join B on (A.key = B.key)) join C) join D on (A.key = D.a)",
+      // Cartesian product involving C, which is not covered by the explicit CROSS JOIN
+      "select * from ((A join B on (A.key = B.key)) cross join D) join C on (A.key = D.a)")
+
+    def checkCartesianDetection(query: String): Unit = {
+      val e = intercept[Exception] {
+        checkAnswer(sql(query), Nil)
+      }
+      assert(e.getMessage.contains("Detected cartesian product"))
+    }
+
+    cartesianQueries.foreach(checkCartesianDetection)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index d3cfa953a3123750504240becb7af440ef27f717..afd47897ed4b22c189164ccd09e2927ae9e4d260 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -361,7 +361,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
         |with
         | v0 as (select 0 as key, 1 as value),
         | v1 as (select key, count(value) over (partition by key) cnt_val from v0),
-        | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
+        | v2 as (select v1.key, v1_lag.cnt_val from v1 cross join v1 v1_lag
+        |        where v1.key = v1_lag.key)
         | select key, cnt_val from v2 order by key limit 1
       """.stripMargin), Row(0, 1))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 35dab63672c05a470f4205e529357bb9699f76f6..4408ece1122583fc3c290ac38c4db19ca7adf1cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -109,8 +109,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
         leftPlan: SparkPlan,
         rightPlan: SparkPlan,
         side: BuildSide) = {
-      val shuffledHashJoin =
-        joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan)
+      val shuffledHashJoin = joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner,
+        side, None, leftPlan, rightPlan)
       val filteredJoin =
         boundCondition.map(FilterExec(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
       EnsureRequirements(spark.sessionState.conf).apply(filteredJoin)
@@ -122,8 +122,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
         boundCondition: Option[Expression],
         leftPlan: SparkPlan,
         rightPlan: SparkPlan) = {
-      val sortMergeJoin =
-        joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan)
+      val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition,
+        leftPlan, rightPlan)
       EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
     }
 
diff --git a/sql/hive/src/test/resources/sqlgen/join_2_tables.sql b/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
index 9dd200c3c0cfa18cb19a2746b8e209ca9fc00942..0f033a04aea47ea5085ec30af5fa83038fdac0f0 100644
--- a/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
+++ b/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
@@ -1,7 +1,7 @@
 -- This file is automatically generated by LogicalPlanToSQLSuite.
 SELECT COUNT(a.value), b.KEY, a.KEY
-FROM parquet_t1 a, parquet_t1 b
+FROM parquet_t1 a CROSS JOIN parquet_t1 b
 GROUP BY a.KEY, b.KEY
 HAVING MAX(a.KEY) > 0
 --------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `count(value)`, `gen_attr_1` AS `KEY`, `gen_attr_2` AS `KEY` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, max(`gen_attr_2`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 GROUP BY `gen_attr_2`, `gen_attr_1` HAVING (`gen_attr_3` > CAST(0 AS BIGINT))) AS gen_subquery_2) AS gen_subquery_3
+SELECT `gen_attr_0` AS `count(value)`, `gen_attr_1` AS `KEY`, `gen_attr_2` AS `KEY` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, max(`gen_attr_2`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 CROSS JOIN (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 GROUP BY `gen_attr_2`, `gen_attr_1` HAVING (`gen_attr_3` > CAST(0 AS BIGINT))) AS gen_subquery_2) AS gen_subquery_3
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index 9c6da6a628dcfa80229311c773c897f07aad27d6..3e0fdc1f8b925ac8fc69af77e1c1eb28f490d9d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -642,7 +642,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkColumnNames(
       """SELECT x.a, y.a, x.b, y.b
         |FROM (SELECT 1 AS a, 2 AS b) x
-        |INNER JOIN (SELECT 1 AS a, 2 AS b) y
+        |CROSS JOIN (SELECT 1 AS a, 2 AS b) y
         |ON x.a = y.a
       """.stripMargin,
       "a", "a", "b", "b"
@@ -810,7 +810,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkSQL(
       """
         |SELECT COUNT(a.value), b.KEY, a.KEY
-        |FROM parquet_t1 a, parquet_t1 b
+        |FROM parquet_t1 a CROSS JOIN parquet_t1 b
         |GROUP BY a.KEY, b.KEY
         |HAVING MAX(a.KEY) > 0
       """.stripMargin,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3c7dbb449c5215874354200cb7dec643abf3e823..1d1a958d3fea3a7b41395585ea2bb73a206217fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -318,10 +318,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   createQueryTest("trivial join ON clause",
     "SELECT * FROM src a JOIN src b ON a.key = b.key")
 
-  createQueryTest("small.cartesian",
-    "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN " +
-      "(SELECT key FROM src WHERE key = 2) b")
-
   createQueryTest("length.udf",
     "SELECT length(\"test\") FROM src LIMIT 1")
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e92bbdea75a7b2a3e6fb99cfb7eccad9a032478f..2f6d9fb96b8251c91beb8158d30ad227c659d6da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -592,9 +592,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
   test("self-join") {
     val table = spark.table("normal_parquet")
-    val selfJoin = table.as("t1").join(table.as("t2"))
+    val selfJoin = table.as("t1").crossJoin(table.as("t2"))
     checkAnswer(selfJoin,
-      sql("SELECT * FROM normal_parquet x JOIN normal_parquet y"))
+      sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
   }
 }