Commit 328c7116 authored by Herman van Hovell, committed by Andrew Or

[SPARK-14086][SQL] Add DDL commands to ANTLR4 parser

#### What changes were proposed in this pull request?

This PR adds all the current Spark SQL DDL commands to the new ANTLR 4 based SQL parser.
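As an illustration of what the new parser produces, here is a minimal sketch (the `parsePlan` entry point comes from `AbstractSqlParser`; the expected command shape follows the visitor methods in this diff):

```scala
// Sketch only: SparkSqlParser is the ANTLR4-based parser wired up in this PR;
// parsePlan is inherited from AbstractSqlParser.
import org.apache.spark.sql.execution.SparkSqlParser

val plan = SparkSqlParser.parsePlan(
  "ALTER TABLE tbl SET TBLPROPERTIES ('comment' = 'new comment')")
// visitSetTableProperties should turn this into an AlterTableSetProperties
// command carrying the table identifier and the parsed property map.
```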

I have found a few inconsistencies in the current commands:
- Function has an alias field, which is actually the class name of the function.
- Partition specifications must contain nulls in some commands and `None`s in others.
- `AlterTableSkewedLocation`: should define which columns have skewed values, and should allow us to define a storage location for each skewed combination of values. We currently allow only one value per field.
- `AlterTableSetFileFormat`: should take only one file format; it currently accepts both a generic and a table-specific file format.

I have implemented these commands as they currently are, and I propose to improve them in follow-up PRs.

#### How was this patch tested?

The existing DDLCommandSuite.
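A DDLCommandSuite check is essentially a parse-and-compare round trip; a minimal sketch, assuming the `parsePlan` and `comparePlans` helpers available to the suite:

```scala
// Sketch of a DDLCommandSuite-style assertion (helper names are assumptions).
val sql = "DROP DATABASE IF EXISTS db CASCADE"
val parsed = parser.parsePlan(sql)
val expected = DropDatabase(
  "db",
  true,   // IF EXISTS was specified
  false)( // CASCADE was specified, so ctx.CASCADE == null is false
  sql)
comparePlans(parsed, expected)
```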

cc rxin andrewor14 yhuai

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12011 from hvanhovell/SPARK-14086.
parent 8c11d1aa
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder}
+import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder, ParseException}
import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.command.{DescribeCommand => _, _}
@@ -200,8 +200,8 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
* Convert a table property list into a key-value map.
*/
override def visitTablePropertyList(
ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
ctx.tableProperty.asScala.map { property =>
@@ -216,4 +216,618 @@ class SparkSqlAstBuilder extends AstBuilder {
key -> value
}.toMap
}
/**
* Create a [[CreateDatabase]] command.
*
* For example:
* {{{
* CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
* [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
* }}}
*/
override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
CreateDatabase(
ctx.identifier.getText,
ctx.EXISTS != null,
Option(ctx.locationSpec).map(visitLocationSpec),
Option(ctx.comment).map(string),
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))(
command(ctx))
}
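// Illustrative mapping, not part of the original patch:
//   CREATE DATABASE IF NOT EXISTS db COMMENT 'x' LOCATION '/tmp/db'
// is expected to yield
//   CreateDatabase("db", true, Some("/tmp/db"), Some("x"), Map.empty)(sql)
// where the second argument reflects the IF NOT EXISTS clause.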
/**
* Create an [[AlterDatabaseProperties]] command.
*
* For example:
* {{{
* ALTER (DATABASE|SCHEMA) database SET DBPROPERTIES (property_name=property_value, ...);
* }}}
*/
override def visitSetDatabaseProperties(
ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterDatabaseProperties(
ctx.identifier.getText,
visitTablePropertyList(ctx.tablePropertyList))(
command(ctx))
}
/**
* Create a [[DropDatabase]] command.
*
* For example:
* {{{
* DROP (DATABASE|SCHEMA) [IF EXISTS] database [RESTRICT|CASCADE];
* }}}
*/
override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE == null)(command(ctx))
}
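// Illustrative mapping, not part of the original patch:
//   DROP SCHEMA IF EXISTS db CASCADE
// is expected to yield DropDatabase("db", true, false)(sql); the last flag
// comes from ctx.CASCADE == null, so CASCADE turns the restrict behaviour off.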
/**
* Create a [[DescribeDatabase]] command.
*
* For example:
* {{{
* DESCRIBE DATABASE [EXTENDED] database;
* }}}
*/
override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)(command(ctx))
}
/**
* Create a [[CreateFunction]] command.
*
* For example:
* {{{
* CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
* [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
* }}}
*/
override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
val resources = ctx.resource.asScala.map { resource =>
val resourceType = resource.identifier.getText.toLowerCase
resourceType match {
case "jar" | "file" | "archive" =>
resourceType -> string(resource.STRING)
case other =>
throw new ParseException(s"Resource Type '$resourceType' is not supported.", ctx)
}
}
// Extract the (optional) database name and the function name.
val (database, function) = visitFunctionName(ctx.qualifiedName)
CreateFunction(
database,
function,
string(ctx.className), // TODO this is not an alias.
resources,
ctx.TEMPORARY != null)(
command(ctx))
}
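// Illustrative mapping, not part of the original patch (example values are
// hypothetical):
//   CREATE TEMPORARY FUNCTION helloworld AS 'com.example.HelloWorld'
//     USING JAR '/path/to/hello.jar'
// is expected to yield a CreateFunction with database = None,
// function = "helloworld", className = "com.example.HelloWorld",
// resources = Seq("jar" -> "/path/to/hello.jar") and a temporary flag of true.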
/**
* Create a [[DropFunction]] command.
*
* For example:
* {{{
* DROP [TEMPORARY] FUNCTION [IF EXISTS] function;
* }}}
*/
override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
val (database, function) = visitFunctionName(ctx.qualifiedName)
DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
}
/**
* Create a pair consisting of an optional database name and a function name.
*/
private def visitFunctionName(ctx: QualifiedNameContext): (Option[String], String) = {
ctx.identifier().asScala.map(_.getText) match {
case Seq(db, fn) => (Option(db), fn)
case Seq(fn) => (None, fn)
case other => throw new ParseException(s"Unsupported function name '${ctx.getText}'", ctx)
}
}
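// Illustrative behaviour, not part of the original patch: "db.fn" maps to
// (Some("db"), "fn"), "fn" maps to (None, "fn"), and a name with more parts,
// such as "a.b.fn", raises a ParseException.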
/**
* Create an [[AlterTableRename]] command.
*
* For example:
* {{{
* ALTER TABLE table1 RENAME TO table2;
* }}}
*/
override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableRename(
visitTableIdentifier(ctx.from),
visitTableIdentifier(ctx.to))(
command(ctx))
}
/**
* Create an [[AlterTableSetProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table SET TBLPROPERTIES ('comment' = new_comment);
* }}}
*/
override def visitSetTableProperties(
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitTablePropertyList(ctx.tablePropertyList))(
command(ctx))
}
/**
* Create an [[AlterTableUnsetProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
* }}}
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
AlterTableUnsetProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitTablePropertyList(ctx.tablePropertyList),
ctx.EXISTS != null)(
command(ctx))
}
/**
* Create an [[AlterTableSerDeProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
* ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
* }}}
*/
override def visitSetTableSerDe(ctx: SetTableSerDeContext): LogicalPlan = withOrigin(ctx) {
AlterTableSerDeProperties(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.STRING).map(string),
Option(ctx.tablePropertyList).map(visitTablePropertyList),
// TODO a partition spec is allowed to have optional values. This is currently violated.
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
command(ctx))
}
/**
* Create an [[AlterTableStorageProperties]] command.
*
* For example:
* {{{
* ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
* }}}
*/
override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableStorageProperties(
visitTableIdentifier(ctx.tableIdentifier),
visitBucketSpec(ctx.bucketSpec))(
command(ctx))
}
/**
* Create an [[AlterTableNotClustered]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT CLUSTERED;
* }}}
*/
override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
}
/**
* Create an [[AlterTableNotSorted]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT SORTED;
* }}}
*/
override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
}
/**
* Create an [[AlterTableSkewed]] command.
*
* For example:
* {{{
* ALTER TABLE table SKEWED BY (col1, col2)
* ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
* [STORED AS DIRECTORIES];
* }}}
*/
override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier)
val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
}
/**
* Create an [[AlterTableNotSkewed]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT SKEWED;
* }}}
*/
override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
}
/**
* Create an [[AlterTableNotStoredAsDirs]] command.
*
* For example:
* {{{
* ALTER TABLE table NOT STORED AS DIRECTORIES
* }}}
*/
override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
}
/**
* Create an [[AlterTableSkewedLocation]] command.
*
* For example:
* {{{
* ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
* }}}
*/
override def visitSetTableSkewLocations(
ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
slCtx =>
val location = string(slCtx.STRING)
if (slCtx.constant != null) {
Seq(visitStringConstant(slCtx.constant) -> location)
} else {
// TODO this is similar to what was in the original implementation. However this does not
// make too much sense to me, since we should be storing a tuple of values (not column
// names) for which we want a dedicated storage location.
visitConstantList(slCtx.constantList).map(_ -> location)
}
}.toMap
AlterTableSkewedLocation(
visitTableIdentifier(ctx.tableIdentifier),
skewedMap)(
command(ctx))
}
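// Illustrative mapping, not part of the original patch:
//   ALTER TABLE tbl SET SKEWED LOCATION (col1="loc1", (col2, col3)="loc2")
// is expected to yield
//   Map("col1" -> "loc1", "col2" -> "loc2", "col3" -> "loc2")
// because each element of a constant list currently fans out to its own entry.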
/**
* Create an [[AlterTableAddPartition]] command.
*
* For example:
* {{{
* ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
* }}}
*/
override def visitAddTablePartition(
ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
// Create partition spec to location mapping.
val specsAndLocs = ctx.partitionSpecLocation.asScala.map {
splCtx =>
val spec = visitNonOptionalPartitionSpec(splCtx.partitionSpec)
val location = Option(splCtx.locationSpec).map(visitLocationSpec)
spec -> location
}
AlterTableAddPartition(
visitTableIdentifier(ctx.tableIdentifier),
specsAndLocs,
ctx.EXISTS != null)(
command(ctx))
}
/**
* Create an [[AlterTableExchangePartition]] command.
*
* For example:
* {{{
* ALTER TABLE table1 EXCHANGE PARTITION spec WITH TABLE table2;
* }}}
*/
override def visitExchangeTablePartition(
ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
AlterTableExchangePartition(
visitTableIdentifier(ctx.from),
visitTableIdentifier(ctx.to),
visitNonOptionalPartitionSpec(ctx.partitionSpec))(
command(ctx))
}
/**
* Create an [[AlterTableRenamePartition]] command.
*
* For example:
* {{{
* ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
* }}}
*/
override def visitRenameTablePartition(
ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenamePartition(
visitTableIdentifier(ctx.tableIdentifier),
visitNonOptionalPartitionSpec(ctx.from),
visitNonOptionalPartitionSpec(ctx.to))(
command(ctx))
}
/**
* Create an [[AlterTableDropPartition]] command.
*
* For example:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* }}}
*/
override def visitDropTablePartitions(
ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
AlterTableDropPartition(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.EXISTS != null,
ctx.PURGE != null)(
command(ctx))
}
/**
* Create an [[AlterTableArchivePartition]] command.
*
* For example:
* {{{
* ALTER TABLE table ARCHIVE PARTITION spec;
* }}}
*/
override def visitArchiveTablePartition(
ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
AlterTableArchivePartition(
visitTableIdentifier(ctx.tableIdentifier),
visitNonOptionalPartitionSpec(ctx.partitionSpec))(
command(ctx))
}
/**
* Create an [[AlterTableUnarchivePartition]] command.
*
* For example:
* {{{
* ALTER TABLE table UNARCHIVE PARTITION spec;
* }}}
*/
override def visitUnarchiveTablePartition(
ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
AlterTableUnarchivePartition(
visitTableIdentifier(ctx.tableIdentifier),
visitNonOptionalPartitionSpec(ctx.partitionSpec))(
command(ctx))
}
/**
* Create an [[AlterTableSetFileFormat]] command.
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] SET FILEFORMAT file_format;
* }}}
*/
override def visitSetTableFileFormat(
ctx: SetTableFileFormatContext): LogicalPlan = withOrigin(ctx) {
// AlterTableSetFileFormat currently takes both a GenericFileFormatContext and a
// TableFileFormatContext. This is a bit weird because it should only take one. It should
// also use a CatalogFileFormat instead of either a String or a Sequence of Strings. We
// will address this in a follow-up PR.
val (fileFormat, genericFormat) = ctx.fileFormat match {
case s: GenericFileFormatContext =>
(Seq.empty[String], Option(s.identifier.getText))
case s: TableFileFormatContext =>
val elements = Seq(s.inFmt, s.outFmt) ++
Option(s.serdeCls).toSeq ++
Option(s.inDriver).toSeq ++
Option(s.outDriver).toSeq
(elements.map(string), None)
}
AlterTableSetFileFormat(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
fileFormat,
genericFormat)(
command(ctx))
}
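// Illustrative behaviour, not part of the original patch:
//   ... SET FILEFORMAT PARQUET
// takes the GenericFileFormatContext branch and yields
// (Seq.empty, Some("PARQUET")), while
//   ... SET FILEFORMAT INPUTFORMAT 'in' OUTPUTFORMAT 'out'
// takes the TableFileFormatContext branch and yields (Seq("in", "out"), None).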
/**
* Create an [[AlterTableSetLocation]] command.
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] SET LOCATION "loc";
* }}}
*/
override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetLocation(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
visitLocationSpec(ctx.locationSpec))(
command(ctx))
}
/**
* Create an [[AlterTableTouch]] command.
*
* For example:
* {{{
* ALTER TABLE table TOUCH [PARTITION spec];
* }}}
*/
override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableTouch(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
command(ctx))
}
/**
* Create an [[AlterTableCompact]] command.
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] COMPACT 'compaction_type';
* }}}
*/
override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableCompact(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
string(ctx.STRING))(
command(ctx))
}
/**
* Create an [[AlterTableMerge]] command.
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] CONCATENATE;
* }}}
*/
override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableMerge(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
command(ctx))
}
/**
* Create an [[AlterTableChangeCol]] command.
*
* For example:
* {{{
* ALTER TABLE tableIdentifier [PARTITION spec]
* CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment]
* [FIRST|AFTER column_name] [CASCADE|RESTRICT];
* }}}
*/
override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = withOrigin(ctx) {
val col = visitColType(ctx.colType())
val comment = if (col.metadata.contains("comment")) {
Option(col.metadata.getString("comment"))
} else {
None
}
AlterTableChangeCol(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
ctx.oldName.getText,
// We could also pass in a struct field - seems easier.
col.name,
col.dataType,
comment,
Option(ctx.after).map(_.getText),
// Note that Restrict and Cascade are mutually exclusive.
ctx.RESTRICT != null,
ctx.CASCADE != null)(
command(ctx))
}
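// Illustrative mapping, not part of the original patch:
//   ALTER TABLE tbl CHANGE COLUMN col_old col_new INT COMMENT 'c' CASCADE
// is expected to set oldName = "col_old", name = "col_new",
// dataType = IntegerType, comment = Some("c"), restrict = false and
// cascade = true.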
/**
* Create an [[AlterTableAddCol]] command.
*
* For example:
* {{{
* ALTER TABLE tableIdentifier [PARTITION spec]
* ADD COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT]
* }}}
*/
override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableAddCol(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
createStructType(ctx.colTypeList),
// Note that Restrict and Cascade are mutually exclusive.
ctx.RESTRICT != null,
ctx.CASCADE != null)(
command(ctx))
}
/**
* Create an [[AlterTableReplaceCol]] command.
*
* For example:
* {{{
* ALTER TABLE tableIdentifier [PARTITION spec]
* REPLACE COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT]
* }}}
*/
override def visitReplaceColumns(ctx: ReplaceColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableReplaceCol(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
createStructType(ctx.colTypeList),
// Note that Restrict and Cascade are mutually exclusive.
ctx.RESTRICT != null,
ctx.CASCADE != null)(
command(ctx))
}
/**
* Create a location string.
*/
override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
string(ctx.STRING)
}
/**
* Create a [[BucketSpec]].
*/
override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
BucketSpec(
ctx.INTEGER_VALUE.getText.toInt,
visitIdentifierList(ctx.identifierList),
Option(ctx.orderedIdentifierList).toSeq
.flatMap(_.orderedIdentifier.asScala)
.map(_.identifier.getText))
}
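// Illustrative mapping, not part of the original patch:
//   CLUSTERED BY (a, b) SORTED BY (c) INTO 4 BUCKETS
// is expected to yield BucketSpec(4, Seq("a", "b"), Seq("c")).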
/**
* Create a skew specification. This contains three components:
* - The skewed columns.
* - The skewed values; the size of each entry must match the number of skewed columns.
* - A 'stored as directories' flag.
*/
override def visitSkewSpec(
ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
val skewedValues = if (ctx.constantList != null) {
Seq(visitConstantList(ctx.constantList))
} else {
visitNestedConstantList(ctx.nestedConstantList)
}
(visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
}
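// Illustrative mapping, not part of the original patch:
//   SKEWED BY (a, b) ON ((1, 'x'), (2, 'y')) STORED AS DIRECTORIES
// is expected to yield
//   (Seq("a", "b"), Seq(Seq("1", "x"), Seq("2", "y")), true).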
/**
* Convert a nested constants list into a sequence of string sequences.
*/
override def visitNestedConstantList(
ctx: NestedConstantListContext): Seq[Seq[String]] = withOrigin(ctx) {
ctx.constantList.asScala.map(visitConstantList)
}
/**
* Convert a constants list into a String sequence.
*/
override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) {
ctx.constant.asScala.map(visitStringConstant)
}
}
@@ -18,14 +18,13 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
class DDLCommandSuite extends PlanTest {
-private val parser = new SparkQl
+private val parser = SparkSqlParser
test("create database") {
val sql =
......