Commit cc1d2dcb authored by Dongjoon Hyun, committed by Sean Owen

[SPARK-16463][SQL] Support `truncate` option in Overwrite mode for JDBC DataFrameWriter

## What changes were proposed in this pull request?

This PR adds a boolean option, `truncate`, for `SaveMode.Overwrite` of the JDBC DataFrameWriter. When this option is `true`, Spark tries to take advantage of `TRUNCATE TABLE` instead of `DROP TABLE`. It is a small option, but it provides great **convenience** for BI-tool users who work with RDBMS tables generated by Spark.
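For example, with this patch the overwrite path can be switched to `TRUNCATE TABLE` with a single option. A minimal sketch; `df`, `url`, and `prop` are placeholders for a real DataFrame and JDBC endpoint:

```scala
// Overwrite the table contents via TRUNCATE TABLE, preserving the existing
// table definition; on unsupported dialects Spark ignores the option and
// falls back to the usual DROP/CREATE behavior.
df.write
  .mode("overwrite")
  .option("truncate", true)
  .jdbc(url, "my_table", prop)
```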

**Goal**
- Without the `CREATE/DROP` privilege, we can save a DataFrame to the database. Sometimes these privileges are not granted, for security reasons.
- It preserves the existing table definition, so users can add and keep additional `INDEX`es and `CONSTRAINT`s on the table.
- Sometimes `TRUNCATE` is faster than the `DROP/CREATE` combination.

**Supported DBMS**
The following table shows `truncate`-option support. Because the behavior of `TRUNCATE TABLE` differs among DBMSs, it is not always safe to use `TRUNCATE TABLE`. Spark ignores the `truncate` option for **unknown** dialects and for DBMSs whose **default** `TRUNCATE` behavior is **CASCADING**. A newly added `JdbcDialect` should additionally override the corresponding function to support the `truncate` option, as sketched after the table.

Spark Dialects | `truncate` OPTION SUPPORT
---------------|-------------------------------
MySQLDialect | O
PostgresDialect | X
DB2Dialect | O
MsSqlServerDialect | O
DerbyDialect | O
OracleDialect | O
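
For a DBMS that is not listed above, a dialect author can opt in by overriding `isCascadingTruncateTable()` (introduced by this patch) and registering the dialect. A minimal sketch; the `MyDbDialect` name and `jdbc:mydb` URL prefix are hypothetical:

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect for a DBMS whose TRUNCATE TABLE never cascades.
object MyDbDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // Some(false) = TRUNCATE TABLE does not cascade, so Spark will honor
  // the `truncate` option; None (the default) makes Spark ignore it.
  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}

JdbcDialects.registerDialect(MyDbDialect)
```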

**Before (TABLE with INDEX case)**: the Spark shell and the MySQL CLI are interleaved intentionally.
```scala
scala> val (url, prop) = ("jdbc:mysql://localhost:3306/temp?useSSL=false", new java.util.Properties)
scala> prop.setProperty("user","root")
scala> df.write.mode("overwrite").jdbc(url, "table_with_index", prop)
scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type       | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id    | bigint(20) | NO   |     | NULL    |       |
+-------+------------+------+-----+---------+-------+
mysql> CREATE UNIQUE INDEX idx_id ON table_with_index(id);
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type       | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id    | bigint(20) | NO   | PRI | NULL    |       |
+-------+------------+------+-----+---------+-------+
scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type       | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id    | bigint(20) | NO   |     | NULL    |       |
+-------+------------+------+-----+---------+-------+
```

**After (TABLE with INDEX case)**
```scala
scala> spark.range(10).write.mode("overwrite").option("truncate", true).jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type       | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id    | bigint(20) | NO   | PRI | NULL    |       |
+-------+------------+------+-----+---------+-------+
```

**Error Handling**
- In case of exceptions, Spark does not retry; users should turn off the `truncate` option (one client-side fallback is sketched below).
- In case of a schema change:
  - If a column name changes, the write raises an exception, as expected.
  - If only the column types differ, the write behaves like Append mode.
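
Because Spark does not retry on its own, a caller that wants automatic fallback has to code it explicitly. One possible client-side pattern, reusing the `df`, `url`, and `prop` placeholders from the earlier examples:

```scala
// Try the truncating overwrite first; if it fails (e.g. the data no longer
// matches the preserved table definition), fall back to DROP/CREATE.
try {
  df.write.mode("overwrite").option("truncate", true).jdbc(url, "my_table", prop)
} catch {
  case _: Exception =>
    df.write.mode("overwrite").option("truncate", false).jdbc(url, "my_table", prop)
}
```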

## How was this patch tested?

Pass the Jenkins tests with an updated test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #14086 from dongjoon-hyun/SPARK-16410.
parent d6795c7a
Showing 70 additions and 4 deletions
```diff
@@ -387,6 +387,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
+   * You can set the following JDBC-specific option(s) for writing via JDBC:
+   * <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li>
+   *
+   * In case of failures, users should turn off the `truncate` option to use `DROP TABLE` again.
+   * Also, due to the different behavior of `TRUNCATE TABLE` among DBMSs, it's not always safe
+   * to use this. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect
+   * support this, while PostgresDialect and the default JdbcDialect don't. For unknown and
+   * unsupported dialects, the user option `truncate` is ignored.
+   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
@@ -423,8 +432,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }

     if (mode == SaveMode.Overwrite && tableExists) {
-      JdbcUtils.dropTable(conn, table)
-      tableExists = false
+      if (extraOptions.getOrElse("truncate", "false").toBoolean &&
+          JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
+        JdbcUtils.truncateTable(conn, table)
+      } else {
+        JdbcUtils.dropTable(conn, table)
+        tableExists = false
+      }
     }

     // Create the table if the table didn't exist.
```
```diff
@@ -98,6 +98,22 @@ object JdbcUtils extends Logging {
     }
   }

+  /**
+   * Truncates a table from the JDBC database.
+   */
+  def truncateTable(conn: Connection, table: String): Unit = {
+    val statement = conn.createStatement
+    try {
+      statement.executeUpdate(s"TRUNCATE TABLE $table")
+    } finally {
+      statement.close()
+    }
+  }
+
+  def isCascadingTruncateTable(url: String): Option[Boolean] = {
+    JdbcDialects.get(url).isCascadingTruncateTable()
+  }
+
   /**
    * Returns a PreparedStatement that inserts a row into table via conn.
    */
```
```diff
@@ -28,4 +28,6 @@ private object DB2Dialect extends JdbcDialect {
     case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
     case _ => None
   }
+
+  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 }
```
```diff
@@ -108,6 +108,13 @@ abstract class JdbcDialect extends Serializable {
   def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
   }

+  /**
+   * Return Some[true] iff `TRUNCATE TABLE` causes cascading.
+   *   Some[true]  : TRUNCATE TABLE causes cascading.
+   *   Some[false] : TRUNCATE TABLE does not cause cascading.
+   *   None        : The behavior of TRUNCATE TABLE is unknown (default).
+   */
+  def isCascadingTruncateTable(): Option[Boolean] = None
 }

 /**
```
```diff
@@ -38,4 +38,6 @@ private object MsSqlServerDialect extends JdbcDialect {
     case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
     case _ => None
   }
+
+  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 }
```
```diff
@@ -44,4 +44,6 @@ private case object MySQLDialect extends JdbcDialect {
   override def getTableExistsQuery(table: String): String = {
     s"SELECT 1 FROM $table LIMIT 1"
   }
+
+  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 }
```
```diff
@@ -53,4 +53,6 @@ private case object OracleDialect extends JdbcDialect {
     case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
     case _ => None
   }
+
+  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 }
```
```diff
@@ -94,4 +94,6 @@ private object PostgresDialect extends JdbcDialect {
     }
   }
+
+  override def isCascadingTruncateTable(): Option[Boolean] = Some(true)
 }
```
```diff
@@ -40,6 +40,14 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     properties.setProperty("password", "testPass")
     properties.setProperty("rowId", "false")

+  val testH2Dialect = new JdbcDialect {
+    override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
+    override def getCatalystType(
+        sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
+      Some(StringType)
+    override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
+  }
+
   before {
     Utils.classForName("org.h2.Driver")
     conn = DriverManager.getConnection(url)
@@ -145,14 +153,25 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length)
   }

-  test("CREATE then INSERT to truncate") {
+  test("Truncate") {
+    JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+    val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)

     df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
-    df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
+    df2.write.mode(SaveMode.Overwrite).option("truncate", true)
+      .jdbc(url1, "TEST.TRUNCATETEST", properties)
     assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
+
+    val m = intercept[SparkException] {
+      df3.write.mode(SaveMode.Overwrite).option("truncate", true)
+        .jdbc(url1, "TEST.TRUNCATETEST", properties)
+    }.getMessage
+    assert(m.contains("Column \"seq\" not found"))
+    assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
+    JdbcDialects.unregisterDialect(testH2Dialect)
   }

   test("Incompatible INSERT to append") {
```