From fb07bbe575aabe68422fd3a31865101fb7fa1722 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Fri, 25 Nov 2016 10:35:07 -0800
Subject: [PATCH] [SPARK-18413][SQL][FOLLOW-UP] Use `numPartitions` instead of
 `maxConnections`

## What changes were proposed in this pull request?

This is a follow-up PR of #15868 that merges the `maxConnections` option into the `numPartitions` option.
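For illustration, a minimal sketch of how the consolidated option surfaces in the DataFrame API after this change. This sketch is not part of the patch: the URL, table names, and values are placeholders, and `spark` is the session provided by `spark-shell`.

```scala
// Parallel read: `partitionColumn`, `lowerBound`, and `upperBound` must be
// specified together, and `numPartitions` is now required alongside them.
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb") // placeholder URL
  .option("dbtable", "public.people")                  // placeholder table
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "100000")
  .option("numPartitions", "10")
  .load()

// Write: `numPartitions` replaces the removed `maxConnections`; it caps write
// parallelism (and thus concurrent JDBC connections) by coalescing the input.
df.write.format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")
  .option("dbtable", "public.people_copy")
  .option("numPartitions", "8")
  .save()
```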
## How was this patch tested?

Passes the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15966 from dongjoon-hyun/SPARK-18413-2.
---
 docs/sql-programming-guide.md                 | 24 +++++++++++--------
 .../datasources/jdbc/JDBCOptions.scala        | 24 +++++++++----------
 .../jdbc/JdbcRelationProvider.scala           |  6 +++--
 .../datasources/jdbc/JdbcUtils.scala          |  6 ++---
 .../spark/sql/jdbc/JDBCWriteSuite.scala       |  6 ++---
 5 files changed, 35 insertions(+), 31 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 656e7ecdab..be53a8d038 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1061,10 +1061,11 @@ the following case-sensitive options:
   </tr>
 
   <tr>
-    <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
+    <td><code>partitionColumn, lowerBound, upperBound</code></td>
     <td>
-      These options must all be specified if any of them is specified. They describe how to
-      partition the table when reading in parallel from multiple workers.
+      These options must all be specified if any of them is specified. In addition,
+      <code>numPartitions</code> must be specified. They describe how to partition the table when
+      reading in parallel from multiple workers.
       <code>partitionColumn</code> must be a numeric column from the table in question. Notice
       that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
       partition stride, not for filtering the rows in table. So all rows in the table will be
@@ -1072,6 +1073,16 @@ the following case-sensitive options:
     </td>
   </tr>
 
+  <tr>
+    <td><code>numPartitions</code></td>
+    <td>
+      The maximum number of partitions that can be used for parallelism in table reading and
+      writing. This also determines the maximum number of concurrent JDBC connections.
+      If the number of partitions to write exceeds this limit, we decrease it to this limit by
+      calling <code>coalesce(numPartitions)</code> before writing.
+    </td>
+  </tr>
+
   <tr>
     <td><code>fetchsize</code></td>
     <td>
@@ -1086,13 +1097,6 @@ the following case-sensitive options:
     </td>
   </tr>
 
-  <tr>
-    <td><code>maxConnections</code></td>
-    <td>
-      The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
-    </td>
-  </tr>
-
   <tr>
     <td><code>isolationLevel</code></td>
     <td>
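As an aside on the read-side options documented above: conceptually, `JDBCRelation.columnPartition` splits the range `[lowerBound, upperBound)` into `numPartitions` strides and issues one filtered query per stride, which is why the bounds control only the stride and never filter rows. A rough, simplified sketch of that idea; the real implementation's boundary handling differs:

```scala
// One WHERE predicate per partition over a numeric column. The first stride
// also captures NULLs and values below lowerBound; the last captures values
// at or above its start, so every row lands in exactly one partition.
def partitionPredicates(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int): Seq[String] = {
  val stride = (upperBound - lowerBound) / numPartitions
  (0 until numPartitions).map { i =>
    val start = lowerBound + i * stride
    val end = start + stride
    val first = i == 0
    val last = i == numPartitions - 1
    if (first && last) "1=1" // a single partition scans the whole table
    else if (first) s"$column < $end OR $column IS NULL"
    else if (last) s"$column >= $start"
    else s"$column >= $start AND $column < $end"
  }
}
```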
" + - "The minimum value is 1.") } object JDBCOptions { @@ -149,5 +148,4 @@ object JDBCOptions { val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions") val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") - val JDBC_MAX_CONNECTIONS = newOption("maxConnections") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 4420b3b18a..74f397c01e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -35,11 +35,13 @@ class JdbcRelationProvider extends CreatableRelationProvider val upperBound = jdbcOptions.upperBound val numPartitions = jdbcOptions.numPartitions - val partitionInfo = if (partitionColumn == null) { + val partitionInfo = if (partitionColumn.isEmpty) { + assert(lowerBound.isEmpty && upperBound.isEmpty) null } else { + assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty) JDBCPartitioningInfo( - partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) + partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get) } val parts = JDBCRelation.columnPartition(partitionInfo) JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index cdc3c99daa..c2a1ad84b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -667,10 +667,10 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel - val maxConnections = options.maxConnections + val numPartitions = options.numPartitions val repartitionedDF = - if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) { - df.coalesce(maxConnections.get) + if (numPartitions.isDefined && numPartitions.get < df.rdd.getNumPartitions) { + df.coalesce(numPartitions.get) } else { df } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 5795b4d860..c834419948 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { .save() } - test("SPARK-18413: Add `maxConnections` JDBCOption") { + test("SPARK-18413: Use `numPartitions` JDBCOption") { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) val e = intercept[IllegalArgumentException] { df.write.format("jdbc") .option("dbtable", "TEST.SAVETEST") .option("url", url1) - .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0") + .option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0") .save() }.getMessage - assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1")) + assert(e.contains("Invalid value `0` for parameter `numPartitions`. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 5795b4d860..c834419948 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       .save()
   }
 
-  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+  test("SPARK-18413: Use `numPartitions` JDBCOption") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val e = intercept[IllegalArgumentException] {
       df.write.format("jdbc")
         .option("dbtable", "TEST.SAVETEST")
         .option("url", url1)
-        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0")
         .save()
     }.getMessage
-    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+    assert(e.contains("Invalid value `0` for parameter `numPartitions`. The minimum value is 1"))
   }
 }
--