diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 656e7ecdab0bbf60cc404b995dc927e27289b7b8..be53a8d03848a8dc606d122e7f7ba5d9b080b565 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1061,10 +1061,11 @@ the following case-sensitive options:
   </tr>
 
   <tr>
-    <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
+    <td><code>partitionColumn, lowerBound, upperBound</code></td>
     <td>
-      These options must all be specified if any of them is specified. They describe how to
-      partition the table when reading in parallel from multiple workers.
+      These options must all be specified if any of them is specified, and
+      <code>numPartitions</code> must be specified as well. Together they describe how to
+      partition the table when reading in parallel from multiple workers.
       <code>partitionColumn</code> must be a numeric column from the table in question. Notice
       that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
      partition stride, not for filtering the rows in the table. So all rows in the table will be
@@ -1072,6 +1073,16 @@ the following case-sensitive options:
     </td>
   </tr>
 
+  <tr>
+     <td><code>numPartitions</code></td>
+     <td>
+       The maximum number of partitions that can be used for parallelism in table reading and
+       writing. This also determines the maximum number of concurrent JDBC connections.
+       If the number of partitions to write exceeds this limit, we decrease it to this limit by
+       calling <code>coalesce(numPartitions)</code> before writing.
+     </td>
+  </tr>
+
   <tr>
     <td><code>fetchsize</code></td>
     <td>
@@ -1086,13 +1097,6 @@ the following case-sensitive options:
      </td>
   </tr>
 
-  <tr>
-     <td><code>maxConnections</code></td>
-     <td>
-       The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
-     </td>
-  </tr>
-
   <tr>
      <td><code>isolationLevel</code></td>
      <td>
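
For reference, a minimal sketch of how the four read-side options documented above fit together. The connection URL, table name, and bound values are hypothetical placeholders, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-read-sketch").getOrCreate()

// partitionColumn, lowerBound, upperBound, and numPartitions must be
// supplied together, per the documentation above.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb") // hypothetical URL
  .option("dbtable", "public.events")                   // hypothetical table
  .option("partitionColumn", "id")  // must be a numeric column
  .option("lowerBound", "0")        // used only to compute the partition stride
  .option("upperBound", "1000000")  // not a row filter; all rows are read
  .option("numPartitions", "8")     // also caps concurrent JDBC connections
  .load()
```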
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index d416eec6ddaec1eec8b7d0dc7b305a00a78d9e90..fe2f4c1d78647933008379497620820165a90baf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -74,19 +74,23 @@ class JDBCOptions(
     }
   }
 
+  // the maximum number of partitions that can be used for reading and writing in parallel
+  val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
+  require(numPartitions.isEmpty || numPartitions.get > 0,
+    s"Invalid value `${numPartitions.get}` for parameter `$JDBC_NUM_PARTITIONS`. " +
+      "The minimum value is 1.")
+
   // ------------------------------------------------------------
   // Optional parameters only for reading
   // ------------------------------------------------------------
   // the column used to partition
-  val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null)
+  val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN)
   // the lower bound of partition column
-  val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null)
+  val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
   // the upper bound of the partition column
-  val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null)
-  // the number of partitions
-  val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
-  require(partitionColumn == null ||
-    (lowerBound != null && upperBound != null && numPartitions != null),
+  val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
+  require(partitionColumn.isEmpty ||
+    (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
     s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
       s" and '$JDBC_NUM_PARTITIONS' are required.")
   val fetchSize = {
@@ -122,11 +126,6 @@ class JDBCOptions(
       case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
       case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
     }
-  // the maximum number of connections
-  val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
-  require(maxConnections.isEmpty || maxConnections.get > 0,
-    s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
-      "The minimum value is 1.")
 }
 
 object JDBCOptions {
@@ -149,5 +148,4 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
-  val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
 }
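
Replacing `getOrElse(key, null)` with `parameters.get(key)` makes absence explicit. Here is a standalone sketch of the parsing-and-validation pattern used above, decoupled from Spark (the map contents are hypothetical):

```scala
// Option-based parsing: absent keys become None rather than null, and the
// require can short-circuit on isEmpty before ever touching the value.
val parameters = Map("numPartitions" -> "4") // hypothetical options map

val numPartitions: Option[Int] = parameters.get("numPartitions").map(_.toInt)
require(numPartitions.isEmpty || numPartitions.get > 0,
  s"Invalid value `${numPartitions.get}` for parameter `numPartitions`. " +
    "The minimum value is 1.")

// An absent key yields None, so downstream code must go through
// isDefined/.get instead of comparing against null.
val lowerBound: Option[Long] = parameters.get("lowerBound").map(_.toLong)
```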
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 4420b3b18a9071fa8e1d9458f8dcb91d8aca2684..74f397c01e2f582210c2945669082dfe15f9a594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -35,11 +35,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val upperBound = jdbcOptions.upperBound
     val numPartitions = jdbcOptions.numPartitions
 
-    val partitionInfo = if (partitionColumn == null) {
+    val partitionInfo = if (partitionColumn.isEmpty) {
+      assert(lowerBound.isEmpty && upperBound.isEmpty)
       null
     } else {
+      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
       JDBCPartitioningInfo(
-        partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
+        partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
     JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession)
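
For intuition, with `partitionColumn = "id"`, `lowerBound = 0`, `upperBound = 100`, and `numPartitions = 4`, the `JDBCPartitioningInfo` built above leads `JDBCRelation.columnPartition` to split the read into ranges roughly like the following. This is illustrative only; the exact predicate strings, including NULL handling in the first partition, come from `columnPartition`:

```scala
// Approximate per-partition WHERE clauses for the example parameters above.
val whereClauses = Seq(
  "id < 25",              // first partition (typically also picks up NULLs)
  "id >= 25 AND id < 50",
  "id >= 50 AND id < 75",
  "id >= 75"              // last partition is unbounded above
)
```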
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cdc3c99daa1ab3774e5b8acb765edb5259c01062..c2a1ad84b952e5378cf977a9b6985c29b55d162a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,10 +667,10 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-    val maxConnections = options.maxConnections
+    val numPartitions = options.numPartitions
     val repartitionedDF =
-      if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
-        df.coalesce(maxConnections.get)
+      if (numPartitions.isDefined && numPartitions.get < df.rdd.getNumPartitions) {
+        df.coalesce(numPartitions.get)
       } else {
         df
       }
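
The coalescing above can be seen end to end with a write like the following sketch; the URL and table are hypothetical placeholders:

```scala
// A DataFrame with more partitions than numPartitions is coalesced before
// the JDBC write, capping concurrent connections; here 16 partitions -> 4.
val df = spark.range(0, 1000).toDF("id").repartition(16)

df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb") // hypothetical
  .option("dbtable", "public.ids")                      // hypothetical
  .option("numPartitions", "4")
  .save()
// Equivalent to writing df.coalesce(4), since 16 > 4; with
// numPartitions >= 16 the DataFrame would be written unchanged.
```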
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 5795b4d860cb179a39c40e234d1f05d440f70763..c834419948c530a588da56ad47e29ce43124bd95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       .save()
   }
 
-  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+  test("SPARK-18413: Use `numPartitions` JDBCOption") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val e = intercept[IllegalArgumentException] {
       df.write.format("jdbc")
         .option("dbtable", "TEST.SAVETEST")
         .option("url", url1)
-        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0")
         .save()
     }.getMessage
-    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+    assert(e.contains("Invalid value `0` for parameter `numPartitions`. The minimum value is 1"))
   }
 }