Commit 6f710f9f authored by Takeshi YAMAMURO, committed by Yin Huai

[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter

Input: `SELECT * FROM jdbcTable WHERE col0 = 'xxx'`

Current plan:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
+- Filter (col0#0 = xxx)
   +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

This patch enables the plan below:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #10427 from maropu/RemoveFilterInJdbcScan.
Parent: 9267bc68
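
The mechanism behind the change: since Spark 1.6, `BaseRelation` exposes `unhandledFilters`, and the planner adds a Spark-side `Filter` node only for the predicates a relation reports back as unhandled. Below is a minimal sketch of that contract; `PushdownCapableRelation` and its handled-filter set are illustrative only, not part of this patch. The actual changes follow.

```scala
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, IsNull}

// A relation advertises which filters it can fully evaluate by returning
// only the ones it cannot handle; Spark keeps a Filter node just for those.
abstract class PushdownCapableRelation extends BaseRelation {
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case _: EqualTo | _: IsNull => true // evaluated by the source itself
      case _ => false                     // anything else stays in Spark
    }
}
```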
In `JDBCRDD.scala`:
```diff
@@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging {
    * Turns a single Filter into a String representing a SQL expression.
    * Returns None for an unhandled filter.
    */
-  private def compileFilter(f: Filter): Option[String] = {
+  private[jdbc] def compileFilter(f: Filter): Option[String] = {
     Option(f match {
       case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
       case EqualNullSafe(attr, value) =>
```
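
`compileFilter` is the single source of truth for what the JDBC source can translate, so widening its visibility to `private[jdbc]` lets the relation reuse it. A trimmed-down, self-contained sketch of the idea (value quoting is simplified here; the real method also escapes strings, dates, and timestamps):

```scala
import org.apache.spark.sql.sources._

object FilterCompilerSketch {
  // Translate a data source Filter into a SQL WHERE fragment, or None when
  // the filter has no SQL equivalent in this simplified translator.
  def compile(f: Filter): Option[String] = f match {
    case EqualTo(attr, value)     => Some(s"$attr = '$value'")
    case GreaterThan(attr, value) => Some(s"$attr > '$value'")
    case IsNull(attr)             => Some(s"$attr IS NULL")
    case And(left, right) =>
      // Both sides must compile, otherwise the whole conjunction is unhandled.
      for (l <- compile(left); r <- compile(right)) yield s"($l) AND ($r)"
    case Or(left, right) =>
      for (l <- compile(left); r <- compile(right)) yield s"($l) OR ($r)"
    case _ => None // no SQL rendering implemented for this filter type
  }
}
```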
In `JDBCRelation.scala`:
```diff
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(
 
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
 
+  // Check if JDBCRDD.compileFilter can accept input filters
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    filters.filter(JDBCRDD.compileFilter(_).isEmpty)
+  }
+
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
```
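
With that one-liner, the handled/unhandled decision is exactly "does `compileFilter` return `Some`". A quick illustration of the resulting semantics, where the `compiles` predicate is a stand-in for `JDBCRDD.compileFilter`, not the real method:

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, IsNull}

object UnhandledFiltersDemo extends App {
  // Stand-in predicate: pretend only equality comparisons compile to SQL.
  def compiles(f: Filter): Boolean = f.isInstanceOf[EqualTo]

  def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filter(f => !compiles(f))

  // Only IsNull comes back; Spark keeps a Filter node for it, while the
  // EqualTo predicate is evaluated solely inside the generated WHERE clause.
  println(unhandledFilters(Array(EqualTo("col0", "xxx"), IsNull("col1"))).toSeq)
}
```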
In `JDBCSuite.scala`:
```diff
@@ -22,12 +22,12 @@ import java.sql.{Date, DriverManager, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.sources._
```
Later in the same file, the predicate-pushdown tests are updated and a new test is added:
```diff
@@ -183,26 +183,34 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+    def checkPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check if SparkPlan Filter is removed in a physical plan and
+      // the plan only has PhysicalRDD to scan JDBCRelation.
+      assert(parentPlan.isInstanceOf[PhysicalRDD])
+      assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      df
+    }
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
 
     // This is a test to reflect discussion in SPARK-12218.
     // The older versions of spark have this kind of bugs in parquet data source.
@@ -210,6 +218,28 @@ class JDBCSuite extends SparkFunSuite
     val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
     assert(df1.collect.toSet === Set(Row("mary", 2)))
     assert(df2.collect.toSet === Set(Row("mary", 2)))
+
+    def checkNotPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+      // cannot compile given predicates.
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+      assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter])
+      df
+    }
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
   }
 
+  test("SELECT COUNT(1) WHERE (predicates)") {
+    // Check if an answer is correct when Filter is removed from operations such as count() which
+    // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in
+    // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters
+    // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD
+    // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more
+    // discussions.
+    assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
+  }
+
   test("SELECT * WHERE (quoted strings)") {
```
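
To reproduce the check outside the test suite: with any table registered through the JDBC source (here a hypothetical `foobar`), `explain()` should now show the scan node with `PushedFilters` and no separate Spark `Filter` above it for fully handled predicates. A rough sketch against a Spark 1.6-era `SQLContext`:

```scala
// Assumes a SQLContext `sqlContext` with a JDBC-backed table "foobar" registered.
val df = sqlContext.sql("SELECT * FROM foobar WHERE THEID = 1")
df.explain()
// Expected shape (abbreviated): a single scan node such as
//   Scan JDBCRelation(...)[NAME#0,THEID#1] PushedFilters: [EqualTo(THEID,1)]
// with no Spark-side Filter operator above it.
```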