From 4cb025afafe63d5871356d9dc38d58c1df0da996 Mon Sep 17 00:00:00 2001
From: Xiao Li <gatorsmile@gmail.com>
Date: Fri, 24 Feb 2017 23:03:59 -0800
Subject: [PATCH] [SPARK-19735][SQL] Remove HOLD_DDLTIME from Catalog APIs

### What changes were proposed in this pull request?

As explained in the Hive JIRA https://issues.apache.org/jira/browse/HIVE-12224, HOLD_DDLTIME has been broken since the day it landed, and Hive 2.0 removed it from its API. Spark SQL always sets it to FALSE, so, like Hive, we should remove it from our Catalog APIs as well.

### How was this patch tested?

N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17063 from gatorsmile/removalHoldDDLTime.
---
 .../catalyst/catalog/ExternalCatalog.scala    |  5 +---
 .../catalyst/catalog/InMemoryCatalog.scala    |  5 +---
 .../sql/catalyst/catalog/SessionCatalog.scala |  6 ++---
 .../spark/sql/execution/command/tables.scala  |  2 --
 .../spark/sql/hive/HiveExternalCatalog.scala  | 10 ++------
 .../spark/sql/hive/client/HiveClient.scala    |  5 +---
 .../sql/hive/client/HiveClientImpl.scala      |  8 +------
 .../spark/sql/hive/client/HiveShim.scala      | 24 ++++++------------------
 .../hive/execution/InsertIntoHiveTable.scala  |  9 +------
 .../spark/sql/hive/client/VersionsSuite.scala |  5 +---
 10 files changed, 17 insertions(+), 62 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 5233699fac..a3a4ab37ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -125,7 +125,6 @@ abstract class ExternalCatalog {
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit

   /**
@@ -140,7 +139,6 @@ abstract class ExternalCatalog {
       loadPath: String,
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit

@@ -150,8 +148,7 @@ abstract class ExternalCatalog {
       loadPath: String,
       partition: TablePartitionSpec,
       replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean): Unit
+      numDP: Int): Unit

   // --------------------------------------------------------------------------
   // Partitions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 15aed5f9b1..6bb2b2d4ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -325,7 +325,6 @@ class InMemoryCatalog(
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit = {
     throw new UnsupportedOperationException("loadTable is not implemented")
   }
@@ -336,7 +335,6 @@ class InMemoryCatalog(
       loadPath: String,
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit = {
     throw new UnsupportedOperationException("loadPartition is not implemented.")
   }
@@ -348,8 +346,7 @@ class InMemoryCatalog(
       loadPath: String,
       partition: TablePartitionSpec,
       replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean): Unit = {
+      numDP: Int): Unit = {
     throw new UnsupportedOperationException("loadDynamicPartitions is not implemented.")
   }
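Before the remaining per-file diffs, here is a condensed view of the signature change that repeats across every layer below (ExternalCatalog, InMemoryCatalog, SessionCatalog, HiveExternalCatalog, HiveClient, and the shims). This is an illustrative sketch, not the real trait; `TablePartitionSpec` is aliased locally so the snippet stands alone:

```scala
// Illustrative only: a trimmed-down stand-in for the ExternalCatalog trait,
// showing the before/after shape of the load methods.
object CatalogApiSketch {
  type TablePartitionSpec = Map[String, String]

  trait BeforeThisPatch {
    // Callers had to thread a holdDDLTime flag through every load,
    // even though Spark SQL always passed false.
    def loadTable(
        db: String,
        table: String,
        loadPath: String,
        isOverwrite: Boolean,
        holdDDLTime: Boolean,
        isSrcLocal: Boolean): Unit
  }

  trait AfterThisPatch {
    // The flag is gone from the public surface; the Hive shims pin it to
    // false internally when calling into Hive versions that still expect it.
    def loadTable(
        db: String,
        table: String,
        loadPath: String,
        isOverwrite: Boolean,
        isSrcLocal: Boolean): Unit
  }
}
```

The same one-parameter deletion applies to loadPartition and loadDynamicPartitions in each layer.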
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 73ef0e6a18..0230626a66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -322,13 +322,12 @@ class SessionCatalog(
       name: TableIdentifier,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
+    externalCatalog.loadTable(db, table, loadPath, isOverwrite, isSrcLocal)
   }

   /**
@@ -341,7 +340,6 @@ class SessionCatalog(
       loadPath: String,
       spec: TablePartitionSpec,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
@@ -350,7 +348,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Some(db)))
     requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+      db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
   }

   def defaultTablePath(tableIdent: TableIdentifier): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d646a215c3..49407b44d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -312,7 +312,6 @@ case class LoadDataCommand(
         loadPath.toString,
         partition.get,
         isOverwrite,
-        holdDDLTime = false,
         inheritTableSpecs = true,
         isSrcLocal = isLocal)
     } else {
@@ -320,7 +319,6 @@ case class LoadDataCommand(
         targetTable.identifier,
         loadPath.toString,
         isOverwrite,
-        holdDDLTime = false,
         isSrcLocal = isLocal)
     }

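For context, the only user-facing entry point touched so far is `LOAD DATA`. Below is a hedged usage sketch of a statement that would exercise the two LoadDataCommand branches above; the table name and input path are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver; the table `logs` and the input path are illustrative.
object LoadDataSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("load-data-sketch")
      .enableHiveSupport() // LOAD DATA requires the Hive-backed catalog
      .getOrCreate()

    // With a PARTITION clause this routes through SessionCatalog.loadPartition;
    // without one, through SessionCatalog.loadTable. Neither call site passes
    // holdDDLTime any more.
    spark.sql(
      """LOAD DATA LOCAL INPATH '/tmp/events.txt'
        |OVERWRITE INTO TABLE logs PARTITION (ds = '2017-02-24')
        |""".stripMargin)

    spark.stop()
  }
}
```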
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ea48256147..50bb44f7d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -736,14 +736,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.loadTable(
       loadPath,
       s"$db.$table",
       isOverwrite,
-      holdDDLTime,
       isSrcLocal)
   }

@@ -753,7 +751,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       loadPath: String,
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit = withClient {
     requireTableExists(db, table)
@@ -773,7 +770,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table,
       orderedPartitionSpec,
       isOverwrite,
-      holdDDLTime,
       inheritTableSpecs,
       isSrcLocal)
   }
@@ -784,8 +780,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       loadPath: String,
       partition: TablePartitionSpec,
       replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean): Unit = withClient {
+      numDP: Int): Unit = withClient {
     requireTableExists(db, table)

     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -803,8 +798,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table,
       orderedPartitionSpec,
       replace,
-      numDP,
-      holdDDLTime)
+      numDP)
   }

   // --------------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8bdcf3111d..16a80f9fff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -208,7 +208,6 @@ private[hive] trait HiveClient {
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit

@@ -217,7 +216,6 @@ private[hive] trait HiveClient {
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit

   /** Loads new dynamic partitions into an existing table. */
@@ -227,8 +225,7 @@ private[hive] trait HiveClient {
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean): Unit
+      numDP: Int): Unit

   /** Create a function in an existing database. */
   def createFunction(db: String, func: CatalogFunction): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 24dfd33bc3..c326ac4cc1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -664,7 +664,6 @@ private[hive] class HiveClientImpl(
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String],
       replace: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit = withHiveState {
     val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
@@ -674,7 +673,6 @@ private[hive] class HiveClientImpl(
       s"$dbName.$tableName",
       partSpec,
       replace,
-      holdDDLTime,
       inheritTableSpecs,
       isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
       isSrcLocal = isSrcLocal)
@@ -684,14 +682,12 @@ private[hive] class HiveClientImpl(
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit = withHiveState {
     shim.loadTable(
       client,
       new Path(loadPath),
       tableName,
       replace,
-      holdDDLTime,
       isSrcLocal)
   }

@@ -701,8 +697,7 @@ private[hive] class HiveClientImpl(
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String],
       replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean): Unit = withHiveState {
+      numDP: Int): Unit = withHiveState {
     val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
     shim.loadDynamicPartitions(
       client,
@@ -711,7 +706,6 @@ private[hive] class HiveClientImpl(
       partSpec,
       replace,
       numDP,
-      holdDDLTime,
       listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
   }

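The HiveShim changes below are the subtle part of this patch: Hive versions prior to 2.0 still expect holdDDLTime positionally in `Hive.loadTable` / `loadPartition` / `loadDynamicPartitions`, so the shims cannot simply drop the argument; they pin it to FALSE at the reflective call sites. A minimal sketch of that pattern, assuming Hive 0.12-era classes on the classpath (the real shims resolve methods through a findMethod helper and cover several signatures):

```scala
import java.lang.{Boolean => JBoolean}
import java.lang.reflect.Method

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.metadata.Hive

// Minimal sketch of the shim pattern: resolve Hive#loadTable by its
// Hive 0.12 signature once, then always pass FALSE in the holdDDLTime slot.
object ShimSketch {
  private lazy val loadTableMethod: Method =
    classOf[Hive].getMethod(
      "loadTable",
      classOf[Path], classOf[String],
      java.lang.Boolean.TYPE, java.lang.Boolean.TYPE)

  def loadTable(hive: Hive, loadPath: Path, tableName: String,
      replace: Boolean): Unit = {
    // Reflection auto-unboxes the java.lang.Booleans to the primitive
    // parameters; the holdDDLTime slot is hardwired to FALSE.
    loadTableMethod.invoke(hive, loadPath, tableName,
      replace: JBoolean, JBoolean.FALSE)
  }
}
```

Passing JBoolean.FALSE positionally keeps older metastore clients working while the flag disappears from every Spark-facing API.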
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index b052f1e7e4..9fe1c76d33 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -96,7 +96,6 @@ private[client] sealed abstract class Shim {
       tableName: String,
       partSpec: JMap[String, String],
       replace: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSkewedStoreAsSubdir: Boolean,
       isSrcLocal: Boolean): Unit
@@ -106,7 +105,6 @@ private[client] sealed abstract class Shim {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit

   def loadDynamicPartitions(
@@ -116,7 +114,6 @@ private[client] sealed abstract class Shim {
       partSpec: JMap[String, String],
       replace: Boolean,
       numDP: Int,
-      holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit

   def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit
@@ -332,12 +329,11 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       tableName: String,
       partSpec: JMap[String, String],
       replace: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSkewedStoreAsSubdir: Boolean,
       isSrcLocal: Boolean): Unit = {
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
+      JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
   }

   override def loadTable(
@@ -345,9 +341,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit = {
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
+    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, JBoolean.FALSE)
   }

   override def loadDynamicPartitions(
@@ -357,10 +352,9 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       partSpec: JMap[String, String],
       replace: Boolean,
       numDP: Int,
-      holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit = {
     loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean)
+      numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean)
   }

   override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
@@ -703,12 +697,11 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       tableName: String,
       partSpec: JMap[String, String],
       replace: Boolean,
-      holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
       isSkewedStoreAsSubdir: Boolean,
       isSrcLocal: Boolean): Unit = {
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
       isSrcLocal: JBoolean, JBoolean.FALSE)
   }

@@ -717,9 +710,8 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean,
       isSrcLocal: Boolean): Unit = {
-    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
+    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, JBoolean.FALSE,
       isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
   }

@@ -730,10 +722,9 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       partSpec: JMap[String, String],
       replace: Boolean,
       numDP: Int,
-      holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit = {
     loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
+      numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean, JBoolean.FALSE)
   }

   override def dropTable(
@@ -818,10 +809,9 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
       partSpec: JMap[String, String],
       replace: Boolean,
       numDP: Int,
-      holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit = {
     loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
-      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE,
+      numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean, JBoolean.FALSE,
       0L: JLong)
   }

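Why the FALSE literals above cannot be avoided entirely: `Hive.loadDynamicPartitions` changed arity across Hive releases, and holdDDLTime sits in the middle of the argument list, so each shim must still fill its positional slot. The summary below paraphrases the reflective calls in this diff; the parameter names are guesses from the call sites, not taken from Hive's source:

```scala
// Paraphrased arities, encoded as function types so this compiles standalone.
// Path is stood in by String to avoid a Hadoop dependency here.
object LoadDynamicPartitionsArity {
  type PartSpec = java.util.LinkedHashMap[String, String]

  // Hive 0.12: 7 args; slot 6 (holdDDLTime) is now pinned to false.
  type V0_12 = (String, String, PartSpec, Boolean, Int,
      Boolean /* holdDDLTime */, Boolean /* listBucketingEnabled */) => Unit

  // Hive 0.14 appends one more boolean (the shim passes FALSE).
  type V0_14 = (String, String, PartSpec, Boolean, Int,
      Boolean /* holdDDLTime */, Boolean, Boolean) => Unit

  // Hive 1.2 appends a trailing long as well (the shim passes 0L).
  type V1_2 = (String, String, PartSpec, Boolean, Int,
      Boolean /* holdDDLTime */, Boolean, Boolean, Long) => Unit
}
```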
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3e654d8eeb..5d5688ecb3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -301,10 +301,6 @@ case class InsertIntoHiveTable(
       refreshFunction = _ => (),
       options = Map.empty)

-    // TODO: Correctly set holdDDLTime.
-    // In most of the time, we should have holdDDLTime = false.
-    // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
-    val holdDDLTime = false
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
         externalCatalog.loadDynamicPartitions(
@@ -313,8 +309,7 @@ case class InsertIntoHiveTable(
           tmpLocation.toString,
           partitionSpec,
           overwrite,
-          numDynamicPartitions,
-          holdDDLTime = holdDDLTime)
+          numDynamicPartitions)
       } else {
         // scalastyle:off
         // ifNotExists is only valid with static partition, refer to
@@ -357,7 +352,6 @@ case class InsertIntoHiveTable(
           tmpLocation.toString,
           partitionSpec,
           isOverwrite = doHiveOverwrite,
-          holdDDLTime = holdDDLTime,
           inheritTableSpecs = inheritTableSpecs,
           isSrcLocal = false)
       }
@@ -368,7 +362,6 @@ case class InsertIntoHiveTable(
         table.catalogTable.identifier.table,
         tmpLocation.toString, // TODO: URI
         overwrite,
-        holdDDLTime,
         isSrcLocal = false)
     }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index fe14824cf0..6feb277ca8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -175,7 +175,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
         emptyDir,
         tableName = "src",
         replace = false,
-        holdDDLTime = false,
         isSrcLocal = false)
     }

@@ -313,7 +312,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
         "src_part",
         partSpec,
         replace = false,
-        holdDDLTime = false,
         inheritTableSpecs = false,
         isSrcLocal = false)
     }
@@ -329,8 +327,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
         "src_part",
         partSpec,
         replace = false,
-        numDP = 1,
-        holdDDLTime = false)
+        numDP = 1)
     }

     test(s"$version: renamePartitions") {
--
GitLab