Skip to content
Snippets Groups Projects
Commit 43dac2c8 authored by Yijie Shen, committed by Yin Huai
Browse files

[SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table

JIRA: https://issues.apache.org/jira/browse/SPARK-6941

Author: Yijie Shen <henry.yijieshen@gmail.com>

Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits:

f82cbe7 [Yijie Shen] reorder import
dd67e40 [Yijie Shen] resolve comments
09518af [Yijie Shen] fix import order in DataframeSuite
0c635d4 [Yijie Shen] make match more specific
9df388d [Yijie Shen] move check into PreWriteCheck
847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy
parent b536d5dc
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException} ...@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog} import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias} import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.DataType
...@@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => ...@@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// The relation in l is not an InsertableRelation. // The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.") failAnalysis(s"$l does not allow insertion.")
// Insertion targets that reach this case are validated by plan shape: `t` is the
// logical plan being inserted into.
case logical.InsertIntoTable(t, _, _, _, _) =>
// Fail analysis when the target is not a LeafNode (e.g. a plan built from
// projections/filters), or is the OneRowRelation object, or is a LocalRelation.
// NOTE: all three cases share the "RDD-based table" wording, including the
// non-leaf case.
if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
failAnalysis(s"Inserting into an RDD-based table is not allowed.")
} else {
// OK
}
case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of // When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed. // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
......
...@@ -17,19 +17,23 @@ ...@@ -17,19 +17,23 @@
package org.apache.spark.sql package org.apache.spark.sql
import java.io.File
import scala.language.postfixOps import scala.language.postfixOps
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint} import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
class DataFrameSuite extends QueryTest { class DataFrameSuite extends QueryTest with SQLTestUtils {
import org.apache.spark.sql.TestData._ import org.apache.spark.sql.TestData._
lazy val ctx = org.apache.spark.sql.test.TestSQLContext lazy val ctx = org.apache.spark.sql.test.TestSQLContext
import ctx.implicits._ import ctx.implicits._
def sqlContext: SQLContext = ctx
test("analysis error should be eagerly reported") { test("analysis error should be eagerly reported") {
val oldSetting = ctx.conf.dataFrameEagerAnalysis val oldSetting = ctx.conf.dataFrameEagerAnalysis
// Eager analysis. // Eager analysis.
...@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest { ...@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest {
assert(f.getMessage.contains("column3")) assert(f.getMessage.contains("column3"))
assert(!f.getMessage.contains("column2")) assert(!f.getMessage.contains("column2"))
} }
test("SPARK-6941: Better error message for inserting into RDD-based Table") {
  // Error text expected for every rejected insertion below.
  val expectedMessage = "Inserting into an RDD-based table is not allowed."
  // Every temp table this test registers on `ctx`; they are dropped again in `finally`
  // so they do not leak into other tests that use the same context.
  val tempTables = Seq("parquet_base", "json_base", "rdd_base", "indirect_ds", "one_row")

  withTempDir { dir =>
    val tempParquetFile = new File(dir, "tmp_parquet")
    val tempJsonFile = new File(dir, "tmp_json")

    val df = Seq(Tuple1(1)).toDF()
    val insertion = Seq(Tuple1(2)).toDF("col")

    // Asserts that inserting `insertion` into `tableName` fails analysis with the
    // expected message.
    def assertInsertionRejected(tableName: String): Unit = {
      val e = intercept[AnalysisException] {
        insertion.write.insertInto(tableName)
      }
      assert(e.getMessage.contains(expectedMessage))
    }

    try {
      // pass case: parquet table (HadoopFsRelation)
      df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
      val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath)
      pdf.registerTempTable("parquet_base")
      insertion.write.insertInto("parquet_base")

      // pass case: json table (InsertableRelation)
      df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
      val jdf = ctx.read.json(tempJsonFile.getCanonicalPath)
      jdf.registerTempTable("json_base")
      insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")

      // error case: insert into an RDD
      df.registerTempTable("rdd_base")
      assertInsertionRejected("rdd_base")

      // error case: insert into a logical plan that is not a LeafNode
      val indirectDS = pdf.select("_1").filter($"_1" > 5)
      indirectDS.registerTempTable("indirect_ds")
      assertInsertionRejected("indirect_ds")

      // error case: insert into an OneRowRelation
      new DataFrame(ctx, OneRowRelation).registerTempTable("one_row")
      assertInsertionRejected("one_row")
    } finally {
      // Drop only the tables that were actually registered, so that a cleanup error
      // for a table the test never reached cannot mask the original failure.
      val registered = ctx.tableNames().toSet
      tempTables.filter(registered.contains).foreach(name => ctx.dropTempTable(name))
    }
  }
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment