Skip to content
Snippets Groups Projects
Commit e3c1366b authored by Reynold Xin, committed by Yin Huai
Browse files

[SPARK-14865][SQL] Better error handling for view creation.

## What changes were proposed in this pull request?
This patch improves error handling in view creation. CreateViewCommand now analyzes the view's SQL query first and throws an AnalysisException if the query cannot be analyzed.

In addition, I also added the following two conservative guards for easier identification of Spark bugs:

1. If there is a bug and the generated view SQL cannot be analyzed, throw an exception at runtime. Note that this is not an AnalysisException because it is not caused by the user and more likely indicates a bug in Spark.
2. When SQLBuilder gets an unresolved plan, it now also shows the plan in the error message.

I also took the chance to simplify the internal implementation of CreateViewCommand, and *removed* a fallback path that previously would have masked an exception.

## How was this patch tested?
1. Added a unit test for the user-facing error handling.
2. Manually introduced some bugs in Spark to test the internal defensive error handling.
3. Also added a test case to test nested views (not super relevant).

Author: Reynold Xin <rxin@databricks.com>

Closes #12633 from rxin/SPARK-14865.
parent 890abd12
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} ...@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
* supported by this builder (yet). * supported by this builder (yet).
*/ */
class SQLBuilder(logicalPlan: LogicalPlan) extends Logging { class SQLBuilder(logicalPlan: LogicalPlan) extends Logging {
require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans") require(logicalPlan.resolved,
"SQLBuilder only supports resolved logical query plans. Current plan:\n" + logicalPlan)
def this(df: Dataset[_]) = this(df.queryExecution.analyzed) def this(df: Dataset[_]) = this(df.queryExecution.analyzed)
......
...@@ -63,9 +63,12 @@ case class CreateViewCommand( ...@@ -63,9 +63,12 @@ case class CreateViewCommand(
} }
override def run(sqlContext: SQLContext): Seq[Row] = { override def run(sqlContext: SQLContext): Seq[Row] = {
val analzyedPlan = sqlContext.executePlan(child).analyzed // If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sqlContext.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length) require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
val sessionState = sqlContext.sessionState val sessionState = sqlContext.sessionState
if (sessionState.catalog.tableExists(tableIdentifier)) { if (sessionState.catalog.tableExists(tableIdentifier)) {
...@@ -74,7 +77,7 @@ case class CreateViewCommand( ...@@ -74,7 +77,7 @@ case class CreateViewCommand(
// already exists. // already exists.
} else if (replace) { } else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan)) sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
} else { } else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists. // exists.
...@@ -85,68 +88,74 @@ case class CreateViewCommand( ...@@ -85,68 +88,74 @@ case class CreateViewCommand(
} else { } else {
// Create the view if it doesn't exist. // Create the view if it doesn't exist.
sessionState.catalog.createTable( sessionState.catalog.createTable(
prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false) prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
} }
Seq.empty[Row] Seq.empty[Row]
} }
private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = { /**
val expandedText = if (sqlContext.conf.canonicalView) { * Returns a [[CatalogTable]] that can be used to save in the catalog. This method canonicalizes the
try rebuildViewQueryString(sqlContext, analzyedPlan) catch { * SQL based on the analyzed plan, and also creates the proper schema for the view.
case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan) */
private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
val viewSQL: String =
if (sqlContext.conf.canonicalView) {
val logicalPlan =
if (tableDesc.schema.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
}
new SQLBuilder(logicalPlan).toSQL
} else {
// When user specified column names for view, we should create a project to do the renaming.
// When no column name specified, we still need to create a project to declare the columns
// we need, to make us more robust to top level `*`s.
val viewOutput = {
val columnNames = analyzedPlan.output.map(f => quote(f.name))
if (tableDesc.schema.isEmpty) {
columnNames.mkString(", ")
} else {
columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
case (name, alias) => s"$name AS $alias"
}.mkString(", ")
}
}
val viewText = tableDesc.viewText.get
val viewName = quote(tableDesc.identifier.table)
s"SELECT $viewOutput FROM ($viewText) $viewName"
} }
} else {
wrapViewTextWithSelect(analzyedPlan) // Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
throw new RuntimeException(
"Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
} }
val viewSchema = { val viewSchema: Seq[CatalogColumn] = {
if (tableDesc.schema.isEmpty) { if (tableDesc.schema.isEmpty) {
analzyedPlan.output.map { a => analyzedPlan.output.map { a =>
CatalogColumn(a.name, a.dataType.simpleString) CatalogColumn(a.name, a.dataType.simpleString)
} }
} else { } else {
analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) => analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment) CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
} }
} }
} }
tableDesc.copy(schema = viewSchema, viewText = Some(expandedText)) tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL))
}
private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
// When user specified column names for view, we should create a project to do the renaming.
// When no column name specified, we still need to create a project to declare the columns
// we need, to make us more robust to top level `*`s.
val viewOutput = {
val columnNames = analzyedPlan.output.map(f => quote(f.name))
if (tableDesc.schema.isEmpty) {
columnNames.mkString(", ")
} else {
columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
case (name, alias) => s"$name AS $alias"
}.mkString(", ")
}
}
val viewText = tableDesc.viewText.get
val viewName = quote(tableDesc.identifier.table)
s"SELECT $viewOutput FROM ($viewText) $viewName"
}
private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
val logicalPlan = if (tableDesc.schema.isEmpty) {
analzyedPlan
} else {
val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
}
new SQLBuilder(logicalPlan).toSQL
} }
// escape backtick with double-backtick in column name and wrap it with backtick. /** Escape backtick with double-backtick in column name and wrap it with backtick. */
private def quote(name: String) = s"`${name.replaceAll("`", "``")}`" private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
} }
...@@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils ...@@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils
class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import hiveContext.implicits._ import hiveContext.implicits._
override def beforeAll(): Unit = {
// Create a simple table with two columns: id and id1
sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt")
}
override def afterAll(): Unit = {
sqlContext.sql(s"DROP TABLE IF EXISTS jt")
}
test("nested views") {
withView("jtv1", "jtv2") {
sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
}
}
test("error handling: fail if the view sql itself is invalid") {
// A table that does not exist
intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect()
}
// A column that does not exist
intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect()
}
}
test("correctly parse CREATE VIEW statement") { test("correctly parse CREATE VIEW statement") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt") { sql(
val df = (1 until 10).map(i => i -> i).toDF("i", "j") """CREATE VIEW IF NOT EXISTS
df.write.format("json").saveAsTable("jt") |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
sql( |TBLPROPERTIES ('a' = 'b')
"""CREATE VIEW IF NOT EXISTS |AS SELECT * FROM jt""".stripMargin)
|default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
|TBLPROPERTIES ('a' = 'b') sql("DROP VIEW testView")
|AS SELECT * FROM jt""".stripMargin)
checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
sql("DROP VIEW testView")
}
} }
} }
test("correctly handle CREATE VIEW IF NOT EXISTS") { test("correctly handle CREATE VIEW IF NOT EXISTS") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt", "jt2") { withTable("jt2") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt") sql("CREATE VIEW testView AS SELECT id FROM jt")
val df = (1 until 10).map(i => i -> i).toDF("i", "j") val df = (1 until 10).map(i => i -> i).toDF("i", "j")
...@@ -66,8 +90,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ...@@ -66,8 +90,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test(s"$prefix correctly handle CREATE OR REPLACE VIEW") { test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
withSQLConf( withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("jt", "jt2") { withTable("jt2") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
...@@ -90,9 +113,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ...@@ -90,9 +113,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test(s"$prefix correctly handle ALTER VIEW") { test(s"$prefix correctly handle ALTER VIEW") {
withSQLConf( withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("jt", "jt2") { withTable("jt2") {
withView("testView") { withView("testView") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt") sql("CREATE VIEW testView AS SELECT id FROM jt")
val df = (1 until 10).map(i => i -> i).toDF("i", "j") val df = (1 until 10).map(i => i -> i).toDF("i", "j")
...@@ -109,12 +131,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ...@@ -109,12 +131,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// json table is not hive-compatible, make sure the new flag fix it. // json table is not hive-compatible, make sure the new flag fix it.
withSQLConf( withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("jt") { withView("testView") {
withView("testView") { sql("CREATE VIEW testView AS SELECT id FROM jt")
sqlContext.range(1, 10).write.format("json").saveAsTable("jt") checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
sql("CREATE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
}
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment