diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c83ec0fcb54b7df4ebd4ba17e37442060d8adb16..69ceea632977debd91c295abf1e90f8e73021469 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -77,6 +77,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       CombineLimits,
       CombineUnions,
       // Constant folding and strength reduction
+      NullFiltering,
       NullPropagation,
       OptimizeIn,
       ConstantFolding,
@@ -593,6 +594,54 @@ object NullPropagation extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
+ * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
+ * existing Filters and Join operators and are inferred based on their data constraints.
+ *
+ * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
+ * LeftSemi joins.
+ */
+object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case filter @ Filter(condition, child) =>
+      // We generate a list of additional isNotNull filters from the operator's existing constraints
+      // but remove those that are either already part of the filter condition or are part of the
+      // operator's child constraints.
+      val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+        (child.constraints ++ splitConjunctivePredicates(condition))
+      if (newIsNotNullConstraints.nonEmpty) {
+        Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
+      } else {
+        filter
+      }
+
+    case join @ Join(left, right, joinType, condition) =>
+      val leftIsNotNullConstraints = join.constraints
+        .filter(_.isInstanceOf[IsNotNull])
+        .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
+      val rightIsNotNullConstraints =
+        join.constraints
+          .filter(_.isInstanceOf[IsNotNull])
+          .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
+      val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
+        Filter(leftIsNotNullConstraints.reduce(And), left)
+      } else {
+        left
+      }
+      val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
+        Filter(rightIsNotNullConstraints.reduce(And), right)
+      } else {
+        right
+      }
+      if (newLeftChild != left || newRightChild != right) {
+        Join(newLeftChild, newRightChild, joinType, condition)
+      } else {
+        join
+      }
+  }
+}
+
 /**
  * Replaces [[Expression Expressions]] that can be statically evaluated with
  * equivalent [[Literal]] values.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..7e52d5ef6749c34dae903a514918d36d397d116b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class NullFilteringSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("NullFiltering", Once, NullFiltering) ::
+      Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("filter: filter out nulls in condition") {
+    val originalQuery = testRelation.where('a === 1).analyze
+    val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single inner join: filter out nulls on either side on equi-join keys") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y,
+      condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+    val left = x.where(IsNotNull('a) && IsNotNull('b))
+    val right = y.where(IsNotNull('a) && IsNotNull('c))
+    val correctAnswer = left.join(right,
+      condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single inner join with pre-existing filters: filter out nulls on either side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.where('b > 5).join(y.where('c === 10),
+      condition = Some("x.a".attr === "y.a".attr)).analyze
+    val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
+    val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
+    val correctAnswer = left.join(right,
+      condition = Some("x.a".attr === "y.a".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single outer join: no null filters are generated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y, FullOuter,
+      condition = Some("x.a".attr === "y.a".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, originalQuery)
+  }
+
+  test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
+    val t1 = testRelation.subquery('t1)
+    val t2 = testRelation.subquery('t2)
+    val t3 = testRelation.subquery('t3)
+    val t4 = testRelation.subquery('t4)
+
+    val originalQuery = t1
+      .join(t2, condition = Some("t1.b".attr === "t2.b".attr))
+      .join(t3, condition = Some("t2.b".attr === "t3.b".attr))
+      .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
+    val correctAnswer = t1.where(IsNotNull('b))
+      .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
+      .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
+      .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index f9874088b588447350058bbc05df04afb4a43f80..0541844e0bfcd42a244861dd76ebb9fb371e8275 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util._
 /**
  * Provides helper methods for comparing plans.
  */
-abstract class PlanTest extends SparkFunSuite {
+abstract class PlanTest extends SparkFunSuite with PredicateHelper {
   /**
    * Since attribute references are given globally unique ids during analysis,
    * we must normalize them to check if two different queries are identical.
@@ -39,10 +39,22 @@ abstract class PlanTest extends SparkFunSuite {
     }
   }
 
+  /**
+   * Normalizes the filter conditions that appear in the plan. For instance,
+   * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && (expr 1 && expr 2)
+   * etc., will all now be equivalent.
+   */
+  private def normalizeFilters(plan: LogicalPlan) = {
+    plan transform {
+      case filter @ Filter(condition: Expression, child: LogicalPlan) =>
+        Filter(splitConjunctivePredicates(condition).sortBy(_.hashCode()).reduce(And), child)
+    }
+  }
+
   /** Fails the test if the two plans do not match */
   protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
-    val normalized1 = normalizeExprIds(plan1)
-    val normalized2 = normalizeExprIds(plan2)
+    val normalized1 = normalizeFilters(normalizeExprIds(plan1))
+    val normalized2 = normalizeFilters(normalizeExprIds(plan2))
     if (normalized1 != normalized2) {
       fail(
         s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f66e08e6ca5c8ee60e7b7dea9704efde785b4e78..a733237a5e717724486a4d23536a7b8ae03f6c71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -159,7 +159,7 @@ class PlannerSuite extends SharedSQLContext {
 
       withTempTable("testPushed") {
         val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
-        assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
+        assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index bd51154c58aa695d6d590384f0b5c43d1c076fd1..d2947676a0e584345f0e65a44ca1bb1bd29df7b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -74,10 +74,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         selectedFilters.foreach { pred =>
           val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
           assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-          maybeFilter.foreach { f =>
-            // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-            assert(f.getClass === filterClass)
-          }
+          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+          maybeFilter.exists(_.getClass === filterClass)
         }
         checker(stripSparkFilter(query), expected)
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index c94e73c4aa30094ba9552a90cc25dd5af5eb28f2..6ca334dc6d5fed5167271ddab0e9ae74279f4621 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -61,8 +61,8 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
       (implicit df: DataFrame): Unit = {
     def checkComparisonOperator(filter: SearchArgument) = {
-      val operator = filter.getLeaves.asScala.head.getOperator
-      assert(operator === filterOperator)
+      val operator = filter.getLeaves.asScala
+      assert(operator.map(_.getOperator).contains(filterOperator))
     }
     checkFilterPredicate(df, predicate, checkComparisonOperator)
   }
@@ -216,8 +216,9 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       )
       checkFilterPredicate(
         !('_1 < 4),
-        """leaf-0 = (LESS_THAN _1 4)
-          |expr = (not leaf-0)""".stripMargin.trim
+        """leaf-0 = (IS_NULL _1)
+          |leaf-1 = (LESS_THAN _1 4)
+          |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
       )
       checkFilterPredicate(
         '_1 < 2 || '_1 > 3,
@@ -227,9 +228,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       )
       checkFilterPredicate(
         '_1 < 2 && '_1 > 3,
-        """leaf-0 = (LESS_THAN _1 2)
-          |leaf-1 = (LESS_THAN_EQUALS _1 3)
-          |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
+        """leaf-0 = (IS_NULL _1)
+          |leaf-1 = (LESS_THAN _1 2)
+          |leaf-2 = (LESS_THAN_EQUALS _1 3)
+          |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
       )
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9ab3e11609cec8cda12a94b0f90ad5feedcf86a9..e64bb77a03a58fa7f1ea71e82cdd776bebf00ca7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -192,14 +192,14 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       }
 
       markup("Checking pushed filters")
-      assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
+      assert(pushedFilters.toSet.subsetOf(SimpleTextRelation.pushedFilters))
 
       val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
       val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
       val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet
 
       markup("Checking unhandled and inconvertible filters")
-      assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
+      assert((expectedInconvertibleFilters ++ expectedUnhandledFilters).subsetOf(nonPushedFilters))
 
       markup("Checking partitioning filters")
       val actualPartitioningFilters = splitConjunctivePredicates(filter.expr).filter {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9cdf1fc585866ed63fd6f8002e5cfc1c0c2e7938..bb552d6aa3e3fe71573e31aa727f19556ac88bd8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -141,12 +141,17 @@ class SimpleTextRelation(
       // Constructs a filter predicate to simulate filter push-down
       val predicate = {
         val filterCondition: Expression = filters.collect {
-          // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter
+          // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` and
+          // `isNotNull` filters
           case sources.GreaterThan(column, value) =>
             val dataType = dataSchema(column).dataType
             val literal = Literal.create(value, dataType)
             val attribute = inputAttributes.find(_.name == column).get
             expressions.GreaterThan(attribute, literal)
+          case sources.IsNotNull(column) =>
+            val dataType = dataSchema(column).dataType
+            val attribute = inputAttributes.find(_.name == column).get
+            expressions.IsNotNull(attribute)
         }.reduceOption(expressions.And).getOrElse(Literal(true))
         InterpretedPredicate.create(filterCondition, inputAttributes)
       }
@@ -184,11 +189,12 @@ class SimpleTextRelation(
     }
   }
 
-  // `SimpleTextRelation` only handles `GreaterThan` filter.  This is used to test filter push-down
-  // and `BaseRelation.unhandledFilters()`.
+  // `SimpleTextRelation` only handles `GreaterThan` and `IsNotNull` filters.  This is used to test
+  // filter push-down and `BaseRelation.unhandledFilters()`.
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
     filters.filter {
       case _: GreaterThan => false
+      case _: IsNotNull => false
       case _ => true
     }
   }