Skip to content
Snippets Groups Projects
Commit b63938a8 authored by mariusvniekerk's avatar mariusvniekerk Committed by Reynold Xin
Browse files

[SPARK-11881][SQL] Fix for postgresql fetchsize > 0

Reference: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
In order for PostgreSQL to honor the fetchSize non-zero setting, its Connection.autoCommit needs to be set to false. Otherwise, it will just quietly ignore the fetchSize setting.

This adds a new side-effecting, dialect-specific beforeFetch method that fires before a select query is run.

Author: mariusvniekerk <marius.v.niekerk@gmail.com>

Closes #9861 from mariusvniekerk/SPARK-11881.
parent 6f6bb0e8
No related branches found
No related tags found
No related merge requests found
...@@ -224,6 +224,7 @@ private[sql] object JDBCRDD extends Logging { ...@@ -224,6 +224,7 @@ private[sql] object JDBCRDD extends Logging {
quotedColumns, quotedColumns,
filters, filters,
parts, parts,
url,
properties) properties)
} }
} }
...@@ -241,6 +242,7 @@ private[sql] class JDBCRDD( ...@@ -241,6 +242,7 @@ private[sql] class JDBCRDD(
columns: Array[String], columns: Array[String],
filters: Array[Filter], filters: Array[Filter],
partitions: Array[Partition], partitions: Array[Partition],
url: String,
properties: Properties) properties: Properties)
extends RDD[InternalRow](sc, Nil) { extends RDD[InternalRow](sc, Nil) {
...@@ -361,6 +363,9 @@ private[sql] class JDBCRDD( ...@@ -361,6 +363,9 @@ private[sql] class JDBCRDD(
context.addTaskCompletionListener{ context => close() } context.addTaskCompletionListener{ context => close() }
val part = thePart.asInstanceOf[JDBCPartition] val part = thePart.asInstanceOf[JDBCPartition]
val conn = getConnection() val conn = getConnection()
// Give the dialect a chance to adjust the connection before the query runs
// (e.g. PostgreSQL turns off auto-commit so a non-zero fetchsize is honored).
val dialect = JdbcDialects.get(url)
// Scoped import used only to convert the java.util.Properties to a Scala Map.
import scala.collection.JavaConverters._
dialect.beforeFetch(conn, properties.asScala.toMap)
// H2's JDBC driver does not support the setSchema() method. We pass a // H2's JDBC driver does not support the setSchema() method. We pass a
// fully-qualified table name in the SELECT statement. I don't know how to // fully-qualified table name in the SELECT statement. I don't know how to
...@@ -489,6 +494,13 @@ private[sql] class JDBCRDD( ...@@ -489,6 +494,13 @@ private[sql] class JDBCRDD(
} }
try { try {
if (null != conn) { if (null != conn) {
// If a dialect's beforeFetch disabled auto-commit (e.g. PostgreSQL when
// fetchsize > 0), commit the open transaction before closing the connection
// so the cursor-based read is finalized cleanly.
if (!conn.getAutoCommit && !conn.isClosed) {
  try {
    conn.commit()
  } catch {
    // NOTE(review): catching Throwable also swallows fatal errors; NonFatal
    // would be the usual idiom — confirm best-effort commit is intended here.
    case e: Throwable => logWarning("Exception committing transaction", e)
  }
}
conn.close() conn.close()
} }
logInfo("closed connection") logInfo("closed connection")
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.spark.sql.jdbc package org.apache.spark.sql.jdbc
import java.sql.Connection
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.annotation.DeveloperApi import org.apache.spark.annotation.DeveloperApi
...@@ -97,6 +99,15 @@ abstract class JdbcDialect extends Serializable { ...@@ -97,6 +99,15 @@ abstract class JdbcDialect extends Serializable {
s"SELECT * FROM $table WHERE 1=0" s"SELECT * FROM $table WHERE 1=0"
} }
/**
 * Hook invoked on the JDBC connection just before a select query is executed.
 *
 * The base implementation does nothing. Dialects needing connection-level
 * tweaks (for example, PostgreSQL disabling auto-commit so that a non-zero
 * fetch size is honored) override this method.
 *
 * @param connection the live JDBC connection the query will run against
 * @param properties the connection properties, passed through from the relation
 */
def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = ()
} }
/** /**
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.spark.sql.jdbc package org.apache.spark.sql.jdbc
import java.sql.Types import java.sql.{Connection, Types}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
...@@ -70,4 +70,19 @@ private object PostgresDialect extends JdbcDialect { ...@@ -70,4 +70,19 @@ private object PostgresDialect extends JdbcDialect {
override def getTableExistsQuery(table: String): String = { override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1" s"SELECT 1 FROM $table LIMIT 1"
} }
/**
 * Disables auto-commit when a positive fetch size is configured.
 *
 * The PostgreSQL JDBC driver only streams rows through a cursor (honoring a
 * non-zero fetch size) when the connection is NOT in auto-commit mode;
 * otherwise every row is buffered inside the driver. See:
 * https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
 */
override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
  super.beforeFetch(connection, properties)
  val fetchSize = properties.getOrElse("fetchsize", "0").toInt
  if (fetchSize > 0) {
    connection.setAutoCommit(false)
  }
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment