From cc1d2dcb612fb5df39c9a9e57a3484ecad90c745 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun <dongjoon@apache.org> Date: Sun, 24 Jul 2016 09:25:02 +0100 Subject: [PATCH] [SPARK-16463][SQL] Support `truncate` option in Overwrite mode for JDBC DataFrameWriter ## What changes were proposed in this pull request? This PR adds a boolean option, `truncate`, for `SaveMode.Overwrite` of JDBC DataFrameWriter. If this option is `true`, it try to take advantage of `TRUNCATE TABLE` instead of `DROP TABLE`. This is a trivial option, but will provide great **convenience** for BI tool users based on RDBMS tables generated by Spark. **Goal** - Without `CREATE/DROP` privilege, we can save dataframe to database. Sometime these are not allowed for security. - It will preserve the existing table information, so users can add and keep some additional `INDEX` and `CONSTRAINT`s for the table. - Sometime, `TRUNCATE` is faster than the combination of `DROP/CREATE`. **Supported DBMS** The following is `truncate`-option support table. Due to the different behavior of `TRUNCATE TABLE` among DBMSs, it's not always safe to use `TRUNCATE TABLE`. Spark will ignore the `truncate` option for **unknown** and **some** DBMS with **default CASCADING** behavior. Newly added JDBCDialect should implement corresponding function to support `truncate` option additionally. Spark Dialects | `truncate` OPTION SUPPORT ---------------|------------------------------- MySQLDialect | O PostgresDialect | X DB2Dialect | O MsSqlServerDialect | O DerbyDialect | O OracleDialect | O **Before (TABLE with INDEX case)**: SparkShell & MySQL CLI are interleaved intentionally. ```scala scala> val (url, prop)=("jdbc:mysql://localhost:3306/temp?useSSL=false", new java.util.Properties) scala> prop.setProperty("user","root") scala> df.write.mode("overwrite").jdbc(url, "table_with_index", prop) scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop) mysql> DESC table_with_index; +-------+------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+------------+------+-----+---------+-------+ | id | bigint(20) | NO | | NULL | | +-------+------------+------+-----+---------+-------+ mysql> CREATE UNIQUE INDEX idx_id ON table_with_index(id); mysql> DESC table_with_index; +-------+------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+------------+------+-----+---------+-------+ | id | bigint(20) | NO | PRI | NULL | | +-------+------------+------+-----+---------+-------+ scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop) mysql> DESC table_with_index; +-------+------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+------------+------+-----+---------+-------+ | id | bigint(20) | NO | | NULL | | +-------+------------+------+-----+---------+-------+ ``` **After (TABLE with INDEX case)** ```scala scala> spark.range(10).write.mode("overwrite").option("truncate", true).jdbc(url, "table_with_index", prop) mysql> DESC table_with_index; +-------+------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+------------+------+-----+---------+-------+ | id | bigint(20) | NO | PRI | NULL | | +-------+------------+------+-----+---------+-------+ ``` **Error Handling** - In case of exceptions, Spark will not retry. Users should turn off the `truncate` option. - In case of schema change: - If one of the column names changes, this will raise exceptions intuitively. - If there exists only type difference, this will work like Append mode. ## How was this patch tested? Pass the Jenkins tests with a updated testcase. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #14086 from dongjoon-hyun/SPARK-16410. --- .../apache/spark/sql/DataFrameWriter.scala | 18 +++++++++++++-- .../datasources/jdbc/JdbcUtils.scala | 16 +++++++++++++ .../apache/spark/sql/jdbc/DB2Dialect.scala | 2 ++ .../apache/spark/sql/jdbc/JdbcDialects.scala | 7 ++++++ .../spark/sql/jdbc/MsSqlServerDialect.scala | 2 ++ .../apache/spark/sql/jdbc/MySQLDialect.scala | 2 ++ .../apache/spark/sql/jdbc/OracleDialect.scala | 2 ++ .../spark/sql/jdbc/PostgresDialect.scala | 2 ++ .../spark/sql/jdbc/JDBCWriteSuite.scala | 23 +++++++++++++++++-- 9 files changed, 70 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6a8dfac0a..753b64b983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -387,6 +387,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash * your external database systems. * + * You can set the following JDBC-specific option(s) for storing JDBC: + * <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li> + * + * In case of failures, users should turn off `truncate` option to use `DROP TABLE` again. Also, + * due to the different behavior of `TRUNCATE TABLE` among DBMS, it's not always safe to use this. + * MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect supports this + * while PostgresDialect and default JDBCDirect doesn't. For unknown and unsupported JDBCDirect, + * the user option `truncate` is ignored. + * * @param url JDBC database url of the form `jdbc:subprotocol:subname` * @param table Name of the table in the external database. * @param connectionProperties JDBC database connection arguments, a list of arbitrary string @@ -423,8 +432,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } if (mode == SaveMode.Overwrite && tableExists) { - JdbcUtils.dropTable(conn, table) - tableExists = false + if (extraOptions.getOrElse("truncate", "false").toBoolean && + JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { + JdbcUtils.truncateTable(conn, table) + } else { + JdbcUtils.dropTable(conn, table) + tableExists = false + } } // Create the table if the table didn't exist. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index ce71a7d1e6..cb474cbd0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -98,6 +98,22 @@ object JdbcUtils extends Logging { } } + /** + * Truncates a table from the JDBC database. + */ + def truncateTable(conn: Connection, table: String): Unit = { + val statement = conn.createStatement + try { + statement.executeUpdate(s"TRUNCATE TABLE $table") + } finally { + statement.close() + } + } + + def isCascadingTruncateTable(url: String): Option[Boolean] = { + JdbcDialects.get(url).isCascadingTruncateTable() + } + /** * Returns a PreparedStatement that inserts a row into table via conn. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index f12b6ca9d6..190463df0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -28,4 +28,6 @@ private object DB2Dialect extends JdbcDialect { case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR)) case _ => None } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 948106fd06..78107809a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -108,6 +108,13 @@ abstract class JdbcDialect extends Serializable { def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = { } + /** + * Return Some[true] iff `TRUNCATE TABLE` causes cascading default. + * Some[true] : TRUNCATE TABLE causes cascading. + * Some[false] : TRUNCATE TABLE does not cause cascading. + * None: The behavior of TRUNCATE TABLE is unknown (default). + */ + def isCascadingTruncateTable(): Option[Boolean] = None } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 3eb722b070..70122f2599 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -38,4 +38,6 @@ private object MsSqlServerDialect extends JdbcDialect { case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP)) case _ => None } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index e1717049f3..b2cff7877d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -44,4 +44,6 @@ private case object MySQLDialect extends JdbcDialect { override def getTableExistsQuery(table: String): String = { s"SELECT 1 FROM $table LIMIT 1" } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index b795e8b42d..ce8731efd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -53,4 +53,6 @@ private case object OracleDialect extends JdbcDialect { case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) case _ => None } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 6baf1b6f16..fb959d881e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -94,4 +94,6 @@ private object PostgresDialect extends JdbcDialect { } } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(true) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 2c6449fa68..d99b3cf975 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -40,6 +40,14 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { properties.setProperty("password", "testPass") properties.setProperty("rowId", "false") + val testH2Dialect = new JdbcDialect { + override def canHandle(url: String) : Boolean = url.startsWith("jdbc:h2") + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = + Some(StringType) + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + } + before { Utils.classForName("org.h2.Driver") conn = DriverManager.getConnection(url) @@ -145,14 +153,25 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length) } - test("CREATE then INSERT to truncate") { + test("Truncate") { + JdbcDialects.registerDialect(testH2Dialect) val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3) df.write.jdbc(url1, "TEST.TRUNCATETEST", properties) - df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties) + df2.write.mode(SaveMode.Overwrite).option("truncate", true) + .jdbc(url1, "TEST.TRUNCATETEST", properties) assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + + val m = intercept[SparkException] { + df3.write.mode(SaveMode.Overwrite).option("truncate", true) + .jdbc(url1, "TEST.TRUNCATETEST", properties) + }.getMessage + assert(m.contains("Column \"seq\" not found")) + assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) + JdbcDialects.unregisterDialect(testH2Dialect) } test("Incompatible INSERT to append") { -- GitLab