diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 75bf780d41424796328c9e3bf9c46c04ac5111cd..ed423e7e334b6ab751ae01314c09da0a572ef52b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -366,7 +366,7 @@ package object dsl {
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
         analysis.UnresolvedRelation(TableIdentifier(tableName)),
-        Map.empty, logicalPlan, overwrite, false)
+        Map.empty, logicalPlan, overwrite, ifPartitionNotExists = false)
 
     def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 7a54995453797f87af628e3b4234de1ef70d2c84..d291ca0020838843a57943641e62da08c65fb82d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -410,17 +410,20 @@ case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) exten
  *                  would have Map('a' -> Some('1'), 'b' -> None).
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean)
+    ifPartitionNotExists: Boolean)
   extends LogicalPlan {
-  assert(overwrite || !ifNotExists)
-  assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
+  // IF NOT EXISTS is only valid in INSERT OVERWRITE
+  assert(overwrite || !ifPartitionNotExists)
+  // IF NOT EXISTS is only valid in static partitions
+  assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists)
 
   // We don't want `table` in children as sometimes we don't want to transform it.
   override def children: Seq[LogicalPlan] = query :: Nil
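The two new assertions pin down the legal combinations: IF NOT EXISTS is only meaningful for INSERT OVERWRITE, and only when every partition column is bound to a static value. An illustrative sketch of what they accept and reject (not part of the patch; `OneRowRelation` stands in for an arbitrary query plan):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OneRowRelation}

val rel = UnresolvedRelation(TableIdentifier("t"))
val staticSpec = Map("p1" -> Option("a"), "p2" -> Option("b"))  // every value bound
val dynamicSpec = Map("p1" -> Option("a"), "p2" -> None)        // p2 left dynamic

// OK: INSERT OVERWRITE ... PARTITION (p1='a', p2='b') IF NOT EXISTS
InsertIntoTable(rel, staticSpec, OneRowRelation, overwrite = true, ifPartitionNotExists = true)
// Trips the first assert: IF NOT EXISTS without OVERWRITE
InsertIntoTable(rel, staticSpec, OneRowRelation, overwrite = false, ifPartitionNotExists = true)
// Trips the second assert: IF NOT EXISTS with a dynamic partition column
InsertIntoTable(rel, dynamicSpec, OneRowRelation, overwrite = true, ifPartitionNotExists = true)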
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b97adf7221d1865df1e1a4cac2069c25e070176b..c5d69c204642ede6e840a60e650a1708e9f29aea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -303,7 +303,7 @@ object SQLConf {
   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
     buildConf("spark.sql.hive.manageFilesourcePartitions")
       .doc("When true, enable metastore partition management for file source tables as well. " +
-        "This includes both datasource and converted Hive tables. When partition managment " +
+        "This includes both datasource and converted Hive tables. When partition management " +
        "is enabled, datasource tables store partition in the Hive metastore, and use the " +
        "metastore to prune partitions during query planning.")
      .booleanConf
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index cca0291b3d5afb06130d7766cf504ad803df7a88..d78741d032f38356c4569b1d1aaa0bf73b9e03bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -176,14 +176,14 @@ class PlanParserSuite extends PlanTest {
     def insert(
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
-        ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+        ifPartitionNotExists: Boolean = false): LogicalPlan =
+      InsertIntoTable(table("s"), partition, plan, overwrite, ifPartitionNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
       insert(Map.empty, overwrite = true))
     assertEqual(s"insert overwrite table s partition (e = 1) if not exists $sql",
-      insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true))
+      insert(Map("e" -> Option("1")), overwrite = true, ifPartitionNotExists = true))
     assertEqual(s"insert into s $sql",
       insert(Map.empty))
     assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql",
@@ -193,9 +193,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), false, ifPartitionNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, false, ifPartitionNotExists = false)))
   }
 
   test ("insert with if not exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1732a8e08b73f0ecfc87e14ec66fede8719f67e3..b71c5eb843eec6a61fcc3f82fe4921d97bce8f50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -286,7 +286,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         partition = Map.empty[String, Option[String]],
         query = df.logicalPlan,
         overwrite = mode == SaveMode.Overwrite,
-        ifNotExists = false)
+        ifPartitionNotExists = false)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index bb7d1f70b62d94259cc2aa8f4ef616422cc7b17e..14c40605ea31c01f16aa917fa4a13eaddb4d92fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -430,6 +430,7 @@ case class DataSource(
       InsertIntoHadoopFsRelationCommand(
         outputPath = outputPath,
         staticPartitions = Map.empty,
+        ifPartitionNotExists = false,
         partitionColumns = partitionAttributes,
         bucketSpec = bucketSpec,
         fileFormat = format,
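For reference, the SQL shapes exercised by the parser test above map onto the renamed flag as follows (illustrative snippet, not part of the patch; assumes a hypothetical `spark` session):

// Parses to InsertIntoTable(..., overwrite = true, ifPartitionNotExists = true)
spark.sql("INSERT OVERWRITE TABLE s PARTITION (e = 1) IF NOT EXISTS SELECT * FROM t")
// Parses to InsertIntoTable(..., overwrite = false, ifPartitionNotExists = false)
spark.sql("INSERT INTO TABLE s PARTITION (c = 'd', e = 1) SELECT * FROM t")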
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index d307122b5c70d5b74224dfb79a1f14d441db5cbd..21d75a404911b56ab246df1db51485a9c93c5f2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -142,8 +142,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
-    case InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
+    case i @ InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
       // to include those constant column values in the query result.
@@ -195,6 +195,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
+        i.ifPartitionNotExists,
         partitionSchema,
         t.bucketSpec,
         t.fileFormat,
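The rule now matches any flag value (the fifth field becomes `_`) and forwards it through the binder `i`. A generic, self-contained sketch of the `x @ Pattern(...)` idiom, in case it is unfamiliar (illustrative names, not from the patch):

// `cmd @ Cmd(n, _)` binds the whole matched value, so fields wildcarded in
// the pattern stay reachable through `cmd`.
case class Cmd(name: String, force: Boolean)
def describe(c: Cmd): String = c match {
  case cmd @ Cmd(n, _) => s"$n(force=${cmd.force})"
}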
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 19b51d4d9530a560912eb0543820febb7c0e83b8..c9d31449d362944b64ffaf17e0a010e2ae238edc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -37,10 +37,13 @@ import org.apache.spark.sql.execution.command._
  *                         overwrites: when the spec is empty, all partitions are overwritten.
  *                         When it covers a prefix of the partition keys, only partitions matching
  *                         the prefix are overwritten.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
+    ifPartitionNotExists: Boolean,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
@@ -61,8 +64,8 @@ case class InsertIntoHadoopFsRelationCommand(
       val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-        s"cannot save to file.")
+      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " +
+        "cannot save to file.")
     }
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
@@ -76,11 +79,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
     var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
     var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+    var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
 
     // When partitions are tracked by the catalog, compute all custom partition locations that
     // may be relevant to the insertion job.
     if (partitionsTrackedByCatalog) {
-      val matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
+      matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
         catalogTable.get.identifier, Some(staticPartitions))
       initialMatchingPartitions = matchingPartitions.map(_.spec)
       customPartitionLocations = getCustomPartitionLocations(
@@ -101,8 +105,12 @@ case class InsertIntoHadoopFsRelationCommand(
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
-        true
+        if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
+          false
+        } else {
+          deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+          true
+        }
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>
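Distilled, the new (mode, pathExists) decision reads as below. This is an illustrative reduction, not the actual method; the sketch throws IllegalStateException where the real code throws AnalysisException:

import org.apache.spark.sql.SaveMode

// Returns true when the job should proceed to write.
def shouldWrite(
    mode: SaveMode,
    pathExists: Boolean,
    ifPartitionNotExists: Boolean,
    partitionAlreadyExists: Boolean): Boolean = (mode, pathExists) match {
  case (SaveMode.ErrorIfExists, true) =>
    throw new IllegalStateException("path already exists.")
  case (SaveMode.Overwrite, true) if ifPartitionNotExists && partitionAlreadyExists =>
    false  // the new behavior: IF NOT EXISTS turns the overwrite into a no-op
  case (SaveMode.Ignore, exists) =>
    !exists
  case _ =>
    true   // Append, or Overwrite/ErrorIfExists against a fresh path
}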
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 09a5eda6e543fba410e20900028f5116a159baf7..4f090d545cd189706cf5e22e1f600b9d4ab3da13 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -160,9 +160,9 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists)
-        if DDLUtils.isHiveTable(relation.tableMeta) =>
-      InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists)
+    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+        if DDLUtils.isHiveTable(r.tableMeta) =>
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
@@ -207,11 +207,11 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+            !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
       case relation: CatalogRelation
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 41c6b18e9d79481352adb877e9ba53099fa3f863..65e8b4e3c725ca0e078a821d509f38f02f847ef5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -62,7 +62,7 @@ case class CreateHiveTableAsSelectCommand(
           Map(),
           query,
           overwrite = false,
-          ifNotExists = false)).toRdd
+          ifPartitionNotExists = false)).toRdd
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -78,7 +78,7 @@ case class CreateHiveTableAsSelectCommand(
             Map(),
             query,
             overwrite = true,
-            ifNotExists = false)).toRdd
+            ifPartitionNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.
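Both CTAS branches pass `ifPartitionNotExists = false`, which is the only sensible value there: the table is created by the same command, so no pre-existing partition can suppress the write. For example (illustrative, hypothetical `spark` session):

// Plans an InsertIntoTable(..., ifPartitionNotExists = false) under the hood.
spark.sql("CREATE TABLE t2 AS SELECT * FROM t1")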
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 10e17c5f734334971aeaaec5778d191eb07789c0..10ce8e3730a0d29cc1b3bec4fb0f127af213b32b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -71,14 +71,15 @@ import org.apache.spark.SparkException
  * }}}.
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoHiveTable(
     table: CatalogTable,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends RunnableCommand {
+    ifPartitionNotExists: Boolean) extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -375,7 +376,7 @@ case class InsertIntoHiveTable(
 
           var doHiveOverwrite = overwrite
 
-          if (oldPart.isEmpty || !ifNotExists) {
+          if (oldPart.isEmpty || !ifPartitionNotExists) {
             // SPARK-18107: Insert overwrite runs much slower than hive-client.
             // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
             // version and we may not want to catch up new Hive version every time. We delete the
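On the Hive write path the guard reduces to a single predicate; an illustrative restatement (not the actual code):

// Write (and possibly overwrite) the partition only when it is absent,
// or when the user did not ask for IF NOT EXISTS.
def shouldWritePartition(partitionExists: Boolean, ifPartitionNotExists: Boolean): Boolean =
  !partitionExists || !ifPartitionNotExists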
+ sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)") + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |partition (b=21, c=31) IF NOT EXISTS + |SELECT 20, 24 + """.stripMargin) + checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) } test("Insert ArrayType.containsNull == false") {