diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d82af94dbffb7e9b8c089112f8d6d9f1a4ff13ca..a51b385399d8858d578a220bf5540b1a443de025 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -867,6 +867,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
         filter
       }
 
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
+      // We can only push deterministic predicates which don't reference the watermark attribute.
+      // We could in theory span() only on determinism and pull out deterministic predicates
+      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
+      // use the prefix as we do for nondeterminism in other cases.
+
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
+        p => p.deterministic && !p.references.contains(watermark.eventTime))
+
+      if (pushDown.nonEmpty) {
+        val pushDownPredicate = pushDown.reduceLeft(And)
+        val newWatermark = watermark.copy(child = Filter(pushDownPredicate, watermark.child))
+        // If there is no more filter to stay up, just eliminate the filter.
+        // Otherwise, create "Filter(stayUp) <- watermark <- Filter(pushDownPredicate)".
+        if (stayUp.isEmpty) newWatermark else Filter(stayUp.reduceLeft(And), newWatermark)
+      } else {
+        filter
+      }
+
     case filter @ Filter(_, u: UnaryNode)
         if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
       pushDownPredicate(filter, u.child) { predicate =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 3553d23560dad9f58fdb96fcc83e36c3bdbbeef8..582b3ead5e54aca8b93a110ab011cbb628ce2913 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.unsafe.types.CalendarInterval
 
 class FilterPushdownSuite extends PlanTest {
 
@@ -1134,4 +1135,60 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
   }
+
+  test("watermark pushdown: no pushdown on watermark attribute") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // by the optimizer and others are not.
+    val originalQuery = EventTimeWatermark('b, interval, testRelation)
+      .where('a === 5 && 'b === 10 && 'c === 5)
+    val correctAnswer = EventTimeWatermark(
+      'b, interval, testRelation.where('a === 5))
+      .where('b === 10 && 'c === 5)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: no pushdown for nondeterministic filter") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // by the optimizer and others are not.
+    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+      .where('a === 5 && 'b === Rand(10) && 'c === 5)
+    val correctAnswer = EventTimeWatermark(
+      'c, interval, testRelation.where('a === 5))
+      .where('b === Rand(10) && 'c === 5)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: full pushdown") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // by the optimizer and others are not.
+    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+      .where('a === 5 && 'b === 10)
+    val correctAnswer = EventTimeWatermark(
+      'c, interval, testRelation.where('a === 5 && 'b === 10))
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: empty pushdown") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that all conditions preceding the first watermark touching condition are pushed down
+    // by the optimizer and others are not.
+    val originalQuery = EventTimeWatermark('a, interval, testRelation)
+      .where('a === 5 && 'b === 10)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze,
+      checkAnalysis = false)
+  }
 }