diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 55c168d552a45745d129bed2b2c4a1474ec0f072..b7d8d932edfcf53753b14d5b12f93734aaaddfb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,8 +21,8 @@ import scala.collection.immutable.HashSet
 
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -62,6 +62,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       SetOperationPushDown,
       SamplePushDown,
       ReorderJoin,
+      OuterJoinElimination,
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
@@ -931,6 +932,62 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Eliminates outer joins when the predicates in the enclosing [[Filter]] can restrict
+ * the result set so that all null-supplying rows are discarded:
+ *
+ * - full outer -> inner if both sides have such predicates
+ * - left outer -> inner if the right side has such predicates
+ * - right outer -> inner if the left side has such predicates
+ * - full outer -> left outer if only the left side has such predicates
+ * - full outer -> right outer if only the right side has such predicates
+ *
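+ * For example, `SELECT * FROM l FULL OUTER JOIN r ON l.a = r.d WHERE l.b > 1` can be
+ * rewritten as a left outer join: the predicate `l.b > 1` discards every row whose
+ * left side is null-supplied, so the right side no longer needs to be preserved.
+ *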
+ * This rule should be executed before pushing down the [[Filter]].
+ */
+object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
+
+  /**
+   * Returns whether the expression evaluates to null or false when all of its inputs are null.
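+   *
+   * For example, on an all-null input row `IsNotNull('e)` evaluates to false and
+   * `'e.in(1, 2)` evaluates to null, so both can filter out null-supplied rows.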
+   */
+  private def canFilterOutNull(e: Expression): Boolean = {
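+    // A non-deterministic expression cannot be meaningfully evaluated at
+    // optimization time, so reject it conservatively.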
+    if (!e.deterministic) return false
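+    // Bind the expression's references to a row of all nulls and evaluate it:
+    // a null or false result means every null-supplying row gets filtered out.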
+    val attributes = e.references.toSeq
+    val emptyRow = new GenericInternalRow(attributes.length)
+    val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+    v == null || v == false
+  }
+
+  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
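+    // Split the filter condition into conjuncts and classify each conjunct by
+    // the join side whose attributes it references exclusively.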
+    val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
+    val leftConditions = splitConjunctiveConditions
+      .filter(_.references.subsetOf(join.left.outputSet))
+    val rightConditions = splitConjunctiveConditions
+      .filter(_.references.subsetOf(join.right.outputSet))
+
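+    // A side qualifies if one of its conjuncts filters out nulls, or if the
+    // filter's inferred constraints contain an IsNotNull over one of that
+    // side's attributes.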
+    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
+      filter.constraints.filter(_.isInstanceOf[IsNotNull])
+        .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
+    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
+      filter.constraints.filter(_.isInstanceOf[IsNotNull])
+        .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+
+    join.joinType match {
+      case RightOuter if leftHasNonNullPredicate => Inner
+      case LeftOuter if rightHasNonNullPredicate => Inner
+      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
+      case FullOuter if leftHasNonNullPredicate => LeftOuter
+      case FullOuter if rightHasNonNullPredicate => RightOuter
+      case o => o
+    }
+  }
+
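+  // Match only a Filter directly on top of an outer Join; this is why the rule
+  // must run before PushPredicateThroughJoin in the optimizer batch.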
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
+      val newJoinType = buildNewJoinType(f, j)
+      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
+  }
+}
+
 /**
  * Pushes down [[Filter]] operators where the `condition` can be
  * evaluated using only the attributes of the left or right side of a join.  Other
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a1dc836a5fd1ca42518f0b7c1b9cb30611a102af
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class OuterJoinEliminationSuite extends PlanTest {
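+  // OuterJoinElimination must see the Filter still sitting on top of the Join,
+  // so it runs before PushPredicateThroughJoin; the pushdown then produces the
+  // final plan shape compared against below.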
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubQueries) ::
+      Batch("Outer Join Elimination", Once,
+        OuterJoinElimination,
+        PushPredicateThroughJoin) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int)
+
+  test("joins: full outer to inner") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where("x.b".attr >= 1 && "y.d".attr >= 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('b >= 1)
+    val right = testRelation1.where('d >= 2)
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
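+  // "y.d".attr > 2 references only the right side: rows whose right side is
+  // null-supplied are discarded, so the left side no longer needs preserving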
+  test("joins: full outer to right") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where('d > 2)
+    val correctAnswer =
+      left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
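+  // 'a <=> 2 (null-safe equality) evaluates to false rather than null on a
+  // null input, so it still filters out rows whose left side is null-supplied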
+  test("joins: full outer to left") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('a <=> 2)
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: right to inner") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('b > 2)
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: left to inner") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where("y.e".attr.isNotNull)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where('e.isNotNull)
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // checks that a filter mixing OR and NOT expressions can eliminate all null-supplying rows
+  test("joins: left to inner with complicated filter predicates #1") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where(!'e.isNull || ('d.isNotNull && 'f.isNull))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where(!'e.isNull || ('d.isNotNull && 'f.isNull))
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // eval(emptyRow) of 'e.in(1, 2) returns null rather than false; a null
+  // result still filters out the null-supplying rows
+  test("joins: left to inner with complicated filter predicates #2") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where('e.in(1, 2))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where('e.in(1, 2))
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // checks that a filter mixing OR and AND expressions can eliminate all null-supplying rows
+  test("joins: left to inner with complicated filter predicates #3") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
+    val correctAnswer =
+      left.join(right, Inner, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  // the filter predicate references attributes from both sides, so neither
+  // side-specific conjunct list catches it; instead, the IsNotNull constraints
+  // inferred from "x.b".attr + 3 === "y.e".attr qualify both sides
+  // FULL OUTER => INNER
+  test("joins: full outer to inner with complicated filter predicates #4") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where("x.b".attr + 3 === "y.e".attr)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, Inner, Option("b".attr + 3 === "e".attr && "a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a5e5f156423cccbb3a3c3dacff811a6ffc310471..067a62d011ec493d11f89f914dc8efa6d609ca0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.joins.BroadcastHashJoin
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -156,4 +158,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       assert(df1.join(broadcast(pf1)).count() === 4)
     }
   }
+
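+  // end-to-end: each conversion is checked both in the optimized plan (the
+  // expected join type appears exactly once) and in the query results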
+  test("join - outer join conversion") {
+    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+
+    // outer -> left
+    val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3)
+    assert(outerJoin2Left.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, LeftOuter, _) => j }.size === 1)
+    checkAnswer(
+      outerJoin2Left,
+      Row(3, 4, "3", null, null, null) :: Nil)
+
+    // outer -> right
+    val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5)
+    assert(outerJoin2Right.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, RightOuter, _) => j }.size === 1)
+    checkAnswer(
+      outerJoin2Right,
+      Row(null, null, null, 5, 6, "5") :: Nil)
+
+    // outer -> inner
+    val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer")
+      .where($"a.int" === 1 && $"b.int2" === 3)
+    assert(outerJoin2Inner.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, Inner, _) => j }.size === 1)
+    checkAnswer(
+      outerJoin2Inner,
+      Row(1, 2, "1", 1, 3, "1") :: Nil)
+
+    // right -> inner
+    val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1)
+    assert(rightJoin2Inner.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, Inner, _) => j }.size === 1)
+    checkAnswer(
+      rightJoin2Inner,
+      Row(1, 2, "1", 1, 3, "1") :: Nil)
+
+    // left -> inner
+    val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3)
+    assert(leftJoin2Inner.queryExecution.optimizedPlan.collect {
+      case j @ Join(_, _, Inner, _) => j }.size === 1)
+    checkAnswer(
+      leftJoin2Inner,
+      Row(1, 2, "1", 1, 3, "1") :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 92ff7e73fad88cc11ca2972cdfc0bf0e28b0b3f2..8f2a0c035136129ce9ef910b2b7e332bd93cd5a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
       ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-        classOf[SortMergeOuterJoin]),
+        classOf[SortMergeJoin]), // converted from Right Outer to Inner
       ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
         classOf[SortMergeOuterJoin]),
       ("SELECT * FROM testData full outer join testData2 ON key = a",