diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3748e07f88aad3a35ca19c80e4e25fe4fae030f6..8b1a7303fc5b2f6df0cc28fa5f76bc7837baacce 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -200,6 +200,11 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
+      ) ++ Seq(
+        // SPARK-12689 Migrate DDL parsing to the newly absorbed parser
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.execution.datasources.DDLParser"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.execution.datasources.DDLException"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.ddlParser")
       ) ++ Seq(
         // SPARK-7799 Add "streaming-akka" project
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
index 0555a6ba83cbbd0febdef097f4b0b1422f035998..c162c1a0c57893e5742ba5af320174b960d3c846 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
@@ -493,6 +493,16 @@ descFuncNames
     | functionIdentifier
     ;
 
+// FROM and TO are allowed as option keys in a CreateTableUsing command's OPTIONS clause (in fact any string can be used as an option key), but they cannot simply be added to nonReserved because that would break other existing rules. So we introduce looseIdentifier and looseNonReserved here.
+looseIdentifier
+    :
+    Identifier
+    | looseNonReserved -> Identifier[$looseNonReserved.text]
+    // If the parser ever decides to enforce SQL11 reserved keywords, i.e., useSQL11ReservedKeywordsForIdentifier() = false,
+    // the SQL11 keywords used as identifiers in existing q tests will NOT be added back.
+    | {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.text]
+    ;
+
 identifier
     :
     Identifier
@@ -516,6 +526,10 @@ principalIdentifier
     | QuotedIdentifier
     ;
 
+looseNonReserved
+    : nonReserved | KW_FROM | KW_TO
+    ;
+
 //The new version of nonReserved + sql11ReservedKeywordsUsedAsIdentifier = old version of nonReserved
 //Non reserved keywords are basically the keywords that can be used as identifiers.
 //All the KW_* are automatically not only keywords, but also reserved keywords.
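The looseIdentifier rule above exists so that option keys that collide with keywords, most notably FROM and TO, keep parsing now that data-source DDL goes through this grammar. A minimal sketch of the statement shape it preserves (the provider class and option values are illustrative, and sqlContext is the instance a spark-shell session provides):

    sqlContext.sql(
      """CREATE TEMPORARY TABLE oneToTen
        |USING org.apache.spark.sql.sources.SimpleScanSource
        |OPTIONS (From '1', To '10')
      """.stripMargin)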
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g
index 4374cd7ef7200bc740ddc546cf96eae0aaa3685e..e930caa291d4f0c05cc5057591b46a511202c081 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlLexer.g
@@ -324,6 +324,8 @@ KW_ISOLATION: 'ISOLATION';
 KW_LEVEL: 'LEVEL';
 KW_SNAPSHOT: 'SNAPSHOT';
 KW_AUTOCOMMIT: 'AUTOCOMMIT';
+KW_REFRESH: 'REFRESH';
+KW_OPTIONS: 'OPTIONS';
 KW_WEEK: 'WEEK'|'WEEKS';
 KW_MILLISECOND: 'MILLISECOND'|'MILLISECONDS';
 KW_MICROSECOND: 'MICROSECOND'|'MICROSECONDS';
@@ -470,7 +472,7 @@ Identifier
 fragment
 QuotedIdentifier
     :
-    '`'  ( '``' | ~('`') )* '`' { setText(getText().substring(1, getText().length() -1 ).replaceAll("``", "`")); }
+    '`'  ( '``' | ~('`') )* '`' { setText(getText().replaceAll("``", "`")); }
     ;
 
 WS  :  (' '|'\r'|'\t'|'\n') {$channel=HIDDEN;}
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
index 35bef00351d72119ea36cc342d26f7cf06dc8e2d..6591f6b0f56ceaf488ace59be47c776fd1477b32 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
@@ -142,6 +142,7 @@ TOK_UNIONTYPE;
 TOK_COLTYPELIST;
 TOK_CREATEDATABASE;
 TOK_CREATETABLE;
+TOK_CREATETABLEUSING;
 TOK_TRUNCATETABLE;
 TOK_CREATEINDEX;
 TOK_CREATEINDEX_INDEXTBLNAME;
@@ -371,6 +372,10 @@ TOK_TXN_READ_WRITE;
 TOK_COMMIT;
 TOK_ROLLBACK;
 TOK_SET_AUTOCOMMIT;
+TOK_REFRESHTABLE;
+TOK_TABLEPROVIDER;
+TOK_TABLEOPTIONS;
+TOK_TABLEOPTION;
 TOK_CACHETABLE;
 TOK_UNCACHETABLE;
 TOK_CLEARCACHE;
@@ -660,6 +665,12 @@ import java.util.HashMap;
 }
 private char [] excludedCharForColumnName = {'.', ':'};
 private boolean containExcludedCharForCreateTableColumnName(String input) {
+  if (input.length() > 0) {
+    if (input.charAt(0) == '`' && input.charAt(input.length() - 1) == '`') {
+      // When the column name is backquoted, we don't care about excluded chars.
+      return false;
+    }
+  }
   for(char c : excludedCharForColumnName) {
     if(input.indexOf(c)>-1) {
       return true;
@@ -781,6 +792,7 @@ ddlStatement
     | truncateTableStatement
     | alterStatement
     | descStatement
+    | refreshStatement
     | showStatement
     | metastoreCheck
     | createViewStatement
@@ -907,12 +919,31 @@ createTableStatement
 @init { pushMsg("create table statement", state); }
 @after { popMsg(state); }
     : KW_CREATE (temp=KW_TEMPORARY)? (ext=KW_EXTERNAL)? KW_TABLE ifNotExists? name=tableName
-      ( like=KW_LIKE likeName=tableName
+      (
+      like=KW_LIKE likeName=tableName
         tableRowFormat?
         tableFileFormat?
         tableLocation?
         tablePropertiesPrefixed?
+      -> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
+         ^(TOK_LIKETABLE $likeName?)
+         tableRowFormat?
+         tableFileFormat?
+         tableLocation?
+         tablePropertiesPrefixed?
+        )
+      |
+      tableProvider
+      tableOpts?
+      (KW_AS selectStatementWithCTE)?
+      -> ^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
+         tableProvider
+         tableOpts?
+         selectStatementWithCTE?
+        )
       | (LPAREN columnNameTypeList RPAREN)?
+        (p=tableProvider?)
+        tableOpts?
        tableComment?
        tablePartition?
        tableBuckets?
@@ -922,8 +953,15 @@ createTableStatement
        tableLocation?
        tablePropertiesPrefixed?
        (KW_AS selectStatementWithCTE)?
-      )
-    -> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
+      -> {p != null}?
+         ^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
+           columnNameTypeList?
+           $p
+           tableOpts?
+           selectStatementWithCTE?
+          )
+      ->
+         ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
          ^(TOK_LIKETABLE $likeName?)
          columnNameTypeList?
          tableComment?
@@ -935,7 +973,8 @@ createTableStatement
        tableLocation?
        tablePropertiesPrefixed?
        selectStatementWithCTE?
-       )
+        )
+      )
     ;
 
 truncateTableStatement
@@ -1379,6 +1418,13 @@ tabPartColTypeExpr
   : tableName partitionSpec? extColumnName?
   -> ^(TOK_TABTYPE tableName partitionSpec? extColumnName?)
   ;
+refreshStatement
+@init { pushMsg("refresh statement", state); }
+@after { popMsg(state); }
+    :
+    KW_REFRESH KW_TABLE tableName -> ^(TOK_REFRESHTABLE tableName)
+    ;
+
 descStatement
 @init { pushMsg("describe statement", state); }
 @after { popMsg(state); }
@@ -1774,6 +1820,30 @@ showStmtIdentifier
     | StringLiteral
     ;
 
+tableProvider
+@init { pushMsg("table's provider", state); }
+@after { popMsg(state); }
+    :
+    KW_USING Identifier (DOT Identifier)*
+    -> ^(TOK_TABLEPROVIDER Identifier+)
+    ;
+
+optionKeyValue
+@init { pushMsg("table's option specification", state); }
+@after { popMsg(state); }
+    :
+    (looseIdentifier (DOT looseIdentifier)*) StringLiteral
+    -> ^(TOK_TABLEOPTION looseIdentifier+ StringLiteral)
+    ;
+
+tableOpts
+@init { pushMsg("table's options", state); }
+@after { popMsg(state); }
+    :
+    KW_OPTIONS LPAREN optionKeyValue (COMMA optionKeyValue)* RPAREN
+    -> ^(TOK_TABLEOPTIONS optionKeyValue+)
+    ;
+
 tableComment
 @init { pushMsg("table's comment", state); }
 @after { popMsg(state); }
@@ -2132,7 +2202,7 @@ structType
 mapType
 @init { pushMsg("map type", state); }
 @after { popMsg(state); }
-    : KW_MAP LESSTHAN left=primitiveType COMMA right=type GREATERTHAN
+    : KW_MAP LESSTHAN left=type COMMA right=type GREATERTHAN
     -> ^(TOK_MAP $left $right)
     ;
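With the lexer and parser rules in place, REFRESH TABLE and CREATE TABLE ... USING statements parse natively instead of going through the separate DDLParser this patch deletes. A quick sketch of exercising the grammar through SparkQl (the class is extended later in this patch and is private[sql], so this only compiles from code inside org.apache.spark.sql; the provider and path are illustrative):

    val parser = new org.apache.spark.sql.execution.SparkQl()

    // refreshStatement: TOK_REFRESHTABLE -> RefreshTable(db.t)
    parser.parsePlan("REFRESH TABLE db.t")

    // createTableStatement: TOK_CREATETABLEUSING -> CreateTableUsing(...)
    parser.parsePlan(
      """CREATE TABLE t (a int, b string)
        |USING org.apache.spark.sql.parquet
        |OPTIONS (path '/tmp/t')
      """.stripMargin)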
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index 536c292ab7f34581748e23efac91042bfbe9e89e..7ce2407913ade4a47b2f631be6a0a42538983330 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -140,6 +140,7 @@ private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends
     case Token("TOK_BOOLEAN", Nil) => BooleanType
     case Token("TOK_STRING", Nil) => StringType
     case Token("TOK_VARCHAR", Token(_, Nil) :: Nil) => StringType
+    case Token("TOK_CHAR", Token(_, Nil) :: Nil) => StringType
     case Token("TOK_FLOAT", Nil) => FloatType
     case Token("TOK_DOUBLE", Nil) => DoubleType
     case Token("TOK_DATE", Nil) => DateType
@@ -156,9 +157,10 @@ private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends
 
   protected def nodeToStructField(node: ASTNode): StructField = node match {
     case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: Nil) =>
-      StructField(fieldName, nodeToDataType(dataType), nullable = true)
-    case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: _ /* comment */:: Nil) =>
-      StructField(fieldName, nodeToDataType(dataType), nullable = true)
+      StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true)
+    case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: comment :: Nil) =>
+      val meta = new MetadataBuilder().putString("comment", unquoteString(comment.text)).build()
+      StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true, meta)
     case _ =>
       noParseRule("StructField", node)
   }
@@ -222,15 +224,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case Nil => ShowFunctions(None, None)
         case Token(name, Nil) :: Nil =>
-          ShowFunctions(None, Some(unquoteString(name)))
+          ShowFunctions(None, Some(unquoteString(cleanIdentifier(name))))
         case Token(db, Nil) :: Token(name, Nil) :: Nil =>
-          ShowFunctions(Some(unquoteString(db)), Some(unquoteString(name)))
+          ShowFunctions(Some(unquoteString(cleanIdentifier(db))),
+            Some(unquoteString(cleanIdentifier(name))))
         case _ =>
           noParseRule("SHOW FUNCTIONS", node)
       }
 
     case Token("TOK_DESCFUNCTION", Token(functionName, Nil) :: isExtended) =>
-      DescribeFunction(functionName, isExtended.nonEmpty)
+      DescribeFunction(cleanIdentifier(functionName), isExtended.nonEmpty)
 
     case Token("TOK_QUERY", queryArgs @ Token("TOK_CTE" | "TOK_FROM" | "TOK_INSERT", _) :: _) =>
       val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
@@ -611,7 +614,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       noParseRule("Select", node)
   }
 
-  protected val escapedIdentifier = "`([^`]+)`".r
+  protected val escapedIdentifier = "`(.+)`".r
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
 
@@ -655,7 +658,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       nodeToExpr(qualifier) match {
         case UnresolvedAttribute(nameParts) =>
           UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
-        case other => UnresolvedExtractValue(other, Literal(attr))
+        case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr)))
       }
 
     /* Stars (*) */
@@ -663,7 +666,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     // The format of dbName.tableName.* cannot be parsed by HiveParser. TOK_TABNAME will only
     // has a single child which is tableName.
     case Token("TOK_ALLCOLREF", Token("TOK_TABNAME", target) :: Nil) if target.nonEmpty =>
-      UnresolvedStar(Some(target.map(_.text)))
+      UnresolvedStar(Some(target.map(x => cleanIdentifier(x.text))))
 
     /* Aggregate Functions */
     case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) =>
@@ -971,7 +974,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
   protected def nodeToGenerate(node: ASTNode, outer: Boolean, child: LogicalPlan): Generate = {
     val Token("TOK_SELECT", Token("TOK_SELEXPR", clauses) :: Nil) = node
-    val alias = getClause("TOK_TABALIAS", clauses).children.head.text
+    val alias = cleanIdentifier(getClause("TOK_TABALIAS", clauses).children.head.text)
 
     val generator = clauses.head match {
       case Token("TOK_FUNCTION", Token(explode(), Nil) :: childNode :: Nil) =>
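The CatalystQl changes all funnel raw identifier text through cleanIdentifier, which is what makes the lexer change safe: QuotedIdentifier now keeps its surrounding backticks (only unescaping doubled ones), and they are stripped exactly once, at the point an identifier is consumed. A standalone sketch of the interplay (stripQuotes is a hypothetical stand-in for the protected cleanIdentifier):

    // The broadened regex accepts any non-empty quoted content, including
    // backticks that the lexer already unescaped from `` to `.
    val escapedIdentifier = "`(.+)`".r

    def stripQuotes(ident: String): String = ident match {
      case escapedIdentifier(i) => i  // "`from`" -> "from": reserved words survive quoting
      case plain => plain             // unquoted identifiers pass through untouched
    }

    stripQuotes("`from`")  // == "from"
    stripQuotes("col1")    // == "col1"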
case Token("TOK_ALLCOLREF", Token("TOK_TABNAME", target) :: Nil) if target.nonEmpty => - UnresolvedStar(Some(target.map(_.text))) + UnresolvedStar(Some(target.map(x => cleanIdentifier(x.text)))) /* Aggregate Functions */ case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) => @@ -971,7 +974,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C protected def nodeToGenerate(node: ASTNode, outer: Boolean, child: LogicalPlan): Generate = { val Token("TOK_SELECT", Token("TOK_SELEXPR", clauses) :: Nil) = node - val alias = getClause("TOK_TABALIAS", clauses).children.head.text + val alias = cleanIdentifier(getClause("TOK_TABALIAS", clauses).children.head.text) val generator = clauses.head match { case Token("TOK_FUNCTION", Token(explode(), Nil) :: childNode :: Nil) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index be28df3a51557a42495a12b57c91cc74ba468654..ef993c3edae37d2d6e7aa4bce6ea3dd48684ba4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -206,10 +206,7 @@ class SQLContext private[sql]( @transient protected[sql] val sqlParser: ParserInterface = new SparkQl(conf) - @transient - protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser) - - protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) + protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql) protected[sql] def executeSql(sql: String): org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala index a5bd8ee42dec979e647f5ae92d0e515e2d2c2a1f..4174e27e9c8b77758fcbc198a6f29c7ec11fb488 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.sql.execution +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) { /** Check if a command should not be explained. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index a5bd8ee42dec979e647f5ae92d0e515e2d2c2a1f..4174e27e9c8b77758fcbc198a6f29c7ec11fb488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -16,11 +16,14 @@
  */
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
 
 private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
   /** Check if a command should not be explained. */
@@ -55,6 +58,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
         ExplainCommand(nodeToPlan(query), extended = extended.isDefined)
 
+      case Token("TOK_REFRESHTABLE", nameParts :: Nil) =>
+        val tableIdent = extractTableIdent(nameParts)
+        RefreshTable(tableIdent)
+
+      case Token("TOK_CREATETABLEUSING", createTableArgs) =>
+        val Seq(
+          temp,
+          allowExisting,
+          Some(tabName),
+          tableCols,
+          Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
+          tableOpts,
+          tableAs) = getClauses(Seq(
+          "TEMPORARY",
+          "TOK_IFNOTEXISTS",
+          "TOK_TABNAME", "TOK_TABCOLLIST",
+          "TOK_TABLEPROVIDER",
+          "TOK_TABLEOPTIONS",
+          "TOK_QUERY"), createTableArgs)
+
+        val tableIdent: TableIdentifier = extractTableIdent(tabName)
+
+        val columns = tableCols.map {
+          case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
+        }
+
+        val provider = providerNameParts.map {
+          case Token(name, Nil) => name
+        }.mkString(".")
+
+        val options: Map[String, String] = tableOpts.toSeq.flatMap {
+          case Token("TOK_TABLEOPTIONS", options) =>
+            options.map {
+              case Token("TOK_TABLEOPTION", keysAndValue) =>
+                val key = keysAndValue.init.map(_.text).mkString(".")
+                val value = unquoteString(keysAndValue.last.text)
+                (key, value)
+            }
+        }.toMap
+
+        val asClause = tableAs.map(nodeToPlan(_))
+
+        if (temp.isDefined && allowExisting.isDefined) {
+          throw new AnalysisException(
+            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+        }
+
+        if (asClause.isDefined) {
+          if (columns.isDefined) {
+            throw new AnalysisException(
+              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+          }
+
+          val mode = if (allowExisting.isDefined) {
+            SaveMode.Ignore
+          } else if (temp.isDefined) {
+            SaveMode.Overwrite
+          } else {
+            SaveMode.ErrorIfExists
+          }
+
+          CreateTableUsingAsSelect(tableIdent,
+            provider,
+            temp.isDefined,
+            Array.empty[String],
+            bucketSpec = None,
+            mode,
+            options,
+            asClause.get)
+        } else {
+          CreateTableUsing(
+            tableIdent,
+            columns,
+            provider,
+            temp.isDefined,
+            options,
+            allowExisting.isDefined,
+            managedIfNoPath = false)
+        }
+
       case Token("TOK_SWITCHDATABASE", Token(database, Nil) :: Nil) =>
         SetDatabaseCommand(cleanIdentifier(database))
 
@@ -68,26 +151,30 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           nodeToDescribeFallback(node)
         } else {
           tableType match {
-            case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts :: Nil) :: Nil) =>
+            case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts) :: Nil) =>
               nameParts match {
-                case Token(".", dbName :: tableName :: Nil) =>
+                case Token(dbName, Nil) :: Token(tableName, Nil) :: Nil =>
                   // It is describing a table with the format like "describe db.table".
                   // TODO: Actually, a user may mean tableName.columnName. Need to resolve this
                   // issue.
-                  val tableIdent = extractTableIdent(nameParts)
+                  val tableIdent = TableIdentifier(
+                    cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
                   datasources.DescribeCommand(
                     UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
-                case Token(".", dbName :: tableName :: colName :: Nil) =>
+                case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil =>
                   // It is describing a column with the format like "describe db.table column".
                   nodeToDescribeFallback(node)
-                case tableName =>
+                case tableName :: Nil =>
                   // It is describing a table with the format like "describe table".
                   datasources.DescribeCommand(
-                    UnresolvedRelation(TableIdentifier(tableName.text), None),
+                    UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
                     isExtended = extended.isDefined)
+                case _ =>
+                  nodeToDescribeFallback(node)
               }
             // All other cases.
-            case _ => nodeToDescribeFallback(node)
+            case _ =>
+              nodeToDescribeFallback(node)
           }
         }
 
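The save-mode selection in the TOK_CREATETABLEUSING branch above mirrors what the deleted DDLParser did. Pulled out as a standalone sketch (the helper name is ours, for illustration only):

    import org.apache.spark.sql.SaveMode

    def ctasSaveMode(ifNotExists: Boolean, temporary: Boolean): SaveMode =
      if (ifNotExists) SaveMode.Ignore        // CREATE TABLE IF NOT EXISTS ... AS SELECT
      else if (temporary) SaveMode.Overwrite  // CREATE TEMPORARY TABLE ... AS SELECT
      else SaveMode.ErrorIfExists             // plain CREATE TABLE ... AS SELECT

    // e.g. ctasSaveMode(ifNotExists = false, temporary = true) == SaveMode.Overwrite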
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
deleted file mode 100644
index f4766b037027dae4708f2081be5a2e5818d37331..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.datasources
-
-import scala.language.implicitConversions
-import scala.util.matching.Regex
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserInterface, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.DataTypeParser
-import org.apache.spark.sql.types._
-
-/**
- * A parser for foreign DDL commands.
- */
-class DDLParser(fallback: => ParserInterface)
-  extends AbstractSparkSQLParser with DataTypeParser with Logging {
-
-  override def parseExpression(sql: String): Expression = fallback.parseExpression(sql)
-
-  override def parseTableIdentifier(sql: String): TableIdentifier = {
-    fallback.parseTableIdentifier(sql)
-  }
-
-  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
-    try {
-      parsePlan(input)
-    } catch {
-      case ddlException: DDLException => throw ddlException
-      case _ if !exceptionOnError => fallback.parsePlan(input)
-      case x: Throwable => throw x
-    }
-  }
-
-  // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
-  // properties via reflection the class in runtime for constructing the SqlLexical object
-  protected val CREATE = Keyword("CREATE")
-  protected val TEMPORARY = Keyword("TEMPORARY")
-  protected val TABLE = Keyword("TABLE")
-  protected val IF = Keyword("IF")
-  protected val NOT = Keyword("NOT")
-  protected val EXISTS = Keyword("EXISTS")
-  protected val USING = Keyword("USING")
-  protected val OPTIONS = Keyword("OPTIONS")
-  protected val DESCRIBE = Keyword("DESCRIBE")
-  protected val EXTENDED = Keyword("EXTENDED")
-  protected val AS = Keyword("AS")
-  protected val COMMENT = Keyword("COMMENT")
-  protected val REFRESH = Keyword("REFRESH")
-
-  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-
-  protected def start: Parser[LogicalPlan] = ddl
-
-  /**
-   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable(intField int, stringField string...)
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * AS SELECT ...
-   */
-  protected lazy val createTable: Parser[LogicalPlan] = {
-    // TODO: Support database.table.
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
-      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
-      case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
-        if (temp.isDefined && allowExisting.isDefined) {
-          throw new DDLException(
-            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
-        }
-
-        val options = opts.getOrElse(Map.empty[String, String])
-        if (query.isDefined) {
-          if (columns.isDefined) {
-            throw new DDLException(
-              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
-          }
-          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
-          val mode = if (allowExisting.isDefined) {
-            SaveMode.Ignore
-          } else if (temp.isDefined) {
-            SaveMode.Overwrite
-          } else {
-            SaveMode.ErrorIfExists
-          }
-
-          val queryPlan = fallback.parsePlan(query.get)
-          CreateTableUsingAsSelect(tableIdent,
-            provider,
-            temp.isDefined,
-            Array.empty[String],
-            bucketSpec = None,
-            mode,
-            options,
-            queryPlan)
-        } else {
-          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-          CreateTableUsing(
-            tableIdent,
-            userSpecifiedSchema,
-            provider,
-            temp.isDefined,
-            options,
-            allowExisting.isDefined,
-            managedIfNoPath = false)
-        }
-    }
-  }
-
-  // This is the same as tableIdentifier in SqlParser.
-  protected lazy val tableIdentifier: Parser[TableIdentifier] =
-    (ident <~ ".").? ~ ident ^^ {
-      case maybeDbName ~ tableName => TableIdentifier(tableName, maybeDbName)
-    }
-
-  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-
-  /*
-   * describe [extended] table avroTable
-   * This will display all columns of table `avroTable` includes column_name,column_type,comment
-   */
-  protected lazy val describeTable: Parser[LogicalPlan] =
-    (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
-      case e ~ tableIdent =>
-        DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
-    }
-
-  protected lazy val refreshTable: Parser[LogicalPlan] =
-    REFRESH ~> TABLE ~> tableIdentifier ^^ {
-      case tableIndet =>
-        RefreshTable(tableIndet)
-    }
-
-  protected lazy val options: Parser[Map[String, String]] =
-    "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-
-  protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
-
-  override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
-    s"identifier matching regex $regex", {
-      case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
-      case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
-    }
-  )
-
-  protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
-    case name => name
-  }
-
-  protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
-    case parts => parts.mkString(".")
-  }
-
-  protected lazy val pair: Parser[(String, String)] =
-    optionName ~ stringLit ^^ { case k ~ v => (k, v) }
-
-  protected lazy val column: Parser[StructField] =
-    ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
-      val meta = cm match {
-        case Some(comment) =>
-          new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
-        case None => Metadata.empty
-      }
-
-      StructField(columnName, typ, nullable = true, meta)
-    }
-}
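One behavioral note on the deletion: the old combinator parser restricted each dotted option-key segment to the optionPart regex, while matching keywords as well as identifiers via regexToParser; the ANTLR grammar replaces both mechanisms with looseIdentifier. A sketch of what the deleted regex accepted:

    val optionPartPattern = "[_a-zA-Z][_a-zA-Z0-9]*"
    assert("path".matches(optionPartPattern))
    assert("from".matches(optionPartPattern))  // keywords also matched under the old
    // parser, which is the behavior looseIdentifier preserves in the new grammar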
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index c3603936dfd2e712afa78bc9195d3b89b497df1c..1554209be9891fe46fb81eae833cb2516a01e918 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -169,8 +169,3 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
 
   override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
 }
-
-/**
- * The exception thrown from the DDL parser.
- */
-class DDLException(message: String) extends RuntimeException(message)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 6fc9febe49707e75f0cf90ea282492fc0b4a2e0b..cb88a1c83c999f4a425aabfe9c6dc7d98250519b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,7 +22,6 @@ import java.io.{File, IOException}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.execution.datasources.DDLException
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
@@ -105,7 +104,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
       sql("SELECT a, b FROM jsonTable"),
       sql("SELECT a, b FROM jt").collect())
 
-    val message = intercept[DDLException]{
+    val message = intercept[AnalysisException]{
       sql(
         s"""
           |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
@@ -156,7 +155,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
   }
 
   test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
-    val message = intercept[DDLException]{
+    val message = intercept[AnalysisException]{
       sql(
         s"""
           |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
@@ -173,7 +172,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
   }
 
   test("a CTAS statement with column definitions is not allowed") {
-    intercept[DDLException]{
+    intercept[AnalysisException]{
       sql(
         s"""
           |CREATE TEMPORARY TABLE jsonTable (a int, b string)
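Since DDLException is gone, invalid data-source DDL now surfaces as AnalysisException, which is what the updated tests assert. A user-level sketch (assuming the spark-shell sqlContext and a registered temp table jt):

    import org.apache.spark.sql.AnalysisException

    try {
      sqlContext.sql(
        """CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
          |USING json
          |OPTIONS (path '/tmp/t.json')
          |AS SELECT * FROM jt
        """.stripMargin)
    } catch {
      case e: AnalysisException =>
        // "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."
        println(e.getMessage)
    }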