diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index a5e02523d28898796a19d1c1b012357049a85952..14dd707fa0f1c4e2e8d67b389b5d1c4c0d505916 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 
 
@@ -39,6 +39,12 @@ abstract class ExternalCatalog {
     }
   }
 
+  protected def requireTableExists(db: String, table: String): Unit = {
+    if (!tableExists(db, table)) {
+      throw new NoSuchTableException(db = db, table = table)
+    }
+  }
+
   protected def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!functionExists(db, funcName)) {
       throw new NoSuchFunctionException(db = db, func = funcName)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index ea675b76607d69b1283715d1e2ca220a9d809c6d..bc396880f22a318f452794b78ff5ad50c269845f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -64,12 +64,6 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def requireTableExists(db: String, table: String): Unit = {
-    if (!tableExists(db, table)) {
-      throw new NoSuchTableException(db = db, table = table)
-    }
-  }
-
   private def requireTableNotExists(db: String, table: String): Unit = {
     if (tableExists(db, table)) {
       throw new TableAlreadyExistsException(db = db, table = table)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 312585df1516bcc86f98566ca19c528cc9229564..2642d9395ba88e6b4536e4b1552e8bdfb04c1955 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -250,4 +250,28 @@ object DataType {
       case (fromDataType, toDataType) => fromDataType == toDataType
     }
   }
+
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, StructType, and ignoring the
+   * case of field names in StructType.
+   */
+  private[sql] def equalsIgnoreCaseAndNullability(from: DataType, to: DataType): Boolean = {
+    (from, to) match {
+      case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
+        equalsIgnoreCaseAndNullability(fromElement, toElement)
+
+      case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
+        equalsIgnoreCaseAndNullability(fromKey, toKey) &&
+          equalsIgnoreCaseAndNullability(fromValue, toValue)
+
+      case (StructType(fromFields), StructType(toFields)) =>
+        fromFields.length == toFields.length &&
+          fromFields.zip(toFields).forall { case (l, r) =>
+            l.name.equalsIgnoreCase(r.name) &&
+              equalsIgnoreCaseAndNullability(l.dataType, r.dataType)
+          }
+
+      case (fromDataType, toDataType) => fromDataType == toDataType
+    }
+  }
 }
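
For illustration, a minimal sketch of the comparison the new helper performs. It assumes code living under the `org.apache.spark.sql` package, since the method is `private[sql]`, and the schemas are made up:

    import org.apache.spark.sql.types._

    // Schema written into table properties at create time: case-preserving, with nullability.
    val fromTableProps = new StructType()
      .add("HelLo", IntegerType, nullable = false)
      .add("WoRLd", IntegerType, nullable = true)

    // Schema read back from the Hive metastore: lower-cased names, nullability lost.
    val fromMetastore = new StructType()
      .add("hello", IntegerType, nullable = true)
      .add("world", IntegerType, nullable = true)

    DataType.equalsIgnoreCaseAndNullability(fromTableProps, fromMetastore)  // true
    DataType.equalsIgnoreCaseAndNullability(
      fromTableProps, new StructType().add("hello", LongType).add("world", IntegerType))  // false
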
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f283f4287c5bffea8ee51e4f5a8c6c924bf60cd4..66f92d1b1b0af25e401ecce9d244f85d5eacc769 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -270,6 +270,26 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
   }
 
+  test("column names should be case-preserving and column nullability should be retained") {
+    val catalog = newBasicCatalog()
+    val tbl = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType()
+        .add("HelLo", "int", nullable = false)
+        .add("WoRLd", "int", nullable = true),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("WoRLd"),
+      bucketSpec = Some(BucketSpec(4, Seq("HelLo"), Nil)))
+    catalog.createTable(tbl, ignoreIfExists = false)
+
+    val readBack = catalog.getTable("db1", "tbl")
+    assert(readBack.schema == tbl.schema)
+    assert(readBack.partitionColumnNames == tbl.partitionColumnNames)
+    assert(readBack.bucketSpec == tbl.bucketSpec)
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f95362e29228055e9dd2228c859573649e8dbd6b..e0c89811ddbfa51f045fc3393307c3d479a02cb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
-import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -359,7 +359,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == "hive") {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 634ffde3543cb6bfed3cd484f462312f208cab5b..b8be3d17ba4445b960c9b90e989b592963fc6628 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -331,7 +331,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
-    if (provider.toLowerCase == "hive") {
+    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
     }
     val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -1034,7 +1034,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableType = tableType,
       storage = storage,
       schema = schema,
-      provider = Some("hive"),
+      provider = Some(DDLUtils.HIVE_PROVIDER),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5412aca95dcf1974a3c118817c31ba3c041c4903..190fdd84343ee5896e61be8f9ead06f8148fffb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -415,7 +415,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
+      case CreateTable(tableDesc, mode, None)
+        if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
         val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
 
@@ -427,7 +428,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
       // `CreateTables`
 
-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
+      case CreateTable(tableDesc, mode, Some(query))
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             tableDesc,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b4d3ca1f3707426f69018114fa80e3ab755a3882..8500ab460a1b67d35514d51dec85b339c67dbbbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -687,8 +687,10 @@ case class AlterTableSetLocationCommand(
 
 
 object DDLUtils {
+  val HIVE_PROVIDER = "hive"
+
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get != "hive"
+    table.provider.isDefined && table.provider.get != HIVE_PROVIDER
   }
 
   /**
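
As a hedged sketch of how the new constant reads at call sites (the `mkTable` helper and its arguments are illustrative, not taken from this patch):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
    import org.apache.spark.sql.execution.command.DDLUtils
    import org.apache.spark.sql.types.StructType

    def mkTable(provider: String): CatalogTable = CatalogTable(
      identifier = TableIdentifier("t"),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType().add("i", "int"),
      provider = Some(provider))

    // Hive serde tables are identified via the shared constant instead of a scattered "hive" literal.
    DDLUtils.isDatasourceTable(mkTable(DDLUtils.HIVE_PROVIDER))  // false
    DDLUtils.isDatasourceTable(mkTable("parquet"))               // true
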
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 4647b11af4dfb4ae01a5111e0fffb8041459f9d7..5ba44ff9f5d9d3d55a968ca333e9ddbed0203bb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{AtomicType, StructType}
@@ -127,7 +128,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (tableDesc.provider.get == "hive") {
+      if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) {
         // When we hit this branch, it means users didn't specify schema for the table to be
         // created, as we always include partition columns in table schema for hive serde tables.
         // The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -292,7 +293,7 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case CreateTable(tableDesc, _, Some(_))
-          if tableDesc.provider.get == "hive" =>
+          if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
         throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
 
       case _ => // OK
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 64ba52672b1c8ab22556c1ccc08a029b76a08a51..b537061d0d22155ac08f8140d1bb1745fe102ac9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -95,8 +95,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  private def requireTableExists(db: String, table: String): Unit = {
-    withClient { getTable(db, table) }
+  /**
+   * Get the raw table metadata directly from the Hive metastore. The raw metadata may contain
+   * special data source properties that should not be exposed outside of `HiveExternalCatalog`;
+   * we should interpret these properties and restore the original table metadata before
+   * returning it.
+   */
+  private def getRawTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
   }
 
   /**
@@ -187,16 +193,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     if (tableExists(db, table) && !ignoreIfExists) {
       throw new TableAlreadyExistsException(db = db, table = table)
     }
-    // Before saving data source table metadata into Hive metastore, we should:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
-    //  2. Check if this table is hive compatible
-    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
-    //         spec to empty and save table metadata to Hive.
-    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
-    //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
-    if (DDLUtils.isDatasourceTable(tableDefinition)) {
+
+    if (tableDefinition.tableType == VIEW) {
+      client.createTable(tableDefinition, ignoreIfExists)
+    } else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+      // Here we follow data source tables and put table metadata like provider, schema, etc. in
+      // table properties, so that we can work around the Hive metastore not being case-preserving
+      // and let Hive serde tables support mixed-case column names.
+      val tableWithDataSourceProps = tableDefinition.copy(
+        properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+      client.createTable(tableWithDataSourceProps, ignoreIfExists)
+    } else {
+      // To work around some Hive metastore issues, e.g. not being case-preserving, bad decimal
+      // type support, no column nullability, etc., we do some extra work before saving the table
+      // metadata into the Hive metastore:
+      //  1. Put table metadata like provider, schema, etc. in table properties.
+      //  2. Check if this table is hive compatible.
+      //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+      //         spec to empty and save table metadata to Hive.
+      //    2.2  If it's hive compatible, set serde information in table metadata and try to save
+      //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
       val tableProperties = tableMetaToTableProps(tableDefinition)
 
+      // Ideally we should not create a managed table with a location, but Hive serde tables can
+      // specify a location for managed tables. And in [[CreateDataSourceTableAsSelectCommand]] we
+      // have to create the table directory and write out data before we create this table, to
+      // avoid exposing a partially written table.
       val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
         tableDefinition.storage.locationUri.isEmpty
       val tableLocation = if (needDefaultTableLocation) {
@@ -304,8 +326,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           logWarning(message)
           saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
       }
-    } else {
-      client.createTable(tableDefinition, ignoreIfExists)
     }
   }
 
@@ -417,11 +437,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val rawTable = client.getTable(db, oldName)
-
-    val storageWithNewPath = if (rawTable.tableType == MANAGED) {
-      // If it's a managed table and we are renaming it, then the path option becomes inaccurate
-      // and we need to update it according to the new table name.
+    val rawTable = getRawTable(db, oldName)
+
+    // Note that Hive serde tables don't use the path option in storage properties to store the
+    // table location; they store it directly in the `locationUri` field, which is updated
+    // automatically in the Hive metastore by the `alterTable` call at the end of this method.
+    // Here we only update the path option if it already exists in storage properties, to avoid
+    // adding an unnecessary path option for Hive serde tables.
+    val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
+    val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
+      // If it's a managed table with a path option and we are renaming it, then the path option
+      // becomes inaccurate and we need to update it according to the new table name.
       val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
       updateLocationInStorageProps(rawTable, Some(newTablePath))
     } else {
@@ -442,7 +468,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def updateLocationInStorageProps(
       table: CatalogTable,
       newPath: Option[String]): CatalogStorageFormat = {
-    val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path")
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    val propsWithoutPath = table.storage.properties.filter {
+      case (k, v) => k.toLowerCase != "path"
+    }
     table.storage.copy(properties = propsWithoutPath ++ newPath.map("path" -> _))
   }
 
@@ -475,18 +505,51 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableDefinition
     }
 
-    if (DDLUtils.isDatasourceTable(withStatsProps)) {
-      val oldTableDef = client.getTable(db, withStatsProps.identifier.table)
+    if (tableDefinition.tableType == VIEW) {
+      client.alterTable(withStatsProps)
+    } else {
+      val oldTableDef = getRawTable(db, withStatsProps.identifier.table)
 
-      val oldLocation = getLocationFromStorageProps(oldTableDef)
-      val newLocation = tableDefinition.storage.locationUri
-      // Only update the `locationUri` field if the location is really changed, because this table
-      // may be not Hive-compatible and can not set the `locationUri` field. We should respect the
-      // old `locationUri` even it's None.
-      val storageWithNewLocation = if (oldLocation == newLocation) {
-        oldTableDef.storage
+      val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+        tableDefinition.storage
       } else {
-        updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
+        // We can't alter the table storage of a data source table directly, for 2 reasons:
+        //   1. internally we use the path option in storage properties to store the table
+        //      location, but the given `tableDefinition` comes from outside and doesn't have
+        //      the path option, so we need to add it manually.
+        //   2. this data source table may be created on a file, not a directory, in which case
+        //      we can't set the `locationUri` field and save it to the Hive metastore, because
+        //      Hive only allows a directory as table location.
+        //
+        // For example, an external data source table is created with a single file
+        // '/path/to/file'. Internally, we will add a path option with value '/path/to/file' to
+        // storage properties, and set the `locationUri` to a special value due to SPARK-15269
+        // (see `saveTableIntoHive` for more details). When users get the table metadata back,
+        // we will restore the `locationUri` field from the path option and remove the path
+        // option from storage properties. When users alter the table storage, the given
+        // `tableDefinition` will have the `locationUri` field set to '/path/to/file' and the
+        // path option not set.
+        //
+        // Here we need 2 extra steps:
+        //   1. add the path option to storage properties, to match the internal format, i.e.
+        //      using the path option to store the table location.
+        //   2. set the `locationUri` field back to the old one from the existing table
+        //      metadata, if users don't want to alter the table location. This step is
+        //      necessary as the `locationUri` is not always the same as the path option, e.g.
+        //      in the above example the `locationUri` is a special value and we should respect
+        //      it. Note that if users want to alter the table location to a file path, we will
+        //      fail. This should be fixed in the future.
+
+        val newLocation = tableDefinition.storage.locationUri
+        val storageWithPathOption = tableDefinition.storage.copy(
+          properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))
+
+        val oldLocation = getLocationFromStorageProps(oldTableDef)
+        if (oldLocation == newLocation) {
+          storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
+        } else {
+          storageWithPathOption
+        }
       }
 
       val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
@@ -498,23 +561,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
       // to retain the spark specific format if it is. Also add old data source properties to table
       // properties, to retain the data source table format.
-      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
       val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
       val newDef = withStatsProps.copy(
-        storage = storageWithNewLocation,
+        storage = newStorage,
         schema = oldTableDef.schema,
         partitionColumnNames = oldTableDef.partitionColumnNames,
         bucketSpec = oldTableDef.bucketSpec,
         properties = newTableProps)
 
       client.alterTable(newDef)
-    } else {
-      client.alterTable(withStatsProps)
     }
   }
 
   override def getTable(db: String, table: String): CatalogTable = withClient {
-    restoreTableMetadata(client.getTable(db, table))
+    restoreTableMetadata(getRawTable(db, table))
   }
 
   override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
@@ -536,28 +597,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val tableWithSchema = if (table.tableType == VIEW) {
       table
     } else {
-      getProviderFromTableProperties(table).map { provider =>
-        assert(provider != TABLE_PARTITION_PROVIDER_CATALOG,
-          "Hive serde table should not save provider in table properties.")
-        // Internally we store the table location in storage properties with key "path" for data
-        // source tables. Here we set the table location to `locationUri` field and filter out the
-        // path option in storage properties, to avoid exposing this concept externally.
-        val storageWithLocation = {
-          val tableLocation = getLocationFromStorageProps(table)
-          updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
-        }
-        val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
-
-        table.copy(
-          storage = storageWithLocation,
-          schema = getSchemaFromTableProperties(table),
-          provider = Some(provider),
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-          bucketSpec = getBucketSpecFromTableProperties(table),
-          tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG)
-        )
-      } getOrElse {
-        table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
+      getProviderFromTableProperties(table) match {
+        // No provider in table properties, which means this table was created by Spark prior to
+        // 2.1, or was created on the Hive side.
+        case None =>
+          table.copy(provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
+
+        // This is a Hive serde table created by Spark 2.1 or higher versions.
+        case Some(DDLUtils.HIVE_PROVIDER) => restoreHiveSerdeTable(table)
+
+        // This is a regular data source table.
+        case Some(provider) => restoreDataSourceTable(table, provider)
       }
     }
 
@@ -583,6 +633,50 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     tableWithStats.copy(properties = getOriginalTableProperties(table))
   }
 
+  private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val hiveTable = table.copy(
+      provider = Some(DDLUtils.HIVE_PROVIDER),
+      tracksPartitionsInCatalog = true)
+
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      hiveTable.copy(
+        schema = schemaFromTableProps,
+        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+        bucketSpec = getBucketSpecFromTableProperties(table))
+    } else {
+      // The Hive metastore may change the table schema, e.g. via schema inference. If the table
+      // schema we read back differs (ignoring case and nullability) from the one in table
+      // properties, which was written when the table was created, we should respect the table
+      // schema from Hive.
+      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+        "different from the schema when this table was created by Spark SQL" +
+        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
+        "Hive metastore which is not case preserving.")
+      hiveTable
+    }
+  }
+
+  private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
+    // Internally we store the table location in storage properties with key "path" for data
+    // source tables. Here we set the table location to the `locationUri` field and filter out
+    // the path option in storage properties, to avoid exposing this concept externally.
+    val storageWithLocation = {
+      val tableLocation = getLocationFromStorageProps(table)
+      // We pass None as `newPath` here, to remove the path option in storage properties.
+      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+    }
+    val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
+
+    table.copy(
+      provider = Some(provider),
+      storage = storageWithLocation,
+      schema = getSchemaFromTableProperties(table),
+      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      bucketSpec = getBucketSpecFromTableProperties(table),
+      tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
+  }
+
   override def tableExists(db: String, table: String): Boolean = withClient {
     client.tableExists(db, table)
   }
@@ -623,7 +717,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
     getTable(db, table).partitionColumnNames.foreach { colName =>
-      orderedPartitionSpec.put(colName, partition(colName))
+      // The Hive metastore is not case-preserving and keeps partition columns with lower-cased
+      // names, and Hive will validate the column names in the partition spec to make sure they
+      // are partition columns. Here we lowercase the column names before passing the partition
+      // spec to the Hive client, to satisfy Hive.
+      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
     }
 
     client.loadPartition(
@@ -648,7 +746,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
     getTable(db, table).partitionColumnNames.foreach { colName =>
-      orderedPartitionSpec.put(colName, partition(colName))
+      // The Hive metastore is not case-preserving and keeps partition columns with lower-cased
+      // names, and Hive will validate the column names in the partition spec to make sure they
+      // are partition columns. Here we lowercase the column names before passing the partition
+      // spec to the Hive client, to satisfy Hive.
+      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
     }
 
     client.loadDynamicPartitions(
@@ -754,7 +856,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
-    val rawTable = client.getTable(db, table)
+    val rawTable = getRawTable(db, table)
     val catalogTable = restoreTableMetadata(rawTable)
     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
     val nonPartitionPruningPredicates = predicates.filterNot {
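
For context, a hedged spark-shell sketch (Hive support enabled; the table name `t` is illustrative) of the round-trip behavior the new create/restore path is intended to give Hive serde tables:

    // Mixed-case column names should survive a round trip through the Hive metastore, because the
    // case-preserving schema is also written into table properties and restored on read.
    spark.sql("CREATE TABLE t (HelLo INT) PARTITIONED BY (WoRLd INT) STORED AS PARQUET")
    spark.table("t").schema.fieldNames  // expected: Array(HelLo, WoRLd), not lower-cased
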
diff --git a/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 b/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
index d3ffb995aff4be6917ca104b39f82634b1491675..93ba96ec8c15993a6b8c75e8b71e40b77e709ff2 100644
--- a/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
+++ b/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
@@ -1,2 +1,2 @@
-a                   	int                 	                    
-b                   	double              	                    
+A                   	int
+B                   	double
diff --git a/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16 b/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
index d3ffb995aff4be6917ca104b39f82634b1491675..93ba96ec8c15993a6b8c75e8b71e40b77e709ff2 100644
--- a/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
+++ b/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
@@ -1,2 +1,2 @@
-a                   	int                 	                    
-b                   	double              	                    
+A                   	int
+B                   	double
diff --git a/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd b/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
index d3ffb995aff4be6917ca104b39f82634b1491675..93ba96ec8c15993a6b8c75e8b71e40b77e709ff2 100644
--- a/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
+++ b/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
@@ -1,2 +1,2 @@
-a                   	int                 	                    
-b                   	double              	                    
+A                   	int
+B                   	double
diff --git a/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b b/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
index 77eaef91c9c3f967e6444f0854a48dfeda7005f7..d52fcf0ebbdb32509bec4c4fa13aea3340370231 100644
--- a/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
+++ b/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
@@ -1,3 +1,3 @@
-a                   	array<int>          	                    
-b                   	double              	                    
-c                   	map<double,int>     	                    
+A                   	array<int>
+B                   	double
+C                   	map<double,int>
diff --git a/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da b/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
index 70c14c3ef34ab72325b3360bec4ff2d7b182edac..2f7168cba930749dcad309d1892b840520ab37f4 100644
--- a/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
+++ b/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
@@ -1,3 +1,3 @@
-key                 
-value               
-ds                  
+KEY
+VALUE
+ds
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index d8e31c4e39a5c490f22e31305fbba673f40dcc14..b41bc862e9bc5c3a39800a299b0ea17b8d360631 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -105,12 +105,9 @@ class PartitionedTablePerfStatsSuite
         assert(df4.count() == 0)
         assert(df4.inputFiles.length == 0)
 
-        // TODO(ekl) enable for hive tables as well once SPARK-17983 is fixed
-        if (spec.isDatasourceTable) {
-          val df5 = spark.sql("select * from test where fieldOne = 4")
-          assert(df5.count() == 1)
-          assert(df5.inputFiles.length == 5)
-        }
+        val df5 = spark.sql("select * from test where fieldOne = 4")
+        assert(df5.count() == 1)
+        assert(df5.inputFiles.length == 5)
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index cc09aef32699b99214dcd1172e0f0fcedd0645c9..28e5dffb115230be14975c21b4a81c0e31000fe6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -521,7 +521,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
-      case LogicalRelation(r: HadoopFsRelation, _, Some(table)) =>
+      case LogicalRelation(r: HadoopFsRelation, _, _) =>
         if (!isDataSourceTable) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -529,7 +529,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         userSpecifiedLocation match {
           case Some(location) =>
-            assert(table.storage.locationUri.get === location)
+            assert(r.options("path") === location)
           case None => // OK.
         }
         assert(catalogTable.provider.get === format)