Skip to content
Snippets Groups Projects
Commit d0eb9ffe authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Reynold Xin
Browse files

[SPARK-7746][SQL] Add FetchSize parameter for JDBC driver

JIRA: https://issues.apache.org/jira/browse/SPARK-7746

This looks like an easy-to-add parameter, but it can yield a significant performance improvement if the JDBC driver honors it.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #6283 from viirya/jdbc_fetchsize and squashes the following commits:

de47f94 [Liang-Chi Hsieh] Don't keep fetchSize as single parameter.
b7bff2f [Liang-Chi Hsieh] Add FetchSize parameter for JDBC driver.
parent ddec173c
No related branches found
No related tags found
No related merge requests found
......@@ -211,7 +211,8 @@ private[sql] object JDBCRDD extends Logging {
fqTable,
requiredColumns,
filters,
parts)
parts,
properties)
}
}
......@@ -227,7 +228,8 @@ private[sql] class JDBCRDD(
fqTable: String,
columns: Array[String],
filters: Array[Filter],
partitions: Array[Partition])
partitions: Array[Partition],
properties: Properties)
extends RDD[Row](sc, Nil) {
/**
......@@ -356,6 +358,8 @@ private[sql] class JDBCRDD(
// Assemble the query text from the column list, the table name and the WHERE clause.
val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
// Prepare the statement as forward-only and read-only.
val stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
// Read the optional "fetchSize" entry from `properties`; it defaults to "0" when absent.
// `.toInt` throws NumberFormatException if the configured value is not an integer.
// NOTE(review): JDBC documents 0 as "ignore the hint", and some drivers are said to give
// special meaning to other values -- confirm per driver before adding range validation.
val fetchSize = properties.getProperty("fetchSize", "0").toInt
stmt.setFetchSize(fetchSize)
val rs = stmt.executeQuery()
val conversions = getConversions(schema)
......
......@@ -67,7 +67,15 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
// Register TEST.PEOPLE as the temporary table `fetchtwo`, passing fetchSize '2'
// through the data source OPTIONS so the fetchSize tests can query it.
sql(
s"""
|CREATE TEMPORARY TABLE fetchtwo
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
| fetchSize '2')
""".stripMargin.replaceAll("\n", " "))
sql(
s"""
|CREATE TEMPORARY TABLE parts
......@@ -185,6 +193,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
assert(names(2).equals("mary"))
}
test("SELECT first field when fetchSize is two") {
  // Query NAME through the `fetchtwo` table (registered with fetchSize '2') and sort the
  // result so the assertions do not depend on the order rows come back in.
  val names = sql("SELECT NAME FROM fetchtwo").collect().map(x => x.getString(0)).sortWith(_ < _)
  assert(names.size === 3)
  // Use `===` like the sibling fetchSize test on THEID, so a failure reports both the
  // actual and the expected value.
  assert(names(0) === "fred")
  assert(names(1) === "joe 'foo' \"bar\"")
  assert(names(2) === "mary")
}
test("SELECT second field") {
val ids = sql("SELECT THEID FROM foobar").collect().map(x => x.getInt(0)).sortWith(_ < _)
assert(ids.size === 3)
......@@ -192,6 +208,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
assert(ids(1) === 2)
assert(ids(2) === 3)
}
test("SELECT second field when fetchSize is two") {
  // Fetch THEID through the `fetchtwo` table (registered with fetchSize '2').
  val rows = sql("SELECT THEID FROM fetchtwo").collect()
  // Sort ascending so the checks are independent of the order rows are returned in.
  val sortedIds = rows.map(_.getInt(0)).sorted
  assert(sortedIds.size === 3)
  assert(sortedIds(0) === 1)
  assert(sortedIds(1) === 2)
  assert(sortedIds(2) === 3)
}
test("SELECT * partitioned") {
assert(sql("SELECT * FROM parts").collect().size == 3)
......@@ -232,6 +256,13 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
}
test("Basic API with FetchSize") {
  // Pass fetchSize through the Properties argument of the reader API instead of
  // through SQL OPTIONS.
  val fetchSizeProps = new Properties()
  fetchSizeProps.setProperty("fetchSize", "2")
  val rows = TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", fetchSizeProps).collect()
  // All three rows of TEST.PEOPLE are expected back.
  assert(rows.length === 3)
}
test("Partitioning via JDBCPartitioningInfo API") {
assert(
TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment