Commit c1e9a6d2 authored by gatorsmile, committed by Wenchen Fan

[SPARK-17393][SQL] Error Handling when CTAS Against the Same Data Source Table Using Overwrite Mode

### What changes were proposed in this pull request?
When we try to read a table and then write back to the same table using the `Overwrite` save mode, we get a very confusing error message. For example,
```Scala
Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1")
table("tab1").write.mode(SaveMode.Overwrite).saveAsTable("tab1")
```

```
Job aborted.
org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp
...
Caused by: org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.sql.execution.datasources
```
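
For reference, here is a self-contained version of the snippet above, as it might be run in `spark-shell`; the `spark` session, `spark.implicits._`, and the explicit `SaveMode` import are assumptions added for illustration:

```Scala
// Minimal repro sketch, assuming a running SparkSession named `spark` (e.g. spark-shell).
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Create a data source table, then read it and try to overwrite it in the same query.
Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1")
spark.table("tab1").write.mode(SaveMode.Overwrite).saveAsTable("tab1")
```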

After this PR, we issue an `AnalysisException` instead:
```
Cannot overwrite table `tab1` that is also being read from
```
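
The check exists because an `Overwrite` save has to delete the table's current data while the query plan is still reading it, which is why the old behaviour failed at runtime mid-write. One possible workaround (not part of this patch, shown only as a sketch) is to break the cycle by materializing the data before the overwrite:

```Scala
// Workaround sketch (not part of this patch): materialize the data on the driver
// first so the new plan no longer reads from the table it is about to overwrite.
import org.apache.spark.sql.SaveMode

val df = spark.table("tab1")
val schema = df.schema
val rows = df.collect()  // only reasonable for small tables

spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  .write.mode(SaveMode.Overwrite).saveAsTable("tab1")
```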
### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14954 from gatorsmile/ctasQueryAnalyze.
parent 1b001b52
@@ -304,6 +304,25 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
             s"metastore. Metastore only accepts table name containing characters, numbers and _.")
         }
+        if (query.isDefined &&
+          mode == SaveMode.Overwrite &&
+          catalog.tableExists(tableDesc.identifier)) {
+          // Need to remove SubQuery operator.
+          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
+            // Only do the check if the table is a data source table
+            // (the relation is a BaseRelation).
+            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+              // Get all input data source relations of the query.
+              val srcRelations = query.get.collect {
+                case LogicalRelation(src: BaseRelation, _, _) => src
+              }
+              if (srcRelations.contains(dest)) {
+                failAnalysis(
+                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
+              }
+            case _ => // OK
+          }
+        }
       case i @ logical.InsertIntoTable(
         l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -357,32 +376,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
-      case CreateTable(tableDesc, mode, Some(query)) =>
-        // When the SaveMode is Overwrite, we need to check if the table is an input table of
-        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
-          // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.collect {
-                case LogicalRelation(src: BaseRelation, _, _) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
-              } else {
-                // OK
-              }
-            case _ => // OK
-          }
-        } else {
-          // OK
-        }
       case _ => // OK
     }
   }
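
For readers skimming the diff: the check resolves the target table from the catalog, strips subquery aliases with `EliminateSubqueryAliases`, and only applies when the target is a data source table (a `LogicalRelation` over a `BaseRelation`); it then collects every `BaseRelation` the CTAS query reads and fails analysis if the destination is among them. A toy, Spark-free sketch of the same idea follows (the types are illustrative stand-ins, not Spark's internals):

```Scala
// Toy model of the cycle check: refuse to overwrite a relation that the
// query also reads. Types here are hypothetical, not Spark's internals.
case class Relation(name: String)
case class Query(sources: Seq[Relation])

def checkOverwrite(target: Relation, query: Query): Unit = {
  if (query.sources.contains(target)) {
    throw new IllegalArgumentException(
      s"Cannot overwrite table ${target.name} that is also being read from")
  }
}

// checkOverwrite(Relation("tab1"), Query(Seq(Relation("tab1"))))  // throws
```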
......
@@ -1151,6 +1151,58 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
+  test("saveAsTable - source and target are the same table") {
+    val tableName = "tab1"
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+      table(tableName).write.mode(SaveMode.Append).saveAsTable(tableName)
+      checkAnswer(table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+      table(tableName).write.mode(SaveMode.Ignore).saveAsTable(tableName)
+      checkAnswer(table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+      var e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      }.getMessage
+      assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+      e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
+      }.getMessage
+      assert(e.contains(s"Table `$tableName` already exists"))
+    }
+  }
+
+  test("insertInto - source and target are the same table") {
+    val tableName = "tab1"
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+      table(tableName).write.mode(SaveMode.Append).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+      table(tableName).write.mode(SaveMode.Ignore).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+      table(tableName).write.mode(SaveMode.ErrorIfExists).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+      val e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
+      }.getMessage
+      assert(e.contains(s"Cannot overwrite a path that is also being read from"))
+    }
+  }
   test("saveAsTable[append]: less columns") {
     withTable("saveAsTable_less_columns") {
       Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")
......