diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 152bd499a08c6458487fbfb8b27e39109449d648..67b1752ee8c3139a1076b9da7dcf50294a4f0f53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,8 @@ import java.io.File
 
 import scala.collection.mutable
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
@@ -221,6 +223,13 @@ class SessionCatalog(
       inheritTableSpecs, isSkewedStoreAsSubdir)
   }
 
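+  /**
+   * Returns the location that a managed table with the given identifier would be stored at:
+   * the (formatted) table name resolved against the location of its database.
+   */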
+  def defaultTablePath(tableIdent: TableIdentifier): String = {
+    val dbName = tableIdent.database.getOrElse(currentDb)
+    val dbLocation = getDatabaseMetadata(dbName).locationUri
+
+    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
+  }
+
   // -------------------------------------------------------------
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3ce5f28bf32c288b64e0d71234112fd3504b1f44..3c10504fbdea3a15b59e1c49adb6885b368287b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -421,10 +421,21 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         ExecutedCommandExec(
           CreateTempTableUsing(
             tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
+
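+      // Persistent CREATE TABLE ... USING no longer requires a HiveContext; it is planned as a
+      // CreateDataSourceTableCommand that records the table through the session catalog.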
       case c: CreateTableUsing if !c.temporary =>
-        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+        val cmd =
+          CreateDataSourceTableCommand(
+            c.tableIdent,
+            c.userSpecifiedSchema,
+            c.provider,
+            c.options,
+            c.allowExisting,
+            c.managedIfNoPath)
+        ExecutedCommandExec(cmd) :: Nil
+
       case c: CreateTableUsing if c.temporary && c.allowExisting =>
-        sys.error("allowExisting should be set to false when creating a temporary table.")
+        throw new AnalysisException(
+          "allowExisting should be set to false when creating a temporary table.")
 
       case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty =>
         sys.error("Cannot create temporary partitioned table.")
@@ -433,8 +444,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         val cmd = CreateTempTableUsingAsSelect(
           c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
         ExecutedCommandExec(cmd) :: Nil
+
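+      // Similarly, persistent CREATE TABLE ... USING ... AS SELECT is planned as a
+      // CreateDataSourceTableAsSelectCommand instead of failing with an error.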
       case c: CreateTableUsingAsSelect if !c.temporary =>
-        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+        val cmd =
+          CreateDataSourceTableAsSelectCommand(
+            c.tableIdent,
+            c.provider,
+            c.partitionColumns,
+            c.bucketSpec,
+            c.mode,
+            c.options,
+            c.child)
+        ExecutedCommandExec(cmd) :: Nil
 
       case logical.ShowFunctions(db, pattern) =>
         ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0ef1d1d6889b5519cbf3f9335bb228dc33a73081
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types._
+
+/**
+ * A command used to create a data source table.
+ *
+ * Note: This is different from [[CreateTable]]. Please check the syntax for the difference.
+ * This is not intended for temporary tables.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+ *   [(col1 data_type [COMMENT col_comment], ...)]
+ *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ * }}}
+ */
+case class CreateDataSourceTableCommand(
+    tableIdent: TableIdentifier,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String],
+    ignoreIfExists: Boolean,
+    managedIfNoPath: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Since we are saving metadata to the metastore, we need to check if the metastore supports
+    // the table name and database name of this query. CreateDataSourceTableUtils.validateName
+    // mimics MetaStoreUtils.validateName, the method used by Hive to check whether a table name
+    // or a database name is valid for the metastore.
+    if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
+      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for the " +
+        s"metastore. Metastore only accepts table names containing characters, numbers and _.")
+    }
+    if (tableIdent.database.isDefined &&
+      !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
+      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid " +
+        s"name for the metastore. Metastore only accepts database names containing " +
+        s"characters, numbers and _.")
+    }
+
+    val tableName = tableIdent.unquotedString
+    val sessionState = sqlContext.sessionState
+
+    if (sessionState.catalog.tableExists(tableIdent)) {
+      if (ignoreIfExists) {
+        return Seq.empty[Row]
+      } else {
+        throw new AnalysisException(s"Table $tableName already exists.")
+      }
+    }
+
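+    // When no path is given and the table should be managed, store the data under the
+    // database's default table location; otherwise treat the table as external.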
+    var isExternal = true
+    val optionsWithPath =
+      if (!options.contains("path") && managedIfNoPath) {
+        isExternal = false
+        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
+      } else {
+        options
+      }
+
+    // Create the relation to validate the arguments before writing the metadata to the metastore.
+    DataSource(
+      sqlContext = sqlContext,
+      userSpecifiedSchema = userSpecifiedSchema,
+      className = provider,
+      bucketSpec = None,
+      options = optionsWithPath).resolveRelation()
+
+    CreateDataSourceTableUtils.createDataSourceTable(
+      sqlContext = sqlContext,
+      tableIdent = tableIdent,
+      userSpecifiedSchema = userSpecifiedSchema,
+      partitionColumns = Array.empty[String],
+      bucketSpec = None,
+      provider = provider,
+      options = optionsWithPath,
+      isExternal = isExternal)
+
+    Seq.empty[Row]
+  }
+}
+
+/**
+ * A command used to create a data source table using the result of a query.
+ *
+ * Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for the
+ * difference.
+ * This is not intended for temporary tables.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+ *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ *   AS SELECT ...
+ * }}}
+ */
+case class CreateDataSourceTableAsSelectCommand(
+    tableIdent: TableIdentifier,
+    provider: String,
+    partitionColumns: Array[String],
+    bucketSpec: Option[BucketSpec],
+    mode: SaveMode,
+    options: Map[String, String],
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Since we are saving metadata to the metastore, we need to check if the metastore supports
+    // the table name and database name of this query. CreateDataSourceTableUtils.validateName
+    // mimics MetaStoreUtils.validateName, the method used by Hive to check whether a table name
+    // or a database name is valid for the metastore.
+    if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
+      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for the " +
+        s"metastore. Metastore only accepts table names containing characters, numbers and _.")
+    }
+    if (tableIdent.database.isDefined &&
+      !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
+      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid " +
+        s"name for the metastore. Metastore only accepts database names containing " +
+        s"characters, numbers and _.")
+    }
+
+    val tableName = tableIdent.unquotedString
+    val sessionState = sqlContext.sessionState
+    var createMetastoreTable = false
+    var isExternal = true
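+    // Without an explicit path, the data is written to the default table location and the
+    // table is considered managed rather than external.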
+    val optionsWithPath =
+      if (!options.contains("path")) {
+        isExternal = false
+        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
+      } else {
+        options
+      }
+
+    var existingSchema = None: Option[StructType]
+    if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
+      // Check if we need to throw an exception or just return.
+      mode match {
+        case SaveMode.ErrorIfExists =>
+          throw new AnalysisException(s"Table $tableName already exists. " +
+            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
+            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite " +
+            s"the existing data. " +
+            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
+        case SaveMode.Ignore =>
+          // Since the table already exists and the save mode is Ignore, we will just return.
+          return Seq.empty[Row]
+        case SaveMode.Append =>
+          // Check if the specified data source matches the data source of the existing table.
+          val dataSource = DataSource(
+            sqlContext = sqlContext,
+            userSpecifiedSchema = Some(query.schema.asNullable),
+            partitionColumns = partitionColumns,
+            bucketSpec = bucketSpec,
+            className = provider,
+            options = optionsWithPath)
+          // TODO: Check that options from the resolved relation match the relation that we are
+          // inserting into (i.e. using the same compression).
+
+          EliminateSubqueryAliases(
+            sessionState.catalog.lookupRelation(tableIdent)) match {
+            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+              existingSchema = Some(l.schema)
+            case o =>
+              throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
+          }
+        case SaveMode.Overwrite =>
+          sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+          // Need to create the table again.
+          createMetastoreTable = true
+      }
+    } else {
+      // The table does not exist. We need to create it in metastore.
+      createMetastoreTable = true
+    }
+
+    val data = Dataset.ofRows(sqlContext, query)
+    val df = existingSchema match {
+      // If we are inserting into an existing table, just use the existing schema.
+      case Some(s) => data.selectExpr(s.fieldNames: _*)
+      case None => data
+    }
+
+    // Create the relation based on the data of df.
+    val dataSource = DataSource(
+      sqlContext,
+      className = provider,
+      partitionColumns = partitionColumns,
+      bucketSpec = bucketSpec,
+      options = optionsWithPath)
+
+    val result = dataSource.write(mode, df)
+
+    if (createMetastoreTable) {
+      // We will use the schema of the resolved relation (instead of the schema of df) as the
+      // schema of the table. This is important since the nullability may be changed by the
+      // relation provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
+      CreateDataSourceTableUtils.createDataSourceTable(
+        sqlContext = sqlContext,
+        tableIdent = tableIdent,
+        userSpecifiedSchema = Some(result.schema),
+        partitionColumns = partitionColumns,
+        bucketSpec = bucketSpec,
+        provider = provider,
+        options = optionsWithPath,
+        isExternal = isExternal)
+    }
+
+    // Refresh the cache of the table in the catalog.
+    sessionState.catalog.refreshTable(tableIdent)
+    Seq.empty[Row]
+  }
+}
+
+object CreateDataSourceTableUtils extends Logging {
+  /**
+   * Checks if the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
+   * i.e. if this name only contains characters, numbers, and _.
+   *
+   * This method is intended to have the same behavior as
+   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
+   */
+  def validateName(name: String): Boolean = {
+    val tpat = Pattern.compile("[\\w_]+")
+    val matcher = tpat.matcher(name)
+
+    matcher.matches()
+  }
+
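+  /**
+   * Saves the metadata of a data source table into the catalog. The provider, schema,
+   * partitioning and bucketing information are encoded as table properties so that they
+   * survive a round trip through the Hive metastore.
+   */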
+  def createDataSourceTable(
+      sqlContext: SQLContext,
+      tableIdent: TableIdentifier,
+      userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
+      bucketSpec: Option[BucketSpec],
+      provider: String,
+      options: Map[String, String],
+      isExternal: Boolean): Unit = {
+    val tableProperties = new mutable.HashMap[String, String]
+    tableProperties.put("spark.sql.sources.provider", provider)
+
+    // Saves optional user specified schema.  Serialized JSON schema string may be too long to be
+    // stored in a single metastore table property.  In this case, we split the JSON string and
+    // store each part as a separate table property.
+    userSpecifiedSchema.foreach { schema =>
+      val threshold = sqlContext.sessionState.conf.schemaStringLengthThreshold
+      val schemaJsonString = schema.json
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+      }
+    }
+
+    if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+      tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
+      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+        tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
+      }
+    }
+
+    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
+      tableProperties.put("spark.sql.sources.schema.numBucketCols",
+        bucketColumnNames.length.toString)
+      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
+      }
+
+      if (sortColumnNames.nonEmpty) {
+        tableProperties.put("spark.sql.sources.schema.numSortCols",
+          sortColumnNames.length.toString)
+        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+          tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
+        }
+      }
+    }
+
+    if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
+      // The table does not have a specified schema, which means that the schema will be inferred
+      // when we load the table. So, we are not expecting partition columns and we will discover
+      // partitions when we load the table. However, if there are specified partition columns,
+      // we simply ignore them and provide a warning message.
+      logWarning(
+        s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
+          s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
+    }
+
+    val tableType = if (isExternal) {
+      tableProperties.put("EXTERNAL", "TRUE")
+      CatalogTableType.EXTERNAL_TABLE
+    } else {
+      tableProperties.put("EXTERNAL", "FALSE")
+      CatalogTableType.MANAGED_TABLE
+    }
+
+    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sqlContext.sessionState.conf)
+    val dataSource =
+      DataSource(
+        sqlContext,
+        userSpecifiedSchema = userSpecifiedSchema,
+        partitionColumns = partitionColumns,
+        bucketSpec = bucketSpec,
+        className = provider,
+        options = options)
+
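+    // Spark SQL specific format: the real schema lives only in the table properties, so the
+    // metastore entry has no columns and is not readable by Hive itself.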
+    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+      CatalogTable(
+        identifier = tableIdent,
+        tableType = tableType,
+        schema = Nil,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          serdeProperties = options
+        ),
+        properties = tableProperties.toMap)
+    }
+
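+    // Hive compatible format: columns and storage information are written in a form Hive can
+    // read, which is only attempted for unpartitioned relations backed by a single path.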
+    def newHiveCompatibleMetastoreTable(
+        relation: HadoopFsRelation,
+        serde: HiveSerDe): CatalogTable = {
+      assert(partitionColumns.isEmpty)
+      assert(relation.partitionSchema.isEmpty)
+
+      CatalogTable(
+        identifier = tableIdent,
+        tableType = tableType,
+        storage = CatalogStorageFormat(
+          locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
+          inputFormat = serde.inputFormat,
+          outputFormat = serde.outputFormat,
+          serde = serde.serde,
+          serdeProperties = options
+        ),
+        schema = relation.schema.map { f =>
+          CatalogColumn(f.name, f.dataType.catalogString)
+        },
+        properties = tableProperties.toMap,
+        viewText = None)
+    }
+
+    // TODO: Support persisting partitioned data source relations in Hive compatible format
+    val qualifiedTableName = tableIdent.quotedString
+    val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
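+    // Persist in Hive compatible format only when a matching Hive SerDe exists and the relation
+    // is an unpartitioned HadoopFsRelation with a single input path; otherwise fall back to the
+    // Spark SQL specific format.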
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
+      case _ if skipHiveMetadata =>
+        val message =
+          s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+            "Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
+
+      case (Some(serde), relation: HadoopFsRelation)
+        if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
+        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+        val message =
+          s"Persisting data source relation $qualifiedTableName with a single input path " +
+            s"into Hive metastore in Hive compatible format. Input path: " +
+            s"${relation.location.paths.head}."
+        (Some(hiveTable), message)
+
+      case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
+        val message =
+          s"Persisting partitioned data source relation $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
+        (None, message)
+
+      case (Some(serde), relation: HadoopFsRelation) =>
+        val message =
+          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
+        (None, message)
+
+      case (Some(serde), _) =>
+        val message =
+          s"Data source relation $qualifiedTableName is not a " +
+            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
+            "in Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
+
+      case _ =>
+        val message =
+          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+            s"Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
+    }
+
+    (hiveCompatibleTable, logMessage) match {
+      case (Some(table), message) =>
+        // We first try to save the metadata of the table in a Hive compatible way.
+        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+        // specific way.
+        try {
+          logInfo(message)
+          sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+        } catch {
+          case NonFatal(e) =>
+            val warningMessage =
+              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
+                s"it into Hive metastore in Spark SQL specific format."
+            logWarning(warningMessage, e)
+            val table = newSparkSQLSpecificMetastoreTable()
+            sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+        }
+
+      case (None, message) =>
+        logWarning(message)
+        val table = newSparkSQLSpecificMetastoreTable()
+        sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9353e9ccd22626b53e1006573bfb1a0614c00fd0..13f29e08fb07d4a71fb75b687fece4fb3d515ca3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -156,198 +156,6 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
     cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
   }
 
-  def createDataSourceTable(
-      tableIdent: TableIdentifier,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-
-    val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put("spark.sql.sources.provider", provider)
-
-    // Saves optional user specified schema.  Serialized JSON schema string may be too long to be
-    // stored into a single metastore SerDe property.  In this case, we split the JSON string and
-    // store each part as a separate SerDe property.
-    userSpecifiedSchema.foreach { schema =>
-      val threshold = conf.schemaStringLengthThreshold
-      val schemaJsonString = schema.json
-      // Split the JSON string.
-      val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
-      parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
-      }
-    }
-
-    if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
-      tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
-      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
-      }
-    }
-
-    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
-      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
-
-      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
-      tableProperties.put("spark.sql.sources.schema.numBucketCols",
-        bucketColumnNames.length.toString)
-      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
-      }
-
-      if (sortColumnNames.nonEmpty) {
-        tableProperties.put("spark.sql.sources.schema.numSortCols",
-          sortColumnNames.length.toString)
-        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-          tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
-        }
-      }
-    }
-
-    if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
-      // The table does not have a specified schema, which means that the schema will be inferred
-      // when we load the table. So, we are not expecting partition columns and we will discover
-      // partitions when we load the table. However, if there are specified partition columns,
-      // we simply ignore them and provide a warning message.
-      logWarning(
-        s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
-          s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
-    }
-
-    val tableType = if (isExternal) {
-      tableProperties.put("EXTERNAL", "TRUE")
-      CatalogTableType.EXTERNAL_TABLE
-    } else {
-      tableProperties.put("EXTERNAL", "FALSE")
-      CatalogTableType.MANAGED_TABLE
-    }
-
-    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, conf)
-    val dataSource =
-      DataSource(
-        hive,
-        userSpecifiedSchema = userSpecifiedSchema,
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        className = provider,
-        options = options)
-
-    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
-      CatalogTable(
-        identifier = TableIdentifier(tblName, Option(dbName)),
-        tableType = tableType,
-        schema = Nil,
-        storage = CatalogStorageFormat(
-          locationUri = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None,
-          serdeProperties = options
-        ),
-        properties = tableProperties.toMap)
-    }
-
-    def newHiveCompatibleMetastoreTable(
-        relation: HadoopFsRelation,
-        serde: HiveSerDe): CatalogTable = {
-      assert(partitionColumns.isEmpty)
-      assert(relation.partitionSchema.isEmpty)
-
-      CatalogTable(
-        identifier = TableIdentifier(tblName, Option(dbName)),
-        tableType = tableType,
-        storage = CatalogStorageFormat(
-          locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
-          inputFormat = serde.inputFormat,
-          outputFormat = serde.outputFormat,
-          serde = serde.serde,
-          serdeProperties = options
-        ),
-        schema = relation.schema.map { f =>
-          CatalogColumn(f.name, f.dataType.catalogString)
-        },
-        properties = tableProperties.toMap,
-        viewText = None) // TODO: We need to place the SQL string here
-    }
-
-    // TODO: Support persisting partitioned data source relations in Hive compatible format
-    val qualifiedTableName = tableIdent.quotedString
-    val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
-      case _ if skipHiveMetadata =>
-        val message =
-          s"Persisting partitioned data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation)
-        if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
-        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
-        val message =
-          s"Persisting data source relation $qualifiedTableName with a single input path " +
-            s"into Hive metastore in Hive compatible format. Input path: " +
-            s"${relation.location.paths.head}."
-        (Some(hiveTable), message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
-        val message =
-          s"Persisting partitioned data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) =>
-        val message =
-          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), _) =>
-        val message =
-          s"Data source relation $qualifiedTableName is not a " +
-            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
-            "in Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-
-      case _ =>
-        val message =
-          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
-            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-    }
-
-    (hiveCompatibleTable, logMessage) match {
-      case (Some(table), message) =>
-        // We first try to save the metadata of the table in a Hive compatible way.
-        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
-        // specific way.
-        try {
-          logInfo(message)
-          client.createTable(table, ignoreIfExists = false)
-        } catch {
-          case throwable: Throwable =>
-            val warningMessage =
-              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
-                s"it into Hive metastore in Spark SQL specific format."
-            logWarning(warningMessage, throwable)
-            val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
-            client.createTable(sparkSqlSpecificTable, ignoreIfExists = false)
-        }
-
-      case (None, message) =>
-        logWarning(message)
-        val hiveTable = newSparkSQLSpecificMetastoreTable()
-        client.createTable(hiveTable, ignoreIfExists = false)
-    }
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 4f9513389c8c2c0f742d864461ae14e153a24f7e..3e718826fece60d4fd439c0684f409a17597e466 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -103,18 +103,6 @@ private[sql] class HiveSessionCatalog(
     metastoreCatalog.cachedDataSourceTables.invalidateAll()
   }
 
-  def createDataSourceTable(
-      name: TableIdentifier,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    metastoreCatalog.createDataSourceTable(
-      name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
-  }
-
   def hiveDefaultTableFilePath(name: TableIdentifier): String = {
     metastoreCatalog.hiveDefaultTableFilePath(name)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index a22e19207e5c185c7eee64805c426777cc1f7c3a..636bfdebf7066cee0776cae77c8029bb5aa1387c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -110,7 +110,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
         experimentalMethods.extraStrategies ++ Seq(
           FileSourceStrategy,
           DataSourceStrategy,
-          HiveDDLStrategy,
           DDLStrategy,
           SpecialLimits,
           InMemoryScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 2d36ddafe60889c833ad7e238df9df1282b1e7ea..2bea32b14446081895cf9671b09a3f166765f12b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, CreateTempTableUsingAsSelect}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -83,27 +83,4 @@ private[hive] trait HiveStrategies {
         Nil
     }
   }
-
-  object HiveDDLStrategy extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(
-        tableIdent, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
-        val cmd =
-          CreateMetastoreDataSource(
-            tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case c: CreateTableUsingAsSelect if c.temporary =>
-        val cmd = CreateTempTableUsingAsSelect(
-          c.tableIdent, c.provider, c.partitionColumns, c.mode, c.options, c.child)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case c: CreateTableUsingAsSelect =>
-        val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns,
-          c.bucketSpec, c.mode, c.options, c.child)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case _ => Nil
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
deleted file mode 100644
index 6899f46eec0d28a32e938050076f81b7c2880e4e..0000000000000000000000000000000000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.hadoop.hive.metastore.MetaStoreUtils
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.HiveSessionState
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-
-
-private[hive]
-case class CreateMetastoreDataSource(
-    tableIdent: TableIdentifier,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String],
-    allowExisting: Boolean,
-    managedIfNoPath: Boolean) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Since we are saving metadata to metastore, we need to check if metastore supports
-    // the table name and database name we have for this query. MetaStoreUtils.validateName
-    // is the method used by Hive to check if a table name or a database name is valid for
-    // the metastore.
-    if (!MetaStoreUtils.validateName(tableIdent.table)) {
-      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
-        s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-    }
-    if (tableIdent.database.isDefined && !MetaStoreUtils.validateName(tableIdent.database.get)) {
-      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
-        s"for metastore. Metastore only accepts database name containing " +
-        s"characters, numbers and _.")
-    }
-
-    val tableName = tableIdent.unquotedString
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-
-    if (sessionState.catalog.tableExists(tableIdent)) {
-      if (allowExisting) {
-        return Seq.empty[Row]
-      } else {
-        throw new AnalysisException(s"Table $tableName already exists.")
-      }
-    }
-
-    var isExternal = true
-    val optionsWithPath =
-      if (!options.contains("path") && managedIfNoPath) {
-        isExternal = false
-        options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
-      } else {
-        options
-      }
-
-    // Create the relation to validate the arguments before writing the metadata to the metastore.
-    DataSource(
-      sqlContext = sqlContext,
-      userSpecifiedSchema = userSpecifiedSchema,
-      className = provider,
-      bucketSpec = None,
-      options = optionsWithPath).resolveRelation()
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent,
-      userSpecifiedSchema,
-      Array.empty[String],
-      bucketSpec = None,
-      provider,
-      optionsWithPath,
-      isExternal)
-
-    Seq.empty[Row]
-  }
-}
-
-private[hive]
-case class CreateMetastoreDataSourceAsSelect(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Since we are saving metadata to metastore, we need to check if metastore supports
-    // the table name and database name we have for this query. MetaStoreUtils.validateName
-    // is the method used by Hive to check if a table name or a database name is valid for
-    // the metastore.
-    if (!MetaStoreUtils.validateName(tableIdent.table)) {
-      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
-        s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-    }
-    if (tableIdent.database.isDefined && !MetaStoreUtils.validateName(tableIdent.database.get)) {
-      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
-        s"for metastore. Metastore only accepts database name containing " +
-        s"characters, numbers and _.")
-    }
-
-    val tableName = tableIdent.unquotedString
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    var createMetastoreTable = false
-    var isExternal = true
-    val optionsWithPath =
-      if (!options.contains("path")) {
-        isExternal = false
-        options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
-      } else {
-        options
-      }
-
-    var existingSchema = None: Option[StructType]
-    if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
-      // Check if we need to throw an exception or just return.
-      mode match {
-        case SaveMode.ErrorIfExists =>
-          throw new AnalysisException(s"Table $tableName already exists. " +
-            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
-            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
-            s"the existing data. " +
-            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
-        case SaveMode.Ignore =>
-          // Since the table already exists and the save mode is Ignore, we will just return.
-          return Seq.empty[Row]
-        case SaveMode.Append =>
-          // Check if the specified data source match the data source of the existing table.
-          val dataSource = DataSource(
-            sqlContext = sqlContext,
-            userSpecifiedSchema = Some(query.schema.asNullable),
-            partitionColumns = partitionColumns,
-            bucketSpec = bucketSpec,
-            className = provider,
-            options = optionsWithPath)
-          // TODO: Check that options from the resolved relation match the relation that we are
-          // inserting into (i.e. using the same compression).
-
-          EliminateSubqueryAliases(
-            sessionState.catalog.lookupRelation(tableIdent)) match {
-            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
-              existingSchema = Some(l.schema)
-            case o =>
-              throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
-          }
-        case SaveMode.Overwrite =>
-          sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
-          // Need to create the table again.
-          createMetastoreTable = true
-      }
-    } else {
-      // The table does not exist. We need to create it in metastore.
-      createMetastoreTable = true
-    }
-
-    val data = Dataset.ofRows(sqlContext, query)
-    val df = existingSchema match {
-      // If we are inserting into an existing table, just use the existing schema.
-      case Some(s) => data.selectExpr(s.fieldNames: _*)
-      case None => data
-    }
-
-    // Create the relation based on the data of df.
-    val dataSource = DataSource(
-      sqlContext,
-      className = provider,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      options = optionsWithPath)
-
-    val result = dataSource.write(mode, df)
-
-    if (createMetastoreTable) {
-      // We will use the schema of resolved.relation as the schema of the table (instead of
-      // the schema of df). It is important since the nullability may be changed by the relation
-      // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      sessionState.catalog.createDataSourceTable(
-        tableIdent,
-        Some(result.schema),
-        partitionColumns,
-        bucketSpec,
-        provider,
-        optionsWithPath,
-        isExternal)
-    }
-
-    // Refresh the cache of the table in the catalog.
-    sessionState.catalog.refreshTable(tableIdent)
-    Seq.empty[Row]
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 11165a7ebbe8c3ee4083ffb5805c7bbe3092b3a8..68244cdb11cb8731b6e32b991bd60f1ed7ea5ec9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -699,8 +700,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
 
           // Manually create a metastore data source table.
-          sessionState.catalog.createDataSourceTable(
-            name = TableIdentifier("wide_schema"),
+          CreateDataSourceTableUtils.createDataSourceTable(
+            sqlContext = sqlContext,
+            tableIdent = TableIdentifier("wide_schema"),
             userSpecifiedSchema = Some(schema),
             partitionColumns = Array.empty[String],
             bucketSpec = None,
@@ -907,8 +909,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTempDir { tempPath =>
       val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
 
-      sessionState.catalog.createDataSourceTable(
-        name = TableIdentifier("not_skip_hive_metadata"),
+      CreateDataSourceTableUtils.createDataSourceTable(
+        sqlContext = sqlContext,
+        tableIdent = TableIdentifier("not_skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
         bucketSpec = None,
@@ -921,8 +924,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
         .forall(column => DataTypeParser.parse(column.dataType) == StringType))
 
-      sessionState.catalog.createDataSourceTable(
-        name = TableIdentifier("skip_hive_metadata"),
+      CreateDataSourceTableUtils.createDataSourceTable(
+        sqlContext = sqlContext,
+        tableIdent = TableIdentifier("skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
         bucketSpec = None,