diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 94b6fb084d38aa69ed7cb64da6f4a336eddb6bac..cb5ff679598680c831058b34f14492238a775e8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.BooleanType
@@ -48,6 +47,14 @@ trait PredicateHelper {
     }
   }
 
+  protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case Or(cond1, cond2) =>
+        splitDisjunctivePredicates(cond1) ++ splitDisjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
+  }
+
   /**
    * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
    * can be used to determine when it is acceptable to move expression evaluation within a query
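To make the new helper concrete: like the existing `splitConjunctivePredicates`, it flattens an arbitrarily nested `Or` tree into the flat sequence of its disjuncts. Below is a minimal, self-contained sketch of the same recursion over a toy ADT; the `Expr`, `OrE`, and `Leaf` names are illustrative stand-ins, not Catalyst classes:

```scala
// Illustrative only: toy stand-ins for Catalyst's Expression / Or.
object SplitDisjunctsDemo extends App {
  sealed trait Expr
  case class OrE(left: Expr, right: Expr) extends Expr
  case class Leaf(name: String) extends Expr

  // Mirrors splitDisjunctivePredicates: recurse into both sides of
  // every Or node, and treat anything else as a single disjunct.
  def splitDisjuncts(e: Expr): Seq[Expr] = e match {
    case OrE(l, r) => splitDisjuncts(l) ++ splitDisjuncts(r)
    case other => other :: Nil
  }

  // Prints List(Leaf(a), Leaf(b), Leaf(c)).
  println(splitDisjuncts(OrE(OrE(Leaf("a"), Leaf("b")), Leaf("c"))))
}
```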
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0f2eae6400d2135937a37f84cc91d3a096c31875..cd3137980ca43077551f675de106b7259ce2da36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -294,11 +294,16 @@ object OptimizeIn extends Rule[LogicalPlan] {
 }
 
 /**
- * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
+ * Simplifies boolean expressions:
+ *
+ * 1. Simplifies expressions whose answer can be determined without evaluating both sides.
+ * 2. Eliminates / extracts common factors.
+ * 3. Removes the `Not` operator where possible.
+ *
  * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
  * is only safe when the evaluation of expressions does not result in side effects.
  */
-object BooleanSimplification extends Rule[LogicalPlan] {
+object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsUp {
       case and @ And(left, right) =>
@@ -307,7 +312,9 @@ object BooleanSimplification extends Rule[LogicalPlan] {
           case (l, Literal(true, BooleanType)) => l
           case (Literal(false, BooleanType), _) => Literal(false)
           case (_, Literal(false, BooleanType)) => Literal(false)
-          case (_, _) => and
+          // a && a && a ... => a
+          case _ if splitConjunctivePredicates(and).distinct.size == 1 => left
+          case _ => and
         }
 
       case or @ Or(left, right) =>
@@ -316,7 +323,22 @@
           case (_, Literal(true, BooleanType)) => Literal(true)
           case (Literal(false, BooleanType), r) => r
           case (l, Literal(false, BooleanType)) => l
-          case (_, _) => or
+          // a || a || a ... => a
+          case _ if splitDisjunctivePredicates(or).distinct.size == 1 => left
+          // (a && b && c && ...) || (a && b && d && ...) => a && b && (c || d || ...)
+          case _ =>
+            val lhsSet = splitConjunctivePredicates(left).toSet
+            val rhsSet = splitConjunctivePredicates(right).toSet
+            val common = lhsSet.intersect(rhsSet)
+            val ldiff = lhsSet.diff(common)
+            val rdiff = rhsSet.diff(common)
+
+            if (ldiff.isEmpty || rdiff.isEmpty) {
+              // One side subsumes the other: a || (a && b) => a (absorption)
+              common.reduce(And)
+            } else {
+              (Or(ldiff.reduce(And), rdiff.reduce(And)) :: common.toList).reduce(And)
+            }
         }
 
       case not @ Not(exp) =>
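The rewritten `Or` case splits both sides into conjunct sets, factors out the intersection, and keeps an absorption branch so that e.g. `a || (a && b)` collapses to `a` rather than to the non-equivalent `a && b`. The set algebra is easiest to follow on plain values; here is a sketch using strings in place of Catalyst expressions, where `factorOr` is a hypothetical helper name, not part of the patch:

```scala
// Illustrative only: strings stand in for Catalyst predicates.
object FactorOrDemo extends App {
  def factorOr(lhs: Set[String], rhs: Set[String]): String = {
    val common = lhs.intersect(rhs)
    val ldiff = lhs.diff(common)
    val rdiff = rhs.diff(common)
    if (ldiff.isEmpty || rdiff.isEmpty) {
      // One side subsumes the other, so only the common factors remain.
      common.mkString(" && ")
    } else {
      // Pull the shared conjuncts out in front of the remaining Or.
      (s"(${ldiff.mkString(" && ")} || ${rdiff.mkString(" && ")})" ::
        common.toList).mkString(" && ")
    }
  }

  // Prints "(c || d) && a && b".
  println(factorOr(Set("a", "b", "c"), Set("a", "b", "d")))
  // Prints "a": absorption, since {a} is a subset of {a, b}.
  println(factorOr(Set("a"), Set("a", "b")))
}
```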
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..906300d8336cb9059af713d1c186b05c9ce55874
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFiltersSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+
+class NormalizeFiltersSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Seq(
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators),
+      Batch("NormalizeFilters", FixedPoint(100),
+        BooleanSimplification,
+        SimplifyFilters))
+  }
+
+  val relation = LocalRelation('a.int, 'b.int, 'c.string)
+
+  def checkExpression(original: Expression, expected: Expression): Unit = {
+    val actual = Optimize(relation.where(original)).collect { case f: Filter => f.condition }.head
+    val result = (actual, expected) match {
+      case (And(l1, r1), And(l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
+      case (Or (l1, r1), Or (l2, r2)) => (l1 == l2 && r1 == r2) || (l1 == r2 && l2 == r1)
+      case (lhs, rhs) => lhs fastEquals rhs
+    }
+
+    assert(result, s"$actual isn't equivalent to $expected")
+  }
+
+  test("a && a => a") {
+    checkExpression('a === 1 && 'a === 1, 'a === 1)
+    checkExpression('a === 1 && 'a === 1 && 'a === 1, 'a === 1)
+  }
+
+  test("a || a => a") {
+    checkExpression('a === 1 || 'a === 1, 'a === 1)
+    checkExpression('a === 1 || 'a === 1 || 'a === 1, 'a === 1)
+  }
+
+  test("(a && b) || (a && c) => a && (b || c)") {
+    checkExpression(
+      ('a === 1 && 'a < 10) || ('a > 2 && 'a === 1),
+      ('a === 1) && ('a < 10 || 'a > 2))
+
+    checkExpression(
+      ('a < 1 && 'b > 2 && 'c.isNull) || ('a < 1 && 'c === "hello" && 'b > 2),
+      ('c.isNull || 'c === "hello") && 'a < 1 && 'b > 2)
+  }
+}
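Note that `checkExpression` only tolerates commuted operands at the top level of the condition, which is sufficient for the expected expressions in this suite. A more general alternative would canonicalize both sides before comparing, e.g. by sorting the split conjuncts. A sketch on toy string predicates, where `sameConjunction` is a hypothetical helper, not part of `PlanTest`:

```scala
// Illustrative only: order-insensitive comparison of conjunctions.
object CanonicalCompareDemo extends App {
  // Two conjunctions are equivalent up to commutativity when their
  // sorted conjunct lists match.
  def sameConjunction(lhs: Seq[String], rhs: Seq[String]): Boolean =
    lhs.sorted == rhs.sorted

  // Prints true: a && b matches b && a once conjuncts are sorted.
  println(sameConjunction(Seq("a < 1", "b > 2"), Seq("b > 2", "a < 1")))
}
```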
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 82afa31a99a7ed81490c6846a9ae9de2e90a33dc..1915c25392f1e97dc743071cc9e9feecc588fed9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -105,7 +105,9 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
 
     test(query) {
       val schemaRdd = sql(query)
-      assertResult(expectedQueryResult.toArray, "Wrong query result") {
+      val queryExecution = schemaRdd.queryExecution
+
+      assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
         schemaRdd.collect().map(_.head).toArray
       }
 
@@ -113,8 +115,10 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
         case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
       }.head
 
-      assert(readBatches === expectedReadBatches, "Wrong number of read batches")
-      assert(readPartitions === expectedReadPartitions, "Wrong number of read partitions")
+      assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
+      assert(
+        readPartitions === expectedReadPartitions,
+        s"Wrong number of read partitions: $queryExecution")
     }
   }
 }
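The test changes above use ScalaTest's clue arguments, `assertResult(expected, clue) { actual }` and `assert(condition, clue)`, so a pruning mismatch now prints the whole `queryExecution` (analyzed, optimized, and physical plans) alongside the failure. A minimal standalone sketch of the pattern, with hypothetical suite and values:

```scala
import org.scalatest.FunSuite

// Illustrative only: shows how clue arguments enrich failure messages.
class ClueDemoSuite extends FunSuite {
  test("failure messages carry the clue") {
    val diagnostic = "plan details would go here"
    assertResult(2, s"Wrong sum: $diagnostic") {
      1 + 1
    }
    assert(1 + 1 == 2, s"Wrong sum: $diagnostic")
  }
}
```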