diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b6ad5db74e3c842526577f7baf0a9148a8b613f8..6ba8b33b3fa74e807ced977c09618bfd41b0fa07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -689,7 +689,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // state and all the input rows processed before. In another word, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
     case filter @ Filter(condition, project @ Project(fields, grandChild))
-      if fields.forall(_.deterministic) =>
+      if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
 
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
@@ -830,6 +830,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       filter
     }
   }
+
+  /**
+   * Check if we can safely push a filter through a projection, by making sure that predicate
+   * subqueries in the condition do not contain the same attributes as the plan they are moved
+   * into. This can happen when the plan and predicate subquery have the same source.
+   */
+  private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = {
+    val attributes = plan.outputSet
+    val matched = condition.find {
+      case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty
+      case _ => false
+    }
+    matched.isEmpty
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index eab45050f7e6335ffe73fe545f46078f1cc71f1f..89348668340be301e6f1668e856a1b6ac1967ab3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -608,8 +608,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
             | where exists (select 1 from onerow t2 where t1.c1=t2.c1)
             | and   exists (select 1 from onerow LIMIT 1)""".stripMargin),
         Row(1) :: Nil)
-     }
-   }
+    }
+  }
 
   test("SPARK-16804: Correlated subqueries containing LIMIT - 2") {
     withTempView("onerow") {
@@ -623,6 +623,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
             |               from   (select 1 from onerow t2 LIMIT 1)
             |               where  t1.c1=t2.c1)""".stripMargin),
         Row(1) :: Nil)
-     }
-   }
+    }
+  }
+
+  test("SPARK-17337: Incorrect column resolution leads to incorrect results") {
+    withTempView("t1", "t2") {
+      Seq(1, 2).toDF("c1").createOrReplaceTempView("t1")
+      Seq(1).toDF("c2").createOrReplaceTempView("t2")
+
+      checkAnswer(
+        sql(
+          """
+            | select *
+            | from   (select t2.c2+1 as c3
+            |         from   t1 left join t2 on t1.c1=t2.c2) t3
+            | where  c3 not in (select c2 from t2)""".stripMargin),
+        Row(2) :: Nil)
+    }
+  }
 }