Skip to content
Snippets Groups Projects
Commit ebb9a3b6 authored by hyukjinkwon's avatar hyukjinkwon Committed by Cheng Lian
Browse files

[SPARK-15916][SQL] JDBC filter push down should respect operator precedence

## What changes were proposed in this pull request?

This PR fixes the problem that operator precedence is messed up when pushing the where-clause expression down to the JDBC layer.

**Case 1:**

For sql `select * from table where (a or b) and c`, the where-clause is wrongly converted to the JDBC where-clause `a or (b and c)` after filter push down. The consequence is that JDBC may return fewer or more rows than expected.

**Case 2:**

For sql `select * from table where always_false_condition`, the result table may not be empty if the JDBC RDD is partitioned using where-clause:
```
spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", "partition 2 where clause", ...))
```

## How was this patch tested?

Unit test.

This PR also closes #13640.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Sean Zhong <seanzhong@databricks.com>

Closes #13743 from clockfly/SPARK-15916.
parent 7d65a0db
No related branches found
No related tags found
No related merge requests found
...@@ -305,14 +305,14 @@ private[sql] class JDBCRDD( ...@@ -305,14 +305,14 @@ private[sql] class JDBCRDD(
* `filters`, but as a WHERE clause suitable for injection into a SQL query. * `filters`, but as a WHERE clause suitable for injection into a SQL query.
*/ */
private val filterWhereClause: String = private val filterWhereClause: String =
filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ") filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
/** /**
* A WHERE clause representing both `filters`, if any, and the current partition. * A WHERE clause representing both `filters`, if any, and the current partition.
*/ */
private def getWhereClause(part: JDBCPartition): String = { private def getWhereClause(part: JDBCPartition): String = {
if (part.whereClause != null && filterWhereClause.length > 0) { if (part.whereClause != null && filterWhereClause.length > 0) {
"WHERE " + filterWhereClause + " AND " + part.whereClause "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})"
} else if (part.whereClause != null) { } else if (part.whereClause != null) {
"WHERE " + part.whereClause "WHERE " + part.whereClause
} else if (filterWhereClause.length > 0) { } else if (filterWhereClause.length > 0) {
......
...@@ -661,4 +661,30 @@ class JDBCSuite extends SparkFunSuite ...@@ -661,4 +661,30 @@ class JDBCSuite extends SparkFunSuite
assert(oracleDialect.getJDBCType(StringType). assert(oracleDialect.getJDBCType(StringType).
map(_.databaseTypeDefinition).get == "VARCHAR2(255)") map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
} }
/** Runs `sqlString` and asserts that the query returns no rows. */
private def assertEmptyQuery(sqlString: String): Unit = {
  val rows = sql(sqlString).collect()
  assert(rows.isEmpty)
}
test("SPARK-15916: JDBC filter operator push down should respect operator precedence") {
  // Predicates with known truth values against the test data:
  // `alwaysTrue` matches every row; the two `alwaysFalse*` predicates match none.
  val alwaysTrue = "NAME != 'non_exists'"
  val alwaysFalse1 = "THEID > 1000000000"
  val alwaysFalse2 = "THEID < -1000000000"

  // If push down dropped the parentheses, `OR` would bind incorrectly and
  // these queries could return rows; each must stay empty.
  assertEmptyQuery(s"SELECT * FROM foobar WHERE ($alwaysTrue OR $alwaysFalse1) AND $alwaysFalse2")
  assertEmptyQuery(s"SELECT * FROM foobar WHERE $alwaysFalse1 AND ($alwaysFalse2 OR $alwaysTrue)")

  // Tests JDBCPartition whereClause clause push down.
  withTempTable("tempFrame") {
    val partitionWhereClause = s"$alwaysFalse1 OR $alwaysTrue"
    val df = spark.read.jdbc(
      urlWithUserAndPass,
      "TEST.PEOPLE",
      predicates = Array[String](partitionWhereClause),
      new Properties)
    df.createOrReplaceTempView("tempFrame")
    // The partition clause (which matches all rows) must be ANDed with this
    // always-false filter as a parenthesized unit, yielding an empty result.
    assertEmptyQuery(s"SELECT * FROM tempFrame where $alwaysFalse2")
  }
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment