From ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Fri, 17 Jun 2016 17:11:38 -0700
Subject: [PATCH] [SPARK-15916][SQL] JDBC filter push down should respect
 operator precedence

## What changes were proposed in this pull request?

This PR fixes the problem that the precedence order is messed when pushing where-clause expression to JDBC layer.

**Case 1:**

For sql `select * from table where (a or b) and c`, the where-clause is wrongly converted to JDBC where-clause `a or (b and c)` after filter push down. The consequence is that JDBC may returns less or more rows than expected.

**Case 2:**

For sql `select * from table where always_false_condition`, the result table may not be empty if the JDBC RDD is partitioned using where-clause:
```
spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", "partition 2 where clause"...)
```

## How was this patch tested?

Unit test.

This PR also close #13640

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Sean Zhong <seanzhong@databricks.com>

Closes #13743 from clockfly/SPARK-15916.
---
 .../execution/datasources/jdbc/JDBCRDD.scala  |  4 +--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 26 +++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8d0906e574..44cfbb9fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -305,14 +305,14 @@ private[sql] class JDBCRDD(
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
    */
   private val filterWhereClause: String =
-    filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ")
+    filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
 
   /**
    * A WHERE clause representing both `filters`, if any, and the current partition.
    */
   private def getWhereClause(part: JDBCPartition): String = {
     if (part.whereClause != null && filterWhereClause.length > 0) {
-      "WHERE " + filterWhereClause + " AND " + part.whereClause
+      "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})"
     } else if (part.whereClause != null) {
       "WHERE " + part.whereClause
     } else if (filterWhereClause.length > 0) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index abb7918ae6..d6ec40c18b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -661,4 +661,30 @@ class JDBCSuite extends SparkFunSuite
     assert(oracleDialect.getJDBCType(StringType).
       map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
   }
+
+  private def assertEmptyQuery(sqlString: String): Unit = {
+    assert(sql(sqlString).collect().isEmpty)
+  }
+
+  test("SPARK-15916: JDBC filter operator push down should respect operator precedence") {
+    val TRUE = "NAME != 'non_exists'"
+    val FALSE1 = "THEID > 1000000000"
+    val FALSE2 = "THEID < -1000000000"
+
+    assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2")
+    assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)")
+
+    // Tests JDBCPartition whereClause clause push down.
+    withTempTable("tempFrame") {
+      val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE"
+      val df = spark.read.jdbc(
+        urlWithUserAndPass,
+        "TEST.PEOPLE",
+        predicates = Array[String](jdbcPartitionWhereClause),
+        new Properties)
+
+      df.createOrReplaceTempView("tempFrame")
+      assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
+    }
+  }
 }
-- 
GitLab