diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index f741dcfbf2002ac6a08ac55b4f7bbcb694d6f01b..239e73ef6986c94e4aae520c92a70da6290475d6 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -243,8 +243,10 @@ query ; insertInto - : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? - | INSERT INTO TABLE? tableIdentifier partitionSpec? + : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable + | INSERT INTO TABLE? tableIdentifier partitionSpec? #insertIntoTable + | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir + | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir ; partitionSpecLocation @@ -745,6 +747,7 @@ nonReserved | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN | UNBOUNDED | WHEN | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP + | DIRECTORY ; SELECT: 'SELECT'; @@ -815,6 +818,7 @@ WITH: 'WITH'; VALUES: 'VALUES'; CREATE: 'CREATE'; TABLE: 'TABLE'; +DIRECTORY: 'DIRECTORY'; VIEW: 'VIEW'; REPLACE: 'REPLACE'; INSERT: 'INSERT'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 6ab4153bac70ef780600a3c1a7714511760d5124..33ba0867a33e0c2660efd6ceb1bffb380009efa2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -146,6 +146,9 @@ object UnsupportedOperationChecker { throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") + case _: InsertIntoDir => + throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") + // mapGroupsWithState and flatMapGroupsWithState case m: FlatMapGroupsWithState if m.isStreaming => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8a45c5216781b11a24774b0ea36d54e7702650ef..891f61698f177cad79ecc72ade6d1336d965782b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ @@ -178,11 +179,64 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan. 
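To make the two new `DIRECTORY` alternatives of the `insertInto` rule concrete, here is a hedged usage sketch; the paths and the `logs` temp view are hypothetical and not part of the patch, and the Hive-serde form assumes a Hive-enabled session:

```scala
// Illustrative only: hypothetical paths and a hypothetical `logs` temp view.
spark.range(10).selectExpr("id", "id % 2 AS bucket").createOrReplaceTempView("logs")

// Hive-serde form (#insertOverwriteHiveDir): explicit path, optional ROW FORMAT / STORED AS.
// Requires a Hive-enabled SparkSession.
spark.sql("""
  INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive_dir_out'
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  SELECT * FROM logs
""")

// Data source form (#insertOverwriteDir): USING <provider>, path given inline or via OPTIONS.
spark.sql("""
  INSERT OVERWRITE DIRECTORY
  USING parquet
  OPTIONS (path '/tmp/ds_dir_out')
  SELECT * FROM logs
""")
```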
+ * Parameters used for writing query to a table: + * (tableIdentifier, partitionKeys, exists). + */ + type InsertTableParams = (TableIdentifier, Map[String, Option[String]], Boolean) + + /** + * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider). + */ + type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String]) + + /** + * Add an + * {{{ + * INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? + * INSERT INTO [TABLE] tableIdentifier [partitionSpec] + * INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat] + * INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList] + * }}} + * operation to logical plan */ private def withInsertInto( ctx: InsertIntoContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { + ctx match { + case table: InsertIntoTableContext => + val (tableIdent, partitionKeys, exists) = visitInsertIntoTable(table) + InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, false, exists) + case table: InsertOverwriteTableContext => + val (tableIdent, partitionKeys, exists) = visitInsertOverwriteTable(table) + InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, true, exists) + case dir: InsertOverwriteDirContext => + val (isLocal, storage, provider) = visitInsertOverwriteDir(dir) + InsertIntoDir(isLocal, storage, provider, query, overwrite = true) + case hiveDir: InsertOverwriteHiveDirContext => + val (isLocal, storage, provider) = visitInsertOverwriteHiveDir(hiveDir) + InsertIntoDir(isLocal, storage, provider, query, overwrite = true) + case _ => + throw new ParseException("Invalid InsertIntoContext", ctx) + } + } + + /** + * Add an INSERT INTO TABLE operation to the logical plan. + */ + override def visitInsertIntoTable( + ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) { + val tableIdent = visitTableIdentifier(ctx.tableIdentifier) + val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) + + (tableIdent, partitionKeys, false) + } + + /** + * Add an INSERT OVERWRITE TABLE operation to the logical plan. + */ + override def visitInsertOverwriteTable( + ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) { + assert(ctx.OVERWRITE() != null) val tableIdent = visitTableIdentifier(ctx.tableIdentifier) val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) @@ -192,12 +246,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx) } - InsertIntoTable( - UnresolvedRelation(tableIdent), - partitionKeys, - query, - ctx.OVERWRITE != null, - ctx.EXISTS != null) + (tableIdent, partitionKeys, ctx.EXISTS() != null) + } + + /** + * Write to a directory, returning a [[InsertIntoDir]] logical plan. + */ + override def visitInsertOverwriteDir( + ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) { + throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx) + } + + /** + * Write to a directory, returning a [[InsertIntoDir]] logical plan. 
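A hedged, parse-level sketch of how the dispatch above surfaces each context as a logical plan; the table name `t` and the output path are hypothetical, and the `DIRECTORY` case relies on the `SparkSqlParser` overrides further below, since the Catalyst-only visitors here deliberately raise a `ParseException`:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable}

val parser = spark.sessionState.sqlParser

parser.parsePlan("INSERT INTO t SELECT 1") match {
  case InsertIntoTable(_, _, _, overwrite, _) => assert(!overwrite)
  case other => sys.error(s"unexpected plan: $other")
}
parser.parsePlan("INSERT OVERWRITE TABLE t SELECT 1") match {
  case InsertIntoTable(_, _, _, overwrite, _) => assert(overwrite)
  case other => sys.error(s"unexpected plan: $other")
}
parser.parsePlan("INSERT OVERWRITE DIRECTORY '/tmp/out' USING parquet SELECT 1") match {
  case InsertIntoDir(isLocal, _, provider, _, overwrite) =>
    assert(!isLocal && provider.contains("parquet") && overwrite)
  case other => sys.error(s"unexpected plan: $other")
}
```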
+ */ + override def visitInsertOverwriteHiveDir( + ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) { + throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 4b3054dbfe2f000b6d1fc8f01c4ce8ce5d67721d..f443cd5a69de3ec76624c97b257fb87088d24e01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ @@ -359,6 +359,30 @@ case class InsertIntoTable( override lazy val resolved: Boolean = false } +/** + * Insert query result into a directory. + * + * @param isLocal Indicates whether the specified directory is local directory + * @param storage Info about output file, row and what serialization format + * @param provider Specifies what data source to use; only used for data source file. + * @param child The query to be executed + * @param overwrite If true, the existing directory will be overwritten + * + * Note that this plan is unresolved and has to be replaced by the concrete implementations + * during analysis. + */ +case class InsertIntoDir( + isLocal: Boolean, + storage: CatalogStorageFormat, + provider: Option[String], + child: LogicalPlan, + overwrite: Boolean = true) + extends UnaryNode { + + override def output: Seq[Attribute] = Seq.empty + override lazy val resolved: Boolean = false +} + /** * A container for holding the view description(CatalogTable), and the output of the view. The * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d3f6ab5654689b9e5fab5f96fae410408bde0362..d38919b5d940e1fa92accf5771f5c2db2c5e5a82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTable, _} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} import org.apache.spark.sql.types.StructType @@ -1512,4 +1512,81 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { query: LogicalPlan): LogicalPlan = { RepartitionByExpression(expressions, query, conf.numShufflePartitions) } + + /** + * Return the parameters for [[InsertIntoDir]] logical plan. 
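Since `InsertIntoDir` stays unresolved until an analyzer rule rewrites it, a minimal construction sketch may help; every value below is hypothetical and only mirrors what the `SparkSqlParser` overrides later in this patch assemble:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoDir

// Storage payload: where to write plus any per-source options (values hypothetical).
val storage = CatalogStorageFormat.empty.copy(
  locationUri = Some(CatalogUtils.stringToURI("/tmp/out")),
  properties = Map("compression" -> "snappy"))

val node = InsertIntoDir(
  isLocal = false,
  storage = storage,
  provider = Some("parquet"),
  child = spark.range(3).queryExecution.logical,
  overwrite = true)

assert(!node.resolved)  // stays unresolved until DataSourceAnalysis/HiveAnalysis replaces it
```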
+ * + * Expected format: + * {{{ + * INSERT OVERWRITE DIRECTORY + * [path] + * [OPTIONS table_property_list] + * select_statement; + * }}} + */ + override def visitInsertOverwriteDir( + ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) { + if (ctx.LOCAL != null) { + throw new ParseException( + "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx) + } + + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + var storage = DataSource.buildStorageFormatFromOptions(options) + + val path = Option(ctx.path).map(string).getOrElse("") + + if (!(path.isEmpty ^ storage.locationUri.isEmpty)) { + throw new ParseException( + "Directory path and 'path' in OPTIONS should be specified one, but not both", ctx) + } + + if (!path.isEmpty) { + val customLocation = Some(CatalogUtils.stringToURI(path)) + storage = storage.copy(locationUri = customLocation) + } + + val provider = ctx.tableProvider.qualifiedName.getText + + (false, storage, Some(provider)) + } + + /** + * Return the parameters for [[InsertIntoDir]] logical plan. + * + * Expected format: + * {{{ + * INSERT OVERWRITE [LOCAL] DIRECTORY + * path + * [ROW FORMAT row_format] + * [STORED AS file_format] + * select_statement; + * }}} + */ + override def visitInsertOverwriteHiveDir( + ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) { + validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) + .getOrElse(CatalogStorageFormat.empty) + val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) + .getOrElse(CatalogStorageFormat.empty) + + val path = string(ctx.path) + // The path field is required + if (path.isEmpty) { + operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx) + } + + val defaultStorage = HiveSerDe.getDefaultStorage(conf) + + val storage = CatalogStorageFormat( + locationUri = Some(CatalogUtils.stringToURI(path)), + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + compressed = false, + properties = rowStorage.properties ++ fileStorage.properties) + + (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala new file mode 100644 index 0000000000000000000000000000000000000000..633de4c37af940df7d566222d21d74cf69d04172 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.SparkException +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources._ + +/** + * A command used to write the result of a query to a directory. + * + * The syntax of using this command in SQL is: + * {{{ + * INSERT OVERWRITE DIRECTORY (path=STRING)? + * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...]) + * SELECT ... + * }}} + * + * @param storage storage format used to describe how the query result is stored. + * @param provider the data source type to be used + * @param query the logical plan representing data to write to + * @param overwrite whether to overwrite the existing directory + */ +case class InsertIntoDataSourceDirCommand( + storage: CatalogStorageFormat, + provider: String, + query: LogicalPlan, + overwrite: Boolean) extends RunnableCommand { + + override def children: Seq[LogicalPlan] = Seq(query) + + override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { + assert(children.length == 1) + assert(storage.locationUri.nonEmpty, "Directory path is required") + assert(provider.nonEmpty, "Data source is required") + + // Create the relation based on the input logical plan: `query`.
+ val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) + + val dataSource = DataSource( + sparkSession, + className = provider, + options = storage.properties ++ pathOption, + catalogTable = None) + + val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass) + if (!isFileFormat) { + throw new SparkException( + "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass) + } + + val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists + try { + sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query)) + dataSource.writeAndRead(saveMode, query) + } catch { + case ex: AnalysisException => + logError("Failed to write to directory " + storage.locationUri.toString, ex) + throw ex + } + + Seq.empty[Row] + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 7611e1c2e268cf3916801dc2163de01ef6148e8d..b06f4ccaa3bbf804eed3676dfbd1414338ab058f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.internal.HiveSerDe @@ -869,4 +870,18 @@ object DDLUtils { } } } + + /** + * Throws an exception if outputPath tries to overwrite one of the input paths of the query.
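Before moving on to `DDLUtils.verifyNotReadPath`, here is a hedged, test-style sketch of the `FileFormat` guard in `InsertIntoDataSourceDirCommand.run` above; the path is hypothetical, and it mirrors the JDBC case exercised in the new `InsertSuite` test further down:

```scala
try {
  // jdbc does not provide a FileFormat, so the command should refuse to run.
  spark.sql("INSERT OVERWRITE DIRECTORY '/tmp/out' USING jdbc SELECT 1 AS a")
} catch {
  case e: org.apache.spark.SparkException =>
    assert(e.getMessage.contains("Only Data Sources providing FileFormat are supported"))
}
```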
+ */ + def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = { + val inputPaths = query.collect { + case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths + }.flatten + + if (inputPaths.contains(outputPath)) { + throw new AnalysisException( + "Cannot overwrite a path that is also being read from.") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5d6223dffd285bc985565797fcb7993c2982cf2a..018f24e290b4b11fc63b1c5f5a04726caf7fe4c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale import java.util.concurrent.Callable +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -29,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} @@ -142,6 +145,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast parts, query, overwrite, false) if parts.isEmpty => InsertIntoDataSourceCommand(l, query, overwrite) + case InsertIntoDir(_, storage, provider, query, overwrite) + if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER => + + val outputPath = new Path(storage.locationUri.get) + if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath) + + InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite) + case i @ InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and @@ -178,15 +189,9 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } val outputPath = t.location.rootPaths.head - val inputPaths = actualQuery.collect { - case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths - }.flatten + if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath) val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - if (overwrite && inputPaths.contains(outputPath)) { - throw new AnalysisException( - "Cannot overwrite a path that is also being read from.") - } val partitionSchema = actualQuery.resolve( t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 4ee38215f5973a2c80b92adcd03a94376ab1f6b4..fa5172ca8a3e7a5d9f7ec8a0d0b847770edc6b6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -524,6 +525,55 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assert(e.message.contains("you can only specify one of them.")) } + test("insert overwrite directory") { + val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a" + parser.parsePlan(v1) match { + case InsertIntoDir(_, storage, provider, query, overwrite) => + assert(storage.locationUri.isDefined && storage.locationUri.get.toString == "/tmp/file") + case other => + fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" + + " from query," + s" got ${other.getClass.getName}: $v1") + } + + val v2 = "INSERT OVERWRITE DIRECTORY USING parquet SELECT 1 as a" + val e2 = intercept[ParseException] { + parser.parsePlan(v2) + } + assert(e2.message.contains( + "Directory path and 'path' in OPTIONS should be specified one, but not both")) + + val v3 = + """ + | INSERT OVERWRITE DIRECTORY USING json + | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE) + | SELECT 1 as a + """.stripMargin + parser.parsePlan(v3) match { + case InsertIntoDir(_, storage, provider, query, overwrite) => + assert(storage.locationUri.isDefined && provider == Some("json")) + assert(storage.properties.get("a") == Some("1")) + assert(storage.properties.get("b") == Some("0.1")) + assert(storage.properties.get("c") == Some("true")) + assert(!storage.properties.contains("abc")) + assert(!storage.properties.contains("path")) + case other => + fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" + + " from query," + s"got ${other.getClass.getName}: $v1") + } + + val v4 = + """ + | INSERT OVERWRITE DIRECTORY '/tmp/file' USING json + | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE) + | SELECT 1 as a + """.stripMargin + val e4 = intercept[ParseException] { + parser.parsePlan(v4) + } + assert(e4.message.contains( + "Directory path and 'path' in OPTIONS should be specified one, but not both")) + } + // ALTER TABLE table_name RENAME TO new_table_name; // ALTER VIEW view_name RENAME TO new_view_name; test("alter table/view: rename table/view") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 41abff2a5da2542cdf5d3fb5787852919054ee28..875b74551addbb22fc60b20babf7d188d6582fbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.sources import java.io.File +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -366,4 
+367,63 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { Row(Array(1, 2), Array("a", "b"))) } } + + test("insert overwrite directory") { + withTempDir { dir => + val path = dir.toURI.getPath + + val v1 = + s""" + | INSERT OVERWRITE DIRECTORY '${path}' + | USING json + | OPTIONS (a 1, b 0.1, c TRUE) + | SELECT 1 as a, 'c' as b + """.stripMargin + + spark.sql(v1) + + checkAnswer( + spark.read.json(dir.getCanonicalPath), + sql("SELECT 1 as a, 'c' as b")) + } + } + + test("insert overwrite directory with path in options") { + withTempDir { dir => + val path = dir.toURI.getPath + + val v1 = + s""" + | INSERT OVERWRITE DIRECTORY + | USING json + | OPTIONS ('path' '${path}') + | SELECT 1 as a, 'c' as b + """.stripMargin + + spark.sql(v1) + + checkAnswer( + spark.read.json(dir.getCanonicalPath), + sql("SELECT 1 as a, 'c' as b")) + } + } + + test("insert overwrite directory to data source not providing FileFormat") { + withTempDir { dir => + val path = dir.toURI.getPath + + val v1 = + s""" + | INSERT OVERWRITE DIRECTORY '${path}' + | USING JDBC + | OPTIONS (a 1, b 0.1, c TRUE) + | SELECT 1 as a, 'c' as b + """.stripMargin + val e = intercept[SparkException] { + spark.sql(v1) + }.getMessage + + assert(e.contains("Only Data Sources providing FileFormat are supported")) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 47203a80c37bddc2f39425c844959c3ecc7a503f..caf554d9ea51073c069e023098291ffa56f0a433 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, + ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} @@ -157,6 +158,14 @@ object HiveAnalysis extends Rule[LogicalPlan] { case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => DDLUtils.checkDataSchemaFieldNames(tableDesc) CreateHiveTableAsSelectCommand(tableDesc, query, mode) + + case InsertIntoDir(isLocal, storage, provider, child, overwrite) + if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER => + + val outputPath = new Path(storage.locationUri.get) + if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) + + InsertIntoHiveDirCommand(isLocal, storage, child, overwrite) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala new file mode 100644 index 0000000000000000000000000000000000000000..15ca1dfc76d19ef759a01bb9f16b6298e5993ee9 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.{File, IOException} +import java.net.URI +import java.text.SimpleDateFormat +import java.util.{Date, Locale, Random} + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.ql.exec.TaskRunner + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.client.HiveVersion + +// Base trait for getting a temporary location for writing data +private[hive] trait HiveTmpPath extends Logging { + + var createdTempDir: Option[Path] = None + + def getExternalTmpPath( + sparkSession: SparkSession, + hadoopConf: Configuration, + path: Path): Path = { + import org.apache.spark.sql.hive.client.hive._ + + // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under + // a common scratch directory. After the writing is finished, Hive will simply empty the table + // directory and move the staging directory to it. + // After Hive 1.1, Hive will create the staging directory under the table directory, and when + // moving staging directory to table directory, Hive will still empty the table directory, but + // will exclude the staging directory there. + // We have to follow the Hive behavior here, to avoid troubles. For example, if we create + // staging directory under the table director for Hive prior to 1.1, the staging directory will + // be removed by Hive when Hive is trying to empty the table directory. + val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) + val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1) + + // Ensure all the supported versions are considered here. + assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == + allSupportedHiveVersions) + + val externalCatalog = sparkSession.sharedState.externalCatalog + val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + + if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { + oldVersionExternalTempPath(path, hadoopConf, scratchDir) + } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { + newVersionExternalTempPath(path, hadoopConf, stagingDir) + } else { + throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) + } + } + + def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = { + // Attempt to delete the staging directory and the inclusive files. If failed, the files are + // expected to be dropped at the normal termination of VM since deleteOnExit is used. 
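A hedged usage note on the configuration keys read in `getExternalTmpPath` above: `hive.exec.stagingdir` and `hive.exec.scratchdir` are taken from the session's Hadoop configuration, so they can be overridden per session before running the insert (values below are illustrative):

```scala
// Session-level overrides picked up via sessionState.newHadoopConf() (values illustrative).
spark.conf.set("hive.exec.stagingdir", ".spark-staging")
spark.conf.set("hive.exec.scratchdir", "/tmp/spark-scratch")
```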
+ try { + createdTempDir.foreach { path => + val fs = path.getFileSystem(hadoopConf) + if (fs.delete(path, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. + fs.cancelDeleteOnExit(path) + } + } + } catch { + case NonFatal(e) => + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) + } + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 + private def oldVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + scratchDir: String): Path = { + val extURI: URI = path.toUri + val scratchPath = new Path(scratchDir, executionId) + var dirPath = new Path( + extURI.getScheme, + extURI.getAuthority, + scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) + + try { + val fs: FileSystem = dirPath.getFileSystem(hadoopConf) + dirPath = new Path(fs.makeQualified(dirPath).toString()) + + if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { + throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) + } + createdTempDir = Some(dirPath) + fs.deleteOnExit(dirPath) + } catch { + case e: IOException => + throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e) + } + dirPath + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 + private def newVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + val extURI: URI = path.toUri + if (extURI.getScheme == "viewfs") { + getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir) + } else { + new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") + } + } + + private def getExtTmpPathRelTo( + path: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 + } + + private def getExternalScratchDir( + extURI: URI, + hadoopConf: Configuration, + stagingDir: String): Path = { + getStagingDir( + new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), + hadoopConf, + stagingDir) + } + + private def getStagingDir( + inputPath: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + val inputPathUri: URI = inputPath.toUri + val inputPathName: String = inputPathUri.getPath + val fs: FileSystem = inputPath.getFileSystem(hadoopConf) + var stagingPathName: String = + if (inputPathName.indexOf(stagingDir) == -1) { + new Path(inputPathName, stagingDir).toString + } else { + inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) + } + + // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the + // staging directory needs to avoid being deleted when users set hive.exec.stagingdir + // under the table directory. + if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) && + !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { + logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + + "with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + + "directory.") + stagingPathName = new Path(inputPathName, ".hive-staging").toString + } + + val dir: Path = + fs.makeQualified( + new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) + logDebug("Created staging dir = " + dir + " for path = " + inputPath) + try { + if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { + throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") + } + createdTempDir = Some(dir) + fs.deleteOnExit(dir) + } catch { + case e: IOException => + throw new RuntimeException( + "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) + } + dir + } + + private def executionId: String = { + val rand: Random = new Random + val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) + "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) + } +} + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala new file mode 100644 index 0000000000000000000000000000000000000000..2110038db36ac4711caa28104126466b03610a2d --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import scala.language.existentials + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.mapred._ + +import org.apache.spark.SparkException +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.client.HiveClientImpl + +/** + * Command for writing the results of `query` to file system. + * + * The syntax of using this command in SQL is: + * {{{ + * INSERT OVERWRITE [LOCAL] DIRECTORY + * path + * [ROW FORMAT row_format] + * [STORED AS file_format] + * SELECT ... + * }}} + * + * @param isLocal whether the path specified in `storage` is a local directory + * @param storage storage format used to describe how the query result is stored. 
+ * @param query the logical plan representing data to write to + * @param overwrite whether overwrites existing directory + */ +case class InsertIntoHiveDirCommand( + isLocal: Boolean, + storage: CatalogStorageFormat, + query: LogicalPlan, + overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath { + + override def children: Seq[LogicalPlan] = query :: Nil + + override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { + assert(children.length == 1) + assert(storage.locationUri.nonEmpty) + + val hiveTable = HiveClientImpl.toHiveTable(CatalogTable( + identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")), + tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW, + storage = storage, + schema = query.schema + )) + hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB, + storage.serde.getOrElse(classOf[LazySimpleSerDe].getName)) + + val tableDesc = new TableDesc( + hiveTable.getInputFormatClass, + hiveTable.getOutputFormatClass, + hiveTable.getMetadata + ) + + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val jobConf = new JobConf(hadoopConf) + + val targetPath = new Path(storage.locationUri.get) + val writeToPath = + if (isLocal) { + val localFileSystem = FileSystem.getLocal(jobConf) + localFileSystem.makeQualified(targetPath) + } else { + val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) + val dfs = qualifiedPath.getFileSystem(jobConf) + if (!dfs.exists(qualifiedPath)) { + dfs.mkdirs(qualifiedPath.getParent) + } + qualifiedPath + } + + val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath) + val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc( + tmpPath.toString, tableDesc, false) + + try { + saveAsHiveFile( + sparkSession = sparkSession, + plan = children.head, + hadoopConf = hadoopConf, + fileSinkConf = fileSinkConf, + outputLocation = tmpPath.toString) + + val fs = writeToPath.getFileSystem(hadoopConf) + if (overwrite && fs.exists(writeToPath)) { + fs.listStatus(writeToPath).foreach { existFile => + if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true) + } + } + + fs.listStatus(tmpPath).foreach { + tmpFile => fs.rename(tmpFile.getPath, writeToPath) + } + } catch { + case e: Throwable => + throw new SparkException( + "Failed inserting overwrite directory " + storage.locationUri.get, e) + } finally { + deleteExternalTmpPath(hadoopConf) + } + + Seq.empty[Row] + } +} + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 46610f84dd822ebec090d24b9c389e2f1259ecd1..5bdc97a2982df9dc668698a0957eb7cfa074ea6a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -17,32 +17,22 @@ package org.apache.spark.sql.hive.execution -import java.io.{File, IOException} -import java.net.URI -import java.text.SimpleDateFormat -import java.util.{Date, Locale, Random} - import scala.util.control.NonFatal -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.ErrorMsg -import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException -import 
org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand} -import org.apache.spark.sql.execution.datasources.FileFormatWriter +import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} +import org.apache.spark.sql.hive.client.HiveClientImpl /** @@ -80,152 +70,10 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, - ifPartitionNotExists: Boolean) extends DataWritingCommand { + ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath { override def children: Seq[LogicalPlan] = query :: Nil - var createdTempDir: Option[Path] = None - - private def executionId: String = { - val rand: Random = new Random - val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) - "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) - } - - private def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val inputPathUri: URI = inputPath.toUri - val inputPathName: String = inputPathUri.getPath - val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { - new Path(inputPathName, stagingDir).toString - } else { - inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - - // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - // staging directory needs to avoid being deleted when users set hive.exec.stagingdir - // under the table directory. - if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - "with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + - "directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString - } - - val dir: Path = - fs.makeQualified( - new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) - logDebug("Created staging dir = " + dir + " for path = " + inputPath) - try { - if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") - } - createdTempDir = Some(dir) - fs.deleteOnExit(dir) - } catch { - case e: IOException => - throw new RuntimeException( - "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) - } - dir - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { - getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - } - - def getExternalTmpPath( - path: Path, - hiveVersion: HiveVersion, - hadoopConf: Configuration, - stagingDir: String, - scratchDir: String): Path = { - import org.apache.spark.sql.hive.client.hive._ - - // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under - // a common scratch directory. After the writing is finished, Hive will simply empty the table - // directory and move the staging directory to it. - // After Hive 1.1, Hive will create the staging directory under the table directory, and when - // moving staging directory to table directory, Hive will still empty the table directory, but - // will exclude the staging directory there. - // We have to follow the Hive behavior here, to avoid troubles. For example, if we create - // staging directory under the table director for Hive prior to 1.1, the staging directory will - // be removed by Hive when Hive is trying to empty the table directory. - val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) - val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1) - - // Ensure all the supported versions are considered here. 
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == - allSupportedHiveVersions) - - if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { - oldVersionExternalTempPath(path, hadoopConf, scratchDir) - } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) - } else { - throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) - } - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { - val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) - var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - - try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) - } catch { - case e: IOException => - throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e) - } - dirPath - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val extURI: URI = path.toUri - if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir) - } else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") - } - } - - def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - /** * Inserts all the rows in the table into Hive. 
Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -234,12 +82,8 @@ case class InsertIntoHiveTable( override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { assert(children.length == 1) - val sessionState = sparkSession.sessionState val externalCatalog = sparkSession.sharedState.externalCatalog - val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version - val hadoopConf = sessionState.newHadoopConf() - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + val hadoopConf = sparkSession.sessionState.newHadoopConf() val hiveQlTable = HiveClientImpl.toHiveTable(table) // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer @@ -254,23 +98,8 @@ case class InsertIntoHiveTable( hiveQlTable.getMetadata ) val tableLocation = hiveQlTable.getDataLocation - val tmpLocation = - getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir) + val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean - - if (isCompressed) { - // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress", - // "mapreduce.output.fileoutputformat.compress.codec", and - // "mapreduce.output.fileoutputformat.compress.type" - // have no impact on ORC because it uses table properties to store compression information. - hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.codec")) - fileSinkConf.setCompressType(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.type")) - } val numDynamicPartitions = partition.values.count(_.isEmpty) val numStaticPartitions = partition.values.count(_.nonEmpty) @@ -332,11 +161,6 @@ case class InsertIntoHiveTable( case _ => // do nothing since table has no bucketing } - val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, - outputPath = tmpLocation.toString) - val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse { throw new AnalysisException( @@ -344,17 +168,13 @@ case class InsertIntoHiveTable( }.asInstanceOf[Attribute] } - FileFormatWriter.write( + saveAsHiveFile( sparkSession = sparkSession, plan = children.head, - fileFormat = new HiveFileFormat(fileSinkConf), - committer = committer, - outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty), hadoopConf = hadoopConf, - partitionColumns = partitionAttributes, - bucketSpec = None, - statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), - options = Map.empty) + fileSinkConf = fileSinkConf, + outputLocation = tmpLocation.toString, + partitionAttributes = partitionAttributes) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { @@ -422,18 +242,7 @@ case class InsertIntoHiveTable( // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. 
- try { - createdTempDir.foreach { path => - val fs = path.getFileSystem(hadoopConf) - if (fs.delete(path, true)) { - // If we successfully delete the staging directory, remove it from FileSystem's cache. - fs.cancelDeleteOnExit(path) - } - } - } catch { - case NonFatal(e) => - logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) - } + deleteExternalTmpPath(hadoopConf) // un-cache this table. sparkSession.catalog.uncacheTable(table.identifier.quotedString) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala new file mode 100644 index 0000000000000000000000000000000000000000..7de9b421245f04e80bf785f32fc192d9f535c0c2 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.execution.datasources.FileFormatWriter +import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} + +// Base trait from which all hive insert statement physical execution extends. +private[hive] trait SaveAsHiveFile extends DataWritingCommand { + + protected def saveAsHiveFile( + sparkSession: SparkSession, + plan: SparkPlan, + hadoopConf: Configuration, + fileSinkConf: FileSinkDesc, + outputLocation: String, + partitionAttributes: Seq[Attribute] = Nil): Unit = { + + val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean + if (isCompressed) { + // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact on ORC because it uses table properties to store compression information. 
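A hedged usage sketch for the compression branch that continues just below in `SaveAsHiveFile`: enabling it is a matter of session configuration, for example (codec and type values illustrative):

```scala
// Turn on compressed Hive output; all three keys below are read by saveAsHiveFile.
spark.conf.set("hive.exec.compress.output", "true")
spark.conf.set("mapreduce.output.fileoutputformat.compress.codec",
  "org.apache.hadoop.io.compress.GzipCodec")
spark.conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
```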
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.type")) + } + + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + jobId = java.util.UUID.randomUUID().toString, + outputPath = outputLocation) + + FileFormatWriter.write( + sparkSession = sparkSession, + plan = plan, + fileFormat = new HiveFileFormat(fileSinkConf), + committer = committer, + outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty), + hadoopConf = hadoopConf, + partitionColumns = partitionAttributes, + bucketSpec = None, + statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), + options = Map.empty) + } +} + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala similarity index 78% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index e93c6549f1538de0dc939de5260f0642ee39f119..aa5cae33f5cd92111be6d1ea9e4dcab0c629b864 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -34,7 +34,7 @@ case class TestData(key: Int, value: String) case class ThreeCloumntable(key: Int, value: String, key1: String) -class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter +class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter with SQLTestUtils { import spark.implicits._ @@ -548,4 +548,184 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } } + + test("insert overwrite to dir from hive metastore table") { + withTempDir { dir => + val path = dir.toURI.getPath + + sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path}' SELECT * FROM src where key < 10") + + sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY '${path}' + |STORED AS orc + |SELECT * FROM src where key < 10 + """.stripMargin) + + // use orc data source to check the data of path is right. + withTempView("orc_source") { + sql( + s""" + |CREATE TEMPORARY VIEW orc_source + |USING org.apache.spark.sql.hive.orc + |OPTIONS ( + | PATH '${dir.getCanonicalPath}' + |) + """.stripMargin) + + checkAnswer( + sql("select * from orc_source"), + sql("select * from src where key < 10")) + } + } + } + + test("insert overwrite to local dir from temp table") { + withTempView("test_insert_table") { + spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table") + + withTempDir { dir => + val path = dir.toURI.getPath + + sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY '${path}' + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |SELECT * FROM test_insert_table + """.stripMargin) + + sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY '${path}' + |STORED AS orc + |SELECT * FROM test_insert_table + """.stripMargin) + + // use orc data source to check the data of path is right. 
+ checkAnswer( + spark.read.orc(dir.getCanonicalPath), + sql("select * from test_insert_table")) + } + } + } + + test("insert overwrite to dir from temp table") { + withTempView("test_insert_table") { + spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table") + + withTempDir { dir => + val pathUri = dir.toURI + + sql( + s""" + |INSERT OVERWRITE DIRECTORY '${pathUri}' + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |SELECT * FROM test_insert_table + """.stripMargin) + + sql( + s""" + |INSERT OVERWRITE DIRECTORY '${pathUri}' + |STORED AS orc + |SELECT * FROM test_insert_table + """.stripMargin) + + // use orc data source to check the data of path is right. + checkAnswer( + spark.read.orc(dir.getCanonicalPath), + sql("select * from test_insert_table")) + } + } + } + + test("multi insert overwrite to dir") { + withTempView("test_insert_table") { + spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table") + + withTempDir { dir => + val pathUri = dir.toURI + + withTempDir { dir2 => + val pathUri2 = dir2.toURI + + sql( + s""" + |FROM test_insert_table + |INSERT OVERWRITE DIRECTORY '${pathUri}' + |STORED AS orc + |SELECT id + |INSERT OVERWRITE DIRECTORY '${pathUri2}' + |STORED AS orc + |SELECT * + """.stripMargin) + + // use orc data source to check the data of path is right. + checkAnswer( + spark.read.orc(dir.getCanonicalPath), + sql("select id from test_insert_table")) + + checkAnswer( + spark.read.orc(dir2.getCanonicalPath), + sql("select * from test_insert_table")) + } + } + } + } + + test("insert overwrite to dir to illegal path") { + withTempView("test_insert_table") { + spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table") + + val e = intercept[IllegalArgumentException] { + sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY 'abc://a' + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |SELECT * FROM test_insert_table + """.stripMargin) + }.getMessage + + assert(e.contains("Wrong FS: abc://a, expected: file:///")) + } + } + + test("insert overwrite to dir with mixed syntax") { + withTempView("test_insert_table") { + spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table") + + val e = intercept[ParseException] { + sql( + s""" + |INSERT OVERWRITE DIRECTORY 'file://tmp' + |USING json + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |SELECT * FROM test_insert_table + """.stripMargin) + }.getMessage + + assert(e.contains("mismatched input 'ROW'")) + } + } + + test("insert overwrite to dir with multi inserts") { + withTempView("test_insert_table") { + spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table") + + val e = intercept[ParseException] { + sql( + s""" + |INSERT OVERWRITE DIRECTORY 'file://tmp2' + |USING json + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |SELECT * FROM test_insert_table + |INSERT OVERWRITE DIRECTORY 'file://tmp2' + |USING json + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |SELECT * FROM test_insert_table + """.stripMargin) + }.getMessage + + assert(e.contains("mismatched input 'ROW'")) + } + } }
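One more parser-level restriction worth illustrating, from `SparkSqlAstBuilder.visitInsertOverwriteDir` earlier in this patch: `LOCAL` is only accepted with the Hive-serde form, not with `USING <data source>`. A hedged sketch, with a hypothetical path:

```scala
try {
  spark.sql("INSERT OVERWRITE LOCAL DIRECTORY '/tmp/out' USING parquet SELECT 1 AS a")
} catch {
  case e: org.apache.spark.sql.catalyst.parser.ParseException =>
    assert(e.getMessage.contains(
      "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source"))
}
```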