Skip to content
Snippets Groups Projects
Commit bdfab9f9 authored by Prince J Wesley's avatar Prince J Wesley Committed by Sean Owen
Browse files

[SPARK-16909][SPARK CORE] Streaming for postgreSQL JDBC driver

As per the postgreSQL JDBC driver [implementation](https://github.com/pgjdbc/pgjdbc/blob/ab2a6d89081fc2c1fdb2a8600f413db33669022c/pgjdbc/src/main/java/org/postgresql/PGProperty.java#L99), the default record fetch size is 0(which means, it caches all record)

This fix enforces default record fetch size as 10 to enable streaming of data.

Author: Prince J Wesley <princejohnwesley@gmail.com>

Closes #14502 from princejwesley/spark-postgres.
parent 6c1ecb19
No related branches found
No related tags found
No related merge requests found
......@@ -79,14 +79,20 @@ class JdbcRDD[T: ClassTag](
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
// setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
// rather than pulling entire resultset into memory.
// see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
val url = conn.getMetaData.getURL
if (url.startsWith("jdbc:mysql:")) {
// setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
// streaming results, rather than pulling entire resultset into memory.
// See the below URL
// dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
stmt.setFetchSize(Integer.MIN_VALUE)
logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
} else {
stmt.setFetchSize(100)
}
logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment