diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9ea955b0100175c62278c08223a59dfc2cc9ac9e..6dc7bfe333498777e1aebd6e30b7848040778e35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -197,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       table: String,
       parts: Array[Partition],
       connectionProperties: Properties): DataFrame = {
-    val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
+    val props = new Properties()
+    extraOptions.foreach { case (key, value) =>
+      props.put(key, value)
+    }
+    // connectionProperties should override settings in extraOptions
+    props.putAll(connectionProperties)
+    val relation = JDBCRelation(url, table, parts, props)(sqlContext)
     sqlContext.baseRelationToDataFrame(relation)
   }
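(For context, a hedged sketch of the merge's caller-visible behavior. The `sqlContext`, the H2 url, the table name, and the `fetchsize` key are illustrative assumptions, not part of this patch.)

```scala
import java.util.Properties

// Keys set via DataFrameReader.option(...) accumulate in extraOptions and are
// copied into props first; putAll(connectionProperties) runs last, so an
// explicit connection property wins on any key collision.
val connectionProperties = new Properties()
connectionProperties.put("user", "sa")
connectionProperties.put("fetchsize", "100") // overrides .option("fetchsize", "10")

val people = sqlContext.read
  .option("fetchsize", "10") // merged first, then overridden by connectionProperties
  .jdbc("jdbc:h2:mem:testdb", "PEOPLE", connectionProperties)
```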
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5fa11da4c38cd4b371e5eaa5fd76e37a95bfe8f2..f0bf1be506411dc3876b4d8fcecedbd7eda583e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -244,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *                             should be included.
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
-    val conn = JdbcUtils.createConnection(url, connectionProperties)
+    val props = new Properties()
+    extraOptions.foreach { case (key, value) =>
+      props.put(key, value)
+    }
+    // connectionProperties should override settings in extraOptions
+    props.putAll(connectionProperties)
+    val conn = JdbcUtils.createConnection(url, props)
 
     try {
       var tableExists = JdbcUtils.tableExists(conn, table)
@@ -272,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       conn.close()
     }
 
-    JdbcUtils.saveTable(df, url, table, connectionProperties)
+    JdbcUtils.saveTable(df, url, table, props)
   }
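(The writer path mirrors the reader-side merge. A hedged sketch; the url, table name, and `batchsize` key are placeholder assumptions:)

```scala
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "sa")

// Values from DataFrameWriter.option(...) are folded into props before
// connectionProperties, so explicit connection properties take precedence
// on any key collision.
df.write
  .option("batchsize", "500") // placeholder key; merged into props
  .jdbc("jdbc:h2:mem:testdb", "PEOPLE", connectionProperties)
```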
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8eab6a0adccc474e6426756d289998de33e00a6f..e537d631f4559e8e542bcf1eecb4a4a17602e708 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging {
    */
   def resolveTable(url: String, table: String, properties: Properties): StructType = {
     val dialect = JdbcDialects.get(url)
-    val conn: Connection = DriverManager.getConnection(url, properties)
+    val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
     try {
       val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
       try {
@@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging {
    * getConnector is run on the driver code, while the function it returns
    * is run on the executor.
    *
-   * @param driver - The class name of the JDBC driver for the given url.
+   * @param driver - The class name of the JDBC driver for the given url, or null if the driver
+   *                 class does not need to be loaded explicitly.
    * @param url - The JDBC url to connect to.
    *
    * @return A function that loads the driver and connects to the url.
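(For reference, the factory that resolveTable and JdbcUtils.createConnection now share is shaped roughly as below. This is a simplified paraphrase of the existing method, not part of the diff; when `driver` is null the registration step is skipped and DriverManager resolves a driver from the url as before.)

```scala
def getConnector(driver: String, url: String, properties: Properties): () => Connection = {
  () => {
    try {
      // Register the user-specified driver class, if one was provided.
      if (driver != null) DriverRegistry.register(driver)
    } catch {
      case e: ClassNotFoundException =>
        logWarning(s"Couldn't find class $driver", e)
    }
    DriverManager.getConnection(url, properties)
  }
}
```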
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8ee3b8bda8fc76fa820f63aff1d7316b13ad595f..2d0e736ee4766f4b8df182ae53a1dedcc7e5feb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, DriverManager, PreparedStatement}
+import java.sql.{Connection, PreparedStatement}
 import java.util.Properties
 
 import scala.util.Try
@@ -36,7 +36,7 @@ object JdbcUtils extends Logging {
    * Establishes a JDBC connection.
    */
   def createConnection(url: String, connectionProperties: Properties): Connection = {
-    DriverManager.getConnection(url, connectionProperties)
+    JDBCRDD.getConnector(connectionProperties.getProperty("driver"), url, connectionProperties)()
   }
 
   /**
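(A hedged usage sketch of the rerouted helper; the Postgres driver class, url, and credentials are illustrative assumptions:)

```scala
import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

val props = new Properties()
// With this change a user-supplied "driver" property is honored: getConnector
// registers the named class before connecting, instead of relying solely on
// DriverManager to locate a driver for the url.
props.put("driver", "org.postgresql.Driver")
props.put("user", "test")

val conn = JdbcUtils.createConnection("jdbc:postgresql://localhost/testdb", props)
try {
  // ... use the connection ...
} finally {
  conn.close()
}
```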