diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index c701f3823842cf0a2bbc68930a1521f30edf1f6f..478a83f8fa45eae9edc7c95f5f77bd7ae19076f3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -110,4 +110,31 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
     df.select("features").rdd.map { case Row(d: Vector) => d }.first
     df.select("features").collect
   }
+
+  test("create libsvmTable table without schema") {
+    try {
+      spark.sql(
+        s"""
+           |CREATE TABLE libsvmTable
+           |USING libsvm
+           |OPTIONS (
+           |  path '$path'
+           |)
+         """.stripMargin)
+      val df = spark.table("libsvmTable")
+      assert(df.columns(0) == "label")
+      assert(df.columns(1) == "features")
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS libsvmTable")
+    }
+  }
+
+  test("create libsvmTable table without schema and path") {
+    try {
+      val e = intercept[IOException](spark.sql("CREATE TABLE libsvmTable USING libsvm"))
+      assert(e.getMessage.contains("No input path specified for libsvm data"))
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS libsvmTable")
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index d553d44cb3a1478b09f1e71da35db95b031d2526..623d47b4c9a106e488b1b693f59f5aa3fb61e282 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
 /**
- * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
+ * Try to replace [[UnresolvedRelation]]s if the plan is a direct query on files.
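+ * For example, "SELECT * FROM parquet.`/path/to/data`" is resolved by this rule when
+ * `runSQLonFile` is enabled.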
  */
-class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
     sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 68b774b52fd7fdcd3befce45fe4b99f774fc09e3..a5ebe4780f3dadd19b2260218befa85504b5b547 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -115,7 +115,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
         new FindDataSourceTable(sparkSession) ::
-        new ResolveDataSource(sparkSession) :: Nil
+        new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
         AnalyzeCreateTable(sparkSession) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 51f5946c19e00d8919bf797dd2dea6b2075b0513..f6d1ee2287c7bc7a916c1b1267e59fc386e2ba26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1511,6 +1511,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create a data source table without schema") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("tab1", "tab2") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
+        assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))
+
+        sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
+        checkAnswer(spark.table("tab2"), Row("a", "b"))
+      }
+    }
+  }
+
   test("create table using CLUSTERED BY without schema specification") {
     import testImplicits._
     withTempPath { tempDir =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9fd03ef8ba037c8938165ea42dce9b570ca77333..413712e0c651db2e3682622bbff1fcfd02883235 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,16 +62,16 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
-        new DetermineHiveSerde(conf) ::
+        new ResolveHiveSerdeTable(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
         new FindHiveSerdeTable(sparkSession) ::
-        new ResolveDataSource(sparkSession) :: Nil
+        new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
-        new HiveAnalysis(sparkSession) :: Nil
+        HiveAnalysis :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 39be41770d8edb566c0cb4deb5ae024e441a86e6..0f293c21fa452f098811f084ef9dfb2144fea1a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -27,21 +27,24 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.HiveSerDe
 
 
 /**
- * Determine the serde/format of the Hive serde table, according to the storage properties.
+ * Determines the database, the serde/format (according to the storage properties) and the schema
+ * of the Hive serde table.
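+ * For example, "CREATE TABLE t (id INT) USING hive" carries no explicit serde, so the session's
+ * default Hive storage format is filled in here.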
  */
-class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty =>
-      if (t.bucketSpec.isDefined) {
+class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
+  private def determineHiveSerde(table: CatalogTable): CatalogTable = {
+    if (table.storage.serde.nonEmpty) {
+      table
+    } else {
+      if (table.bucketSpec.isDefined) {
         throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
       }
 
-      val defaultStorage = HiveSerDe.getDefaultStorage(conf)
-      val options = new HiveOptions(t.storage.properties)
+      val defaultStorage = HiveSerDe.getDefaultStorage(session.sessionState.conf)
+      val options = new HiveOptions(table.storage.properties)
 
       val fileStorage = if (options.fileFormat.isDefined) {
         HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
@@ -67,13 +70,39 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
         CatalogStorageFormat.empty
       }
 
-      val storage = t.storage.copy(
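+      // Serde precedence: the explicit row-format serde first, then the serde implied by the
+      // chosen file format, then the session default storage.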
+      val storage = table.storage.copy(
         inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
         outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
         serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
         properties = options.serdeProperties)
 
-      c.copy(tableDesc = t.copy(storage = storage))
+      table.copy(storage = storage)
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) =>
+      // Fills in the database name with the current database if it is not specified.
+      val dbName = t.identifier.database.getOrElse(session.catalog.currentDatabase)
+      val table = t.copy(identifier = t.identifier.copy(database = Some(dbName)))
+
+      // Determines the serde/format of Hive tables
+      val withStorage = determineHiveSerde(table)
+
+      // Infers the schema if it is empty, because the schema could be determined by the Hive
+      // serde.
+      val catalogTable = if (query.isEmpty) {
+        val withSchema = HiveUtils.inferSchema(withStorage)
+        if (withSchema.schema.isEmpty) {
+          throw new AnalysisException("Unable to infer the schema. " +
+            s"The schema specification is required to create the table ${withSchema.identifier}.")
+        }
+        withSchema
+      } else {
+        withStorage
+      }
+
+      c.copy(tableDesc = catalogTable)
   }
 }
 
@@ -82,17 +111,13 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
  *
  * Note that, this rule must be run after `PreprocessTableInsertion`.
  */
-class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
-      CreateHiveTableAsSelectCommand(
-        tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
-        query,
-        mode)
+      CreateHiveTableAsSelectCommand(tableDesc, query, mode)
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 26b1994308f5d2326587d68068b6c263fb80614f..2822a55e3d43ea12820d96ae66e30c8cf3168f07 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -24,18 +24,25 @@ import java.sql.Timestamp
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
@@ -455,4 +462,133 @@ private[spark] object HiveUtils extends Logging {
     case (decimal, DecimalType()) => decimal.toString
     case (other, tpe) if primitiveTypes contains tpe => other.toString
   }
+
+  /** Converts the native StructField to Hive's FieldSchema. */
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
+  }
+
+  /** Builds the native StructField from Hive's FieldSchema. */
+  private def fromHiveColumn(hc: FieldSchema): StructField = {
+    val columnType = try {
+      CatalystSqlParser.parseDataType(hc.getType)
+    } catch {
+      case e: ParseException =>
+        throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
+    }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val field = StructField(
+      name = hc.getName,
+      dataType = columnType,
+      nullable = true,
+      metadata = metadata)
+    Option(hc.getComment).map(field.withComment).getOrElse(field)
+  }
+
+  // TODO: merge this with HiveClientImpl#toHiveTable
+  /** Converts the native table metadata representation, CatalogTable, to Hive's Table. */
+  def toHiveTable(catalogTable: CatalogTable): HiveTable = {
+    // We start by constructing an API table as Hive performs several important transformations
+    // internally when converting an API table to a QL table.
+    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
+    tTable.setTableName(catalogTable.identifier.table)
+    tTable.setDbName(catalogTable.database)
+
+    val tableParameters = new java.util.HashMap[String, String]()
+    tTable.setParameters(tableParameters)
+    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+
+    tTable.setTableType(catalogTable.tableType match {
+      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
+      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
+      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
+    })
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tTable.setSd(sd)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+      catalogTable.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+    tTable.setPartitionKeys(partCols.asJava)
+
+    catalogTable.storage.locationUri.foreach(sd.setLocation)
+    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
+    sd.setSerdeInfo(serdeInfo)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    serdeInfo.setParameters(serdeParameters)
+
+    new HiveTable(tTable)
+  }
+
+  /**
+   * Converts the native partition metadata representation, CatalogTablePartition, to
+   * Hive's Partition.
+   */
+  def toHivePartition(
+      catalogTable: CatalogTable,
+      hiveTable: HiveTable,
+      partition: CatalogTablePartition): HivePartition = {
+    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+    tPartition.setDbName(catalogTable.database)
+    tPartition.setTableName(catalogTable.identifier.table)
+    tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tPartition.setSd(sd)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
+      !catalogTable.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+
+    partition.storage.locationUri.foreach(sd.setLocation)
+    partition.storage.inputFormat.foreach(sd.setInputFormat)
+    partition.storage.outputFormat.foreach(sd.setOutputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    sd.setSerdeInfo(serdeInfo)
+    // maps and lists should be set only after all elements are ready (see HIVE-7975)
+    partition.storage.serde.foreach(serdeInfo.setSerializationLib)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    serdeInfo.setParameters(serdeParameters)
+
+    new HivePartition(hiveTable, tPartition)
+  }
+
+  /**
+   * Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema.
+   * When the table is a data source table or the schema already exists, returns the original
+   * CatalogTable.
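+   * For example, a Hive Avro table whose columns come from the "avro.schema.literal" table
+   * property has an empty schema in the CREATE TABLE statement, but its columns can still be
+   * derived here through the Hive serde.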
+   */
+  def inferSchema(table: CatalogTable): CatalogTable = {
+    if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) {
+      table
+    } else {
+      val hiveTable = toHiveTable(table)
+      // Note: Hive separates partition columns and the schema, but for us the
+      // partition columns are part of the schema
+      val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
+      val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
+      table.copy(schema = schema)
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 346757c2047a7c4d299faf71459bd1bac0a4a845..6394eb6da5173ccb41330b03db64951b8368af66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -19,13 +19,9 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 
-import scala.collection.JavaConverters._
-
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
@@ -60,57 +56,7 @@ private[hive] case class MetastoreRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
-  private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
-    } else {
-      c.dataType.catalogString
-    }
-    new FieldSchema(c.name, typeString, c.getComment.orNull)
-  }
-
-  // TODO: merge this with HiveClientImpl#toHiveTable
-  @transient val hiveQlTable: HiveTable = {
-    // We start by constructing an API table as Hive performs several important transformations
-    // internally when converting an API table to a QL table.
-    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(catalogTable.identifier.table)
-    tTable.setDbName(catalogTable.database)
-
-    val tableParameters = new java.util.HashMap[String, String]()
-    tTable.setParameters(tableParameters)
-    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
-    tTable.setTableType(catalogTable.tableType match {
-      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
-      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
-      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
-    })
-
-    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-    tTable.setSd(sd)
-
-    // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
-      catalogTable.partitionColumnNames.contains(c.getName)
-    }
-    sd.setCols(schema.asJava)
-    tTable.setPartitionKeys(partCols.asJava)
-
-    catalogTable.storage.locationUri.foreach(sd.setLocation)
-    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
-    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
-
-    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
-    sd.setSerdeInfo(serdeInfo)
-
-    val serdeParameters = new java.util.HashMap[String, String]()
-    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    serdeInfo.setParameters(serdeParameters)
-
-    new HiveTable(tTable)
-  }
+  @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable)
 
   @transient override def computeStats(conf: CatalystConf): Statistics = {
     catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
@@ -165,38 +111,7 @@ private[hive] case class MetastoreRelation(
     } else {
       allPartitions
     }
-
-    rawPartitions.map { p =>
-      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-      tPartition.setDbName(databaseName)
-      tPartition.setTableName(tableName)
-      tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava)
-
-      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-      tPartition.setSd(sd)
-
-      // Note: In Hive the schema and partition columns must be disjoint sets
-      val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
-        !catalogTable.partitionColumnNames.contains(c.getName)
-      }
-      sd.setCols(schema.asJava)
-
-      p.storage.locationUri.foreach(sd.setLocation)
-      p.storage.inputFormat.foreach(sd.setInputFormat)
-      p.storage.outputFormat.foreach(sd.setOutputFormat)
-
-      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-      sd.setSerdeInfo(serdeInfo)
-      // maps and lists should be set only after all elements are ready (see HIVE-7975)
-      p.storage.serde.foreach(serdeInfo.setSerializationLib)
-
-      val serdeParameters = new java.util.HashMap[String, String]()
-      catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      p.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      serdeInfo.setParameters(serdeParameters)
-
-      new Partition(hiveQlTable, tPartition)
-    }
+    rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _))
   }
 
   /** Only compare database and tablename, not alias. */
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index b20c10c6a3294ca95bcbed4090e09382f90d0b16..43b6bf5feeb603b81dd0fa3d6b77bbb8f9740f27 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -68,82 +68,4 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
       sql("DROP TABLE IF EXISTS createAndInsertTest")
     }
   }
-
-  test("SPARK-13709: reading partitioned Avro table with nested schema") {
-    withTempDir { dir =>
-      val path = dir.toURI.toString
-      val tableName = "spark_13709"
-      val tempTableName = "spark_13709_temp"
-
-      new File(dir.getAbsolutePath, tableName).mkdir()
-      new File(dir.getAbsolutePath, tempTableName).mkdir()
-
-      val avroSchema =
-        """{
-          |  "name": "test_record",
-          |  "type": "record",
-          |  "fields": [ {
-          |    "name": "f0",
-          |    "type": "int"
-          |  }, {
-          |    "name": "f1",
-          |    "type": {
-          |      "type": "record",
-          |      "name": "inner",
-          |      "fields": [ {
-          |        "name": "f10",
-          |        "type": "int"
-          |      }, {
-          |        "name": "f11",
-          |        "type": "double"
-          |      } ]
-          |    }
-          |  } ]
-          |}
-        """.stripMargin
-
-      withTable(tableName, tempTableName) {
-        // Creates the external partitioned Avro table to be tested.
-        sql(
-          s"""CREATE EXTERNAL TABLE $tableName
-             |PARTITIONED BY (ds STRING)
-             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-             |STORED AS
-             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
-             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
-             |LOCATION '$path/$tableName'
-             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
-           """.stripMargin
-        )
-
-        // Creates an temporary Avro table used to prepare testing Avro file.
-        sql(
-          s"""CREATE EXTERNAL TABLE $tempTableName
-             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-             |STORED AS
-             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
-             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
-             |LOCATION '$path/$tempTableName'
-             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
-           """.stripMargin
-        )
-
-        // Generates Avro data.
-        sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
-
-        // Adds generated Avro data as a new partition to the testing table.
-        sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
-
-        // The following query fails before SPARK-13709 is fixed. This is because when reading data
-        // from table partitions, Avro deserializer needs the Avro schema, which is defined in
-        // table property "avro.schema.literal". However, we only initializes the deserializer using
-        // partition properties, which doesn't include the wanted property entry. Merging two sets
-        // of properties solves the problem.
-        checkAnswer(
-          sql(s"SELECT * FROM $tableName"),
-          Row(1, Row(2, 2.5D), "foo")
-        )
-      }
-    }
-  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 28b5bfd5819c67ae92ff83f76258b689e79c6974..ca39c7e8459fcf0cdbd7c1e1d8c731d999ad4469 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -24,9 +24,8 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
 import org.apache.spark.sql.catalyst.catalog._
@@ -47,7 +46,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
+class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -571,6 +570,85 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
       }
     }
 
+    test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
+      withTempDir { dir =>
+        val path = dir.toURI.toString
+        val tableName = "spark_13709"
+        val tempTableName = "spark_13709_temp"
+
+        new File(dir.getAbsolutePath, tableName).mkdir()
+        new File(dir.getAbsolutePath, tempTableName).mkdir()
+
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": "int"
+            |  }, {
+            |    "name": "f1",
+            |    "type": {
+            |      "type": "record",
+            |      "name": "inner",
+            |      "fields": [ {
+            |        "name": "f10",
+            |        "type": "int"
+            |      }, {
+            |        "name": "f11",
+            |        "type": "double"
+            |      } ]
+            |    }
+            |  } ]
+            |}
+          """.stripMargin
+
+        withTable(tableName, tempTableName) {
+          // Creates the external partitioned Avro table to be tested.
+          sql(
+            s"""CREATE EXTERNAL TABLE $tableName
+               |PARTITIONED BY (ds STRING)
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$path/$tableName'
+               |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+          )
+
+          // Creates a temporary Avro table used to prepare the testing Avro file.
+          sql(
+            s"""CREATE EXTERNAL TABLE $tempTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$path/$tempTableName'
+               |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+          )
+
+          // Generates Avro data.
+          sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
+
+          // Adds generated Avro data as a new partition to the testing table.
+          sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
+
+          // The following query fails before SPARK-13709 is fixed. This is because when reading
+          // data from table partitions, the Avro deserializer needs the Avro schema, which is
+          // defined in the table property "avro.schema.literal". However, we only initialize the
+          // deserializer with the partition properties, which do not include that property.
+          // Merging the two sets of properties solves the problem.
+          checkAnswer(
+            sql(s"SELECT * FROM $tableName"),
+            Row(1, Row(2, 2.5D), "foo")
+          )
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 58be079d01410aff956073c2091a32b4bf9774a1..9d9f3a620d51b29636f6f1182d1a000610770d65 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -79,6 +79,25 @@ class HiveDDLSuite
     }
   }
 
+  test("create a hive table without schema") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("tab1", "tab2") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        var e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING hive") }.getMessage
+        assert(e.contains("Unable to infer the schema. The schema specification is required to " +
+          "create the table `default`.`tab1`"))
+
+        e = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab2 location '${tempDir.getCanonicalPath}'")
+        }.getMessage
+        assert(e.contains("Unable to infer the schema. The schema specification is required to " +
+          "create the table `default`.`tab2`"))
+      }
+    }
+  }
+
   test("drop external tables in default database") {
     withTempDir { tmpDir =>
       val tabName = "tab1"
@@ -199,7 +218,7 @@ class HiveDDLSuite
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
     }
-    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+    assert(e.message == "Found duplicate column(s) in table definition of `default`.`tbl`: a")
   }
 
   test("add/drop partition with location - managed table") {
@@ -1192,7 +1211,7 @@ class HiveDDLSuite
         assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
         val e3 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+          sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
         }
         assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
       }