Commit 2adb11f6 authored by Wenchen Fan, committed by Yin Huai

[SPARK-15173][SQL] DataFrameWriter.insertInto should work with datasource table stored in hive

When we parse `CREATE TABLE USING`, we should build a `CreateTableUsing` plan with `managedIfNoPath` set to `true`. Then, when the table is written to the Hive metastore, the default table path is added to its options.

Adds a new test in `SQLQuerySuite`.
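
For reference, the user-facing scenario this patch fixes looks like the following (a minimal sketch mirroring the new test; it assumes `spark` is a Hive-enabled `SparkSession`, and the table name `tbl` is illustrative):

import spark.implicits._

// Create a datasource table through the Hive metastore without an explicit path.
spark.sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")

// Before this patch, such a table was persisted without a default path in its
// options, so DataFrameWriter.insertInto could not locate where to write.
Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
spark.sql("SELECT * FROM tbl").show()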

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12949 from cloud-fan/bug.
parent c3e23bc0
@@ -307,7 +307,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
-      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false)
+      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true)
     }
   }
@@ -97,7 +97,7 @@ case class CreateDataSourceTableCommand(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       bucketSpec = None,
-      options = optionsWithPath).resolveRelation()
+      options = optionsWithPath).resolveRelation(checkPathExist = false)

     CreateDataSourceTableUtils.createDataSourceTable(
       sparkSession = sparkSession,
@@ -382,7 +382,8 @@ object CreateDataSourceTableUtils extends Logging {
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
     val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
+    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
       case _ if skipHiveMetadata =>
         val message =
           s"Persisting partitioned data source relation $qualifiedTableName into " +
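
For context, `optionsWithPath` in the hunks above is built before the relation is resolved. A simplified sketch of the intended behavior follows; the helper name `defaultTablePath` is illustrative, not the exact code in this file. When `managedIfNoPath` is true and the user supplied no `path` option, the default metastore location for the table is appended to the options:

// Illustrative sketch only; defaultTablePath stands in for whatever helper
// the catalog exposes to compute a managed table's warehouse location.
val optionsWithPath =
  if (managedIfNoPath && !options.contains("path")) {
    options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
  } else {
    options
  }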
@@ -239,8 +239,15 @@ case class DataSource(
       }
     }

-  /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
-  def resolveRelation(): BaseRelation = {
+  /**
+   * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
+   * [[DataSource]]
+   *
+   * @param checkPathExist A flag to indicate whether to check the existence of path or not.
+   *                       This flag will be set to false when we create an empty table (the
+   *                       path of the table does not exist).
+   */
+  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -288,11 +295,11 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)

-        if (globPath.isEmpty) {
+        if (checkPathExist && globPath.isEmpty) {
           throw new AnalysisException(s"Path does not exist: $qualified")
         }
         // Sufficient to check head of the globPath seq for non-glob scenario
-        if (!fs.exists(globPath.head)) {
+        if (checkPathExist && !fs.exists(globPath.head)) {
           throw new AnalysisException(s"Path does not exist: ${globPath.head}")
         }
         globPath
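
With the new flag, callers that persist a table before any data has been written can resolve the relation without tripping the existence check. A hedged usage sketch, with argument names following the `CreateDataSourceTableCommand` call site above and an illustrative path value:

// The table's directory may not exist yet at CREATE TABLE time, so skip the
// path-existence check that resolveRelation performs by default.
val relation = DataSource(
  sparkSession = sparkSession,
  userSpecifiedSchema = userSpecifiedSchema,
  className = provider,
  bucketSpec = None,
  options = optionsWithPath   // e.g. Map("path" -> "/warehouse/tbl")
).resolveRelation(checkPathExist = false)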
@@ -1553,4 +1553,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       assert(cause.getMessage.contains("Column ordering must be ASC, was 'DESC'"))
     }
   }
+
+  test("insert into datasource table") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+      checkAnswer(sql("SELECT * FROM tbl"), Row(1, "a"))
+    }
+  }
 }