Skip to content
Snippets Groups Projects
Commit f10660fe authored by zsxwing's avatar zsxwing Committed by Michael Armbrust
Browse files

[SPARK-10036] [SQL] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc

This PR uses `JDBCRDD.getConnector` to load JDBC driver before creating connection in `DataFrameReader.jdbc` and `DataFrameWriter.jdbc`.

Author: zsxwing <zsxwing@gmail.com>

Closes #8232 from zsxwing/SPARK-10036 and squashes the following commits:

adf75de [zsxwing] Add extraOptions to the connection properties
57f59d4 [zsxwing] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc
parent a4acdabb
No related branches found
No related tags found
No related merge requests found
...@@ -197,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { ...@@ -197,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
table: String, table: String,
parts: Array[Partition], parts: Array[Partition],
connectionProperties: Properties): DataFrame = { connectionProperties: Properties): DataFrame = {
val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext) val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val relation = JDBCRelation(url, table, parts, props)(sqlContext)
sqlContext.baseRelationToDataFrame(relation) sqlContext.baseRelationToDataFrame(relation)
} }
......
...@@ -244,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) { ...@@ -244,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* should be included. * should be included.
*/ */
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
val conn = JdbcUtils.createConnection(url, connectionProperties) val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val conn = JdbcUtils.createConnection(url, props)
try { try {
var tableExists = JdbcUtils.tableExists(conn, table) var tableExists = JdbcUtils.tableExists(conn, table)
...@@ -272,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { ...@@ -272,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
conn.close() conn.close()
} }
JdbcUtils.saveTable(df, url, table, connectionProperties) JdbcUtils.saveTable(df, url, table, props)
} }
/** /**
......
...@@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging { ...@@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging {
*/ */
def resolveTable(url: String, table: String, properties: Properties): StructType = { def resolveTable(url: String, table: String, properties: Properties): StructType = {
val dialect = JdbcDialects.get(url) val dialect = JdbcDialects.get(url)
val conn: Connection = DriverManager.getConnection(url, properties) val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
try { try {
val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery() val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
try { try {
...@@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging { ...@@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging {
* getConnector is run on the driver code, while the function it returns * getConnector is run on the driver code, while the function it returns
* is run on the executor. * is run on the executor.
* *
* @param driver - The class name of the JDBC driver for the given url. * @param driver - The class name of the JDBC driver for the given url, or null if the class name
* is not necessary.
* @param url - The JDBC url to connect to. * @param url - The JDBC url to connect to.
* *
* @return A function that loads the driver and connects to the url. * @return A function that loads the driver and connects to the url.
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.jdbc package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager, PreparedStatement} import java.sql.{Connection, PreparedStatement}
import java.util.Properties import java.util.Properties
import scala.util.Try import scala.util.Try
...@@ -36,7 +36,7 @@ object JdbcUtils extends Logging { ...@@ -36,7 +36,7 @@ object JdbcUtils extends Logging {
* Establishes a JDBC connection. * Establishes a JDBC connection.
*/ */
def createConnection(url: String, connectionProperties: Properties): Connection = { def createConnection(url: String, connectionProperties: Properties): Connection = {
DriverManager.getConnection(url, connectionProperties) JDBCRDD.getConnector(connectionProperties.getProperty("driver"), url, connectionProperties)()
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment