diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c098fa99c2220bd8c00f928ee2d8678b188a3ff9..a8313deeefc2f8a687637cee9898e95241bd83dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder}
+import org.apache.spark.sql.catalyst.parser.ng.{AbstractSqlParser, AstBuilder, ParseException}
 import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.command.{DescribeCommand => _, _}
@@ -200,8 +200,8 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-    * Convert a table property list into a key-value map.
-    */
+   * Convert a table property list into a key-value map.
+   */
   override def visitTablePropertyList(
       ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
     ctx.tableProperty.asScala.map { property =>
@@ -216,4 +216,618 @@ class SparkSqlAstBuilder extends AstBuilder {
       key -> value
     }.toMap
   }
+
+  /**
+   * Create a [[CreateDatabase]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
+   *    [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
+   * }}}
+   */
+  override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
+    CreateDatabase(
+      ctx.identifier.getText,
+      ctx.EXISTS != null,
+      Option(ctx.locationSpec).map(visitLocationSpec),
+      Option(ctx.comment).map(string),
+      Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterDatabaseProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER (DATABASE|SCHEMA) database SET DBPROPERTIES (property_name=property_value, ...);
+   * }}}
+   */
+  override def visitSetDatabaseProperties(
+      ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
+    AlterDatabaseProperties(
+      ctx.identifier.getText,
+      visitTablePropertyList(ctx.tablePropertyList))(
+      command(ctx))
+  }
+
+  /**
+   * Create a [[DropDatabase]] command.
+   *
+   * For example:
+   * {{{
+   *   DROP (DATABASE|SCHEMA) [IF EXISTS] database [RESTRICT|CASCADE];
+   * }}}
+   */
+  override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
+    DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE == null)(command(ctx))
+  }
+
+  /**
+   * Create a [[DescribeDatabase]] command.
+   *
+   * For example:
+   * {{{
+   *   DESCRIBE DATABASE [EXTENDED] database;
+   * }}}
+   */
+  override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
+    DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)(command(ctx))
+  }
+
+  /**
+   * Create a [[CreateFunction]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
+   *    [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+   * }}}
+   */
+  override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val resources = ctx.resource.asScala.map { resource =>
+      val resourceType = resource.identifier.getText.toLowerCase
+      resourceType match {
+        case "jar" | "file" | "archive" =>
+          resourceType -> string(resource.STRING)
+        case other =>
+          throw new ParseException(s"Resource Type '$resourceType' is not supported.", ctx)
+      }
+    }
+
+    // Extract database, name & alias.
+    val (database, function) = visitFunctionName(ctx.qualifiedName)
+    CreateFunction(
+      database,
+      function,
+      string(ctx.className), // TODO this is not an alias.
+      resources,
+      ctx.TEMPORARY != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create a [[DropFunction]] command.
+   *
+   * For example:
+   * {{{
+   *   DROP [TEMPORARY] FUNCTION [IF EXISTS] function;
+   * }}}
+   */
+  override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val (database, function) = visitFunctionName(ctx.qualifiedName)
+    DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
+  }
+
+  /**
+   * Create a function database (optional) and name pair.
+   */
+  private def visitFunctionName(ctx: QualifiedNameContext): (Option[String], String) = {
+    ctx.identifier().asScala.map(_.getText) match {
+      case Seq(db, fn) => (Option(db), fn)
+      case Seq(fn) => (None, fn)
+      case other => throw new ParseException(s"Unsupported function name '${ctx.getText}'", ctx)
+    }
+  }
+
+  /**
+   * Create a [[AlterTableRename]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1 RENAME TO table2;
+   * }}}
+   */
+  override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRename(
+      visitTableIdentifier(ctx.from),
+      visitTableIdentifier(ctx.to))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSetProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table SET TBLPROPERTIES ('comment' = new_comment);
+   * }}}
+   */
+  override def visitSetTableProperties(
+      ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableSetProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitTablePropertyList(ctx.tablePropertyList))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableUnsetProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+   * }}}
+   */
+  override def visitUnsetTableProperties(
+      ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableUnsetProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitTablePropertyList(ctx.tablePropertyList),
+      ctx.EXISTS != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSerDeProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+   *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+   * }}}
+   */
+  override def visitSetTableSerDe(ctx: SetTableSerDeContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableSerDeProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.STRING).map(string),
+      Option(ctx.tablePropertyList).map(visitTablePropertyList),
+      // TODO a partition spec is allowed to have optional values. This is currently violated.
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableStorageProperties]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
+   * }}}
+   */
+  override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableStorageProperties(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitBucketSpec(ctx.bucketSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotClustered]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT CLUSTERED;
+   * }}}
+   */
+  override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotSorted]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT SORTED;
+   * }}}
+   */
+  override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSkewed]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table SKEWED BY (col1, col2)
+   *   ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
+   *   [STORED AS DIRECTORIES];
+   * }}}
+   */
+  override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
+    val table = visitTableIdentifier(ctx.tableIdentifier)
+    val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
+    AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotSorted]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT SKEWED;
+   * }}}
+   */
+  override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableNotStoredAsDirs]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table NOT STORED AS DIRECTORIES
+   * }}}
+   */
+  override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSkewedLocation]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
+   * }}}
+   */
+  override def visitSetTableSkewLocations(
+      ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
+    val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
+      slCtx =>
+        val location = string(slCtx.STRING)
+        if (slCtx.constant != null) {
+          Seq(visitStringConstant(slCtx.constant) -> location)
+        } else {
+          // TODO this is similar to what was in the original implementation. However this does not
+          // make to much sense to me since we should be storing a tuple of values (not column
+          // names) for which we want a dedicated storage location.
+          visitConstantList(slCtx.constantList).map(_ -> location)
+        }
+    }.toMap
+
+    AlterTableSkewedLocation(
+      visitTableIdentifier(ctx.tableIdentifier),
+      skewedMap)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableAddPartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+   * }}}
+   */
+  override def visitAddTablePartition(
+      ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    // Create partition spec to location mapping.
+    val specsAndLocs = ctx.partitionSpecLocation.asScala.map {
+      splCtx =>
+        val spec = visitNonOptionalPartitionSpec(splCtx.partitionSpec)
+        val location = Option(splCtx.locationSpec).map(visitLocationSpec)
+        spec -> location
+    }
+    AlterTableAddPartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      specsAndLocs,
+      ctx.EXISTS != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableExchangePartition]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1 EXCHANGE PARTITION spec WITH TABLE table2;
+   * }}}
+   */
+  override def visitExchangeTablePartition(
+      ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableExchangePartition(
+      visitTableIdentifier(ctx.from),
+      visitTableIdentifier(ctx.to),
+      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableRenamePartition]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+   * }}}
+   */
+  override def visitRenameTablePartition(
+      ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRenamePartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitNonOptionalPartitionSpec(ctx.from),
+      visitNonOptionalPartitionSpec(ctx.to))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableDropPartition]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+   * }}}
+   */
+  override def visitDropTablePartitions(
+      ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableDropPartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.EXISTS != null,
+      ctx.PURGE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableArchivePartition]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table ARCHIVE PARTITION spec;
+   * }}}
+   */
+  override def visitArchiveTablePartition(
+      ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableArchivePartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableUnarchivePartition]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table UNARCHIVE PARTITION spec;
+   * }}}
+   */
+  override def visitUnarchiveTablePartition(
+      ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableUnarchivePartition(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSetFileFormat]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT file_format;
+   * }}}
+   */
+  override def visitSetTableFileFormat(
+      ctx: SetTableFileFormatContext): LogicalPlan = withOrigin(ctx) {
+    // AlterTableSetFileFormat currently takes both a GenericFileFormat and a
+    // TableFileFormatContext. This is a bit weird because it should only take one. It also should
+    // use a CatalogFileFormat instead of either a String or a Sequence of Strings. We will address
+    // this in a follow-up PR.
+    val (fileFormat, genericFormat) = ctx.fileFormat match {
+      case s: GenericFileFormatContext =>
+        (Seq.empty[String], Option(s.identifier.getText))
+      case s: TableFileFormatContext =>
+        val elements = Seq(s.inFmt, s.outFmt) ++
+          Option(s.serdeCls).toSeq ++
+          Option(s.inDriver).toSeq ++
+          Option(s.outDriver).toSeq
+        (elements.map(string), None)
+    }
+    AlterTableSetFileFormat(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      fileFormat,
+      genericFormat)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableSetLocation]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET LOCATION "loc";
+   * }}}
+   */
+  override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableSetLocation(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      visitLocationSpec(ctx.locationSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableTouch]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table TOUCH [PARTITION spec];
+   * }}}
+   */
+  override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableTouch(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableCompact]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] COMPACT 'compaction_type';
+   * }}}
+   */
+  override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableCompact(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      string(ctx.STRING))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableMerge]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] CONCATENATE;
+   * }}}
+   */
+  override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableMerge(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableChangeCol]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE tableIdentifier [PARTITION spec]
+   *    CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment]
+   *    [FIRST|AFTER column_name] [CASCADE|RESTRICT];
+   * }}}
+   */
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = withOrigin(ctx) {
+    val col = visitColType(ctx.colType())
+    val comment = if (col.metadata.contains("comment")) {
+      Option(col.metadata.getString("comment"))
+    } else {
+      None
+    }
+
+    AlterTableChangeCol(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      ctx.oldName.getText,
+      // We could also pass in a struct field - seems easier.
+      col.name,
+      col.dataType,
+      comment,
+      Option(ctx.after).map(_.getText),
+      // Note that Restrict and Cascade are mutually exclusive.
+      ctx.RESTRICT != null,
+      ctx.CASCADE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableAddCol]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE tableIdentifier [PARTITION spec]
+   *    ADD COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT]
+   * }}}
+   */
+  override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableAddCol(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      createStructType(ctx.colTypeList),
+      // Note that Restrict and Cascade are mutually exclusive.
+      ctx.RESTRICT != null,
+      ctx.CASCADE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create an [[AlterTableReplaceCol]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE tableIdentifier [PARTITION spec]
+   *    REPLACE COLUMNS (name type [COMMENT comment], ...) [CASCADE|RESTRICT]
+   * }}}
+   */
+  override def visitReplaceColumns(ctx: ReplaceColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableReplaceCol(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      createStructType(ctx.colTypeList),
+      // Note that Restrict and Cascade are mutually exclusive.
+      ctx.RESTRICT != null,
+      ctx.CASCADE != null)(
+      command(ctx))
+  }
+
+  /**
+   * Create location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
+    string(ctx.STRING)
+  }
+
+  /**
+   * Create a [[BucketSpec]].
+   */
+  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
+    BucketSpec(
+      ctx.INTEGER_VALUE.getText.toInt,
+      visitIdentifierList(ctx.identifierList),
+      Option(ctx.orderedIdentifierList).toSeq
+        .flatMap(_.orderedIdentifier.asScala)
+        .map(_.identifier.getText))
+  }
+
+  /**
+   * Create a skew specification. This contains three components:
+   * - The Skewed Columns
+   * - Values for which are skewed. The size of each entry must match the number of skewed columns.
+   * - A store in directory flag.
+   */
+  override def visitSkewSpec(
+      ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
+    val skewedValues = if (ctx.constantList != null) {
+      Seq(visitConstantList(ctx.constantList))
+    } else {
+      visitNestedConstantList(ctx.nestedConstantList)
+    }
+    (visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
+  }
+
+  /**
+   * Convert a nested constants list into a sequence of string sequences.
+   */
+  override def visitNestedConstantList(
+      ctx: NestedConstantListContext): Seq[Seq[String]] = withOrigin(ctx) {
+    ctx.constantList.asScala.map(visitConstantList)
+  }
+
+  /**
+   * Convert a constants list into a String sequence.
+   */
+  override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) {
+    ctx.constant.asScala.map(visitStringConstant)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 7a6343748ba9eccac3b27b6773489c731274d385..03079c6890a8416d6c8e1eff4cd0b999ab9a444e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
 class DDLCommandSuite extends PlanTest {
-  private val parser = new SparkQl
+  private val parser = SparkSqlParser
 
   test("create database") {
     val sql =