diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala index d65b3cb0e312172e5dd59ac6af94c16be020a100..7f26a7e411f73e4ce5fab02a8150f93f31483dba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala @@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} * supported by this builder (yet). */ class SQLBuilder(logicalPlan: LogicalPlan) extends Logging { - require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans") + require(logicalPlan.resolved, + "SQLBuilder only supports resolved logical query plans. Current plan:\n" + logicalPlan) def this(df: Dataset[_]) = this(df.queryExecution.analyzed) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 082f944f99bb04a7f1e3a10cd23442e4c2c37e9d..7542f9d6c311cb5339b06740741e8cf3f09bc907 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -63,9 +63,12 @@ case class CreateViewCommand( } override def run(sqlContext: SQLContext): Seq[Row] = { - val analzyedPlan = sqlContext.executePlan(child).analyzed + // If the plan cannot be analyzed, throw an exception and don't proceed. + val qe = sqlContext.executePlan(child) + qe.assertAnalyzed() + val analyzedPlan = qe.analyzed - require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length) + require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length) val sessionState = sqlContext.sessionState if (sessionState.catalog.tableExists(tableIdentifier)) { @@ -74,7 +77,7 @@ case class CreateViewCommand( // already exists. } else if (replace) { // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan)) + sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan)) } else { // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already // exists. @@ -85,68 +88,74 @@ case class CreateViewCommand( } else { // Create the view if it doesn't exist. sessionState.catalog.createTable( - prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false) + prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false) } Seq.empty[Row] } - private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = { - val expandedText = if (sqlContext.conf.canonicalView) { - try rebuildViewQueryString(sqlContext, analzyedPlan) catch { - case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan) + /** + * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize + * SQL based on the analyzed plan, and also creates the proper schema for the view. + */ + private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = { + val viewSQL: String = + if (sqlContext.conf.canonicalView) { + val logicalPlan = + if (tableDesc.schema.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(tableDesc.schema).map { + case (attr, col) => Alias(attr, col.name)() + } + sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed + } + new SQLBuilder(logicalPlan).toSQL + } else { + // When user specified column names for view, we should create a project to do the renaming. + // When no column name specified, we still need to create a project to declare the columns + // we need, to make us more robust to top level `*`s. + val viewOutput = { + val columnNames = analyzedPlan.output.map(f => quote(f.name)) + if (tableDesc.schema.isEmpty) { + columnNames.mkString(", ") + } else { + columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map { + case (name, alias) => s"$name AS $alias" + }.mkString(", ") + } + } + + val viewText = tableDesc.viewText.get + val viewName = quote(tableDesc.identifier.table) + s"SELECT $viewOutput FROM ($viewText) $viewName" } - } else { - wrapViewTextWithSelect(analzyedPlan) + + // Validate the view SQL - make sure we can parse it and analyze it. + // If we cannot analyze the generated query, there is probably a bug in SQL generation. + try { + sqlContext.sql(viewSQL).queryExecution.assertAnalyzed() + } catch { + case NonFatal(e) => + throw new RuntimeException( + "Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e) } - val viewSchema = { + val viewSchema: Seq[CatalogColumn] = { if (tableDesc.schema.isEmpty) { - analzyedPlan.output.map { a => + analyzedPlan.output.map { a => CatalogColumn(a.name, a.dataType.simpleString) } } else { - analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) => + analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) => CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment) } } } - tableDesc.copy(schema = viewSchema, viewText = Some(expandedText)) - } - - private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = { - // When user specified column names for view, we should create a project to do the renaming. - // When no column name specified, we still need to create a project to declare the columns - // we need, to make us more robust to top level `*`s. - val viewOutput = { - val columnNames = analzyedPlan.output.map(f => quote(f.name)) - if (tableDesc.schema.isEmpty) { - columnNames.mkString(", ") - } else { - columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map { - case (name, alias) => s"$name AS $alias" - }.mkString(", ") - } - } - - val viewText = tableDesc.viewText.get - val viewName = quote(tableDesc.identifier.table) - s"SELECT $viewOutput FROM ($viewText) $viewName" - } - - private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = { - val logicalPlan = if (tableDesc.schema.isEmpty) { - analzyedPlan - } else { - val projectList = analzyedPlan.output.zip(tableDesc.schema).map { - case (attr, col) => Alias(attr, col.name)() - } - sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed - } - new SQLBuilder(logicalPlan).toSQL + tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL)) } - // escape backtick with double-backtick in column name and wrap it with backtick. + /** Escape backtick with double-backtick in column name and wrap it with backtick. */ private def quote(name: String) = s"`${name.replaceAll("`", "``")}`" } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index cdd5cb31d9000240983bdf7fc6a998915c50fcea..0d88b3b87f5019a315c23072b7193afed0b1cee4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import hiveContext.implicits._ + override def beforeAll(): Unit = { + // Create a simple table with two columns: id and id1 + sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt") + } + + override def afterAll(): Unit = { + sqlContext.sql(s"DROP TABLE IF EXISTS jt") + } + + test("nested views") { + withView("jtv1", "jtv2") { + sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect() + sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect() + checkAnswer(sql("select count(*) FROM jtv2"), Row(2)) + } + } + + test("error handling: fail if the view sql itself is invalid") { + // A table that does not exist + intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect() + } + + // A column that does not exist + intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect() + } + } + test("correctly parse CREATE VIEW statement") { withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { - withTable("jt") { - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt") - sql( - """CREATE VIEW IF NOT EXISTS - |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') - |TBLPROPERTIES ('a' = 'b') - |AS SELECT * FROM jt""".stripMargin) - checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) - sql("DROP VIEW testView") - } + sql( + """CREATE VIEW IF NOT EXISTS + |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') + |TBLPROPERTIES ('a' = 'b') + |AS SELECT * FROM jt""".stripMargin) + checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) + sql("DROP VIEW testView") } } test("correctly handle CREATE VIEW IF NOT EXISTS") { withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { - withTable("jt", "jt2") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") + withTable("jt2") { sql("CREATE VIEW testView AS SELECT id FROM jt") val df = (1 until 10).map(i => i -> i).toDF("i", "j") @@ -66,8 +90,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test(s"$prefix correctly handle CREATE OR REPLACE VIEW") { withSQLConf( SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("jt", "jt2") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") + withTable("jt2") { sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) @@ -90,9 +113,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test(s"$prefix correctly handle ALTER VIEW") { withSQLConf( SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("jt", "jt2") { + withTable("jt2") { withView("testView") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") sql("CREATE VIEW testView AS SELECT id FROM jt") val df = (1 until 10).map(i => i -> i).toDF("i", "j") @@ -109,12 +131,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // json table is not hive-compatible, make sure the new flag fix it. withSQLConf( SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("jt") { - withView("testView") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") - sql("CREATE VIEW testView AS SELECT id FROM jt") - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - } + withView("testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) } } }