From 2adb11f6db591a7d8199e42dd23c7fb23ef5df3b Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Mon, 9 May 2016 12:54:45 -0700
Subject: [PATCH] [SPARK-15173][SQL] DataFrameWriter.insertInto should work
 with datasource table stored in hive

When we parse `CREATE TABLE USING`, we should build a `CreateTableUsing`
plan with `managedIfNoPath` set to true. Then we will add the default
table path to the options when writing the table to Hive.

Added a new test in `SQLQuerySuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12949 from cloud-fan/bug.
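For reference, a minimal sketch of the scenario this fixes (assuming a
Hive-enabled `spark-shell` session, where `sql` and the implicits for
`toDF` are already in scope; `tbl` is just an example table name,
mirroring the new test below):

```scala
// A datasource table created without an explicit path. Before this patch it
// was parsed with managedIfNoPath = false, so no default table path was
// recorded in the Hive metastore and the insert below failed.
sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
sql("SELECT * FROM tbl").show()  // expected after this patch: one row, (1, "a")
```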
---
 .../spark/sql/execution/SparkSqlParser.scala       |  2 +-
 .../command/createDataSourceTables.scala           |  5 +++--
 .../sql/execution/datasources/DataSource.scala     | 15 +++++++++++----
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  8 ++++++++
 4 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 146e036bb4..a85ac16295 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -307,7 +307,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
-      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false)
+      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e07ab99ef3..16d6115737 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -97,7 +97,7 @@ case class CreateDataSourceTableCommand(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       bucketSpec = None,
-      options = optionsWithPath).resolveRelation()
+      options = optionsWithPath).resolveRelation(checkPathExist = false)
 
     CreateDataSourceTableUtils.createDataSourceTable(
       sparkSession = sparkSession,
@@ -382,7 +382,8 @@ object CreateDataSourceTableUtils extends Logging {
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
     val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
+    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
       case _ if skipHiveMetadata =>
         val message =
           s"Persisting partitioned data source relation $qualifiedTableName into " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 618ea3d669..0342ec569d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -239,8 +239,15 @@ case class DataSource(
       }
     }
 
-  /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
-  def resolveRelation(): BaseRelation = {
+  /**
+   * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
+   * [[DataSource]]
+   *
+   * @param checkPathExist A flag to indicate whether to check the existence of path or not.
+   *                       This flag will be set to false when we create an empty table (the
+   *                       path of the table does not exist).
+   */
+  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -288,11 +295,11 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
-        if (globPath.isEmpty) {
+        if (checkPathExist && globPath.isEmpty) {
           throw new AnalysisException(s"Path does not exist: $qualified")
         }
         // Sufficient to check head of the globPath seq for non-glob scenario
-        if (!fs.exists(globPath.head)) {
+        if (checkPathExist && !fs.exists(globPath.head)) {
           throw new AnalysisException(s"Path does not exist: ${globPath.head}")
         }
         globPath
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 4845da7c85..1d597fe16d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1553,4 +1553,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       assert(cause.getMessage.contains("Column ordering must be ASC, was 'DESC'"))
     }
   }
+
+  test("insert into datasource table") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+      checkAnswer(sql("SELECT * FROM tbl"), Row(1, "a"))
+    }
+  }
 }
--
GitLab