diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 78897daec810787bf22b896dd4a9f680f6e2784a..5e8316320917555de0bf349efba8c81f0b20390f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
-
+import org.apache.spark.sql.types.StructType
 
 /**
  * Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
    */
   def alterTable(tableDefinition: CatalogTable): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+   * as the schema itself).
+   *
+   * @param db Database that the table to alter belongs to
+   * @param table Name of the table whose schema is being altered
+   * @param schema Updated schema to be used for the table (must contain the existing partition
+   *               and bucket columns)
+   */
+  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+
   def getTable(db: String, table: String): CatalogTable
 
   def getTableOption(db: String, table: String): Option[CatalogTable]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9a6c732ea697136e31c352f34dd8702f29ea07fa..d700634e3c2afb70d50e7b3501ced2261aec250d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      schema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+    catalog(db).tables(table).table = origTable.copy(schema = schema)
+  }
+
   override def getTable(db: String, table: String): CatalogTable = synchronized {
     requireTableExists(db, table)
     catalog(db).tables(table).table
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 5b5378c09e540ad0186e1761abd6b340fd82b4ec..aa561e57f77f5d7969e86406fc5e666888eda311 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -158,6 +158,11 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
+ *                            When using a Hive Metastore, this flag is set to false if a case-
+ *                            sensitive schema could not be read from the table properties.
+ *                            Used to trigger case-sensitive schema inference at query time, when
+ *                            configured.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -176,7 +181,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaPreservesCase: Boolean = true) {
 
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 59b52651a9fbd44434128734a68edbf5b29a60df..f0692a8e3537e950331476ee38a80c733c2d4176 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 
@@ -239,6 +239,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     }
   }
 
+  test("alter table schema") {
+    val catalog = newBasicCatalog()
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    val newSchema = StructType(Seq(
+      StructField("new_field_1", IntegerType),
+      StructField("new_field_2", StringType),
+      StructField("a", IntegerType),
+      StructField("b", StringType)))
+    catalog.alterTableSchema("db2", "tbl1", newSchema)
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(newTbl1.schema == newSchema)
+  }
+
   test("get table") {
     assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index af1eaa1f237465b077ed50cac22c9589e13f2461..37e3dfabd0b21907fb4ee59953c1820b5dc54e9f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
         "lastAccessTime" -> -1,
         "tracksPartitionsInCatalog" -> false,
         "properties" -> JNull,
-        "unsupportedFeatures" -> List.empty[String]))
+        "unsupportedFeatures" -> List.empty[String],
+        "schemaPreservesCase" -> JBool(true)))
 
     // For unknown case class, returns JNull.
     val bigValue = new Array[Int](10000)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0e1fc7ae961357d33f4880d3f508a36b8783b085..2b4892ee23ba33607bc95a971f45e0b2652e186d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -486,71 +486,6 @@ object ParquetFileFormat extends Logging {
     }
   }
 
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
-   * distinguish binary and string).  This method generates a correct schema by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the resulting object has
-   * a schema corresponding to the union of the fields present in each element of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
-   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field being present in the
-   * table schema obtained from the Hive Metastore. This method returns a schema representing the
-   * Parquet file schema along with any additional nullable fields from the Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-
   /**
    * Reads Parquet footers in multi-threaded manner.
    * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8d493e0d56ca24ef85a049b3dae647d579f0ef3c..c4da2bbd5ead273f4d46a921fb68aa1f99548087 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -285,6 +285,25 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+
+  val HIVE_CASE_SENSITIVE_INFERENCE = SQLConfigBuilder("spark.sql.hive.caseSensitiveInferenceMode")
+    .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Although Spark SQL itself is not case-sensitive, Hive-compatible " +
+      "file formats such as Parquet are. Spark SQL must use a case-preserving schema when " +
+      "querying any table backed by files containing case-sensitive field names, or queries " +
+      "may not return accurate results. Valid options include INFER_AND_SAVE (the default " +
+      "mode: infer the case-sensitive schema from the underlying data files and write it back " +
+      "to the table properties), INFER_ONLY (infer the schema but don't attempt to write it " +
+      "to the table properties) and NEVER_INFER (fall back to using the case-insensitive " +
+      "metastore schema instead of inferring).")
+    .stringConf
+    .transform(_.toUpperCase())
+    .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
+    .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
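+  // For example, inference can be disabled for the current session with:
+  //   SET spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER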
+
   val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -723,6 +742,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
+  def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
+    HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7eb538f5dcda85a3861104f336966924b3..6aa940afbb2c46b53a9642e6025cfddded966aa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("merge with metastore schema") {
-    // Field type conflict resolution
-    assertResult(
-      StructType(Seq(
-        StructField("lowerCase", StringType),
-        StructField("UPPERCase", DoubleType, nullable = false)))) {
-
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("lowercase", StringType),
-          StructField("uppercase", DoubleType, nullable = false))),
-
-        StructType(Seq(
-          StructField("lowerCase", BinaryType),
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }
-
-    // MetaStore schema is subset of parquet schema
-    assertResult(
-      StructType(Seq(
-        StructField("UPPERCase", DoubleType, nullable = false)))) {
-
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("uppercase", DoubleType, nullable = false))),
-
-        StructType(Seq(
-          StructField("lowerCase", BinaryType),
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }
-
-    // Metastore schema contains additional non-nullable fields.
-    assert(intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("uppercase", DoubleType, nullable = false),
-          StructField("lowerCase", BinaryType, nullable = false))),
-
-        StructType(Seq(
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }.getMessage.contains("detected conflicting schemas"))
-
-    // Conflicting non-nullable field names
-    intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(StructField("lower", StringType, nullable = false))),
-        StructType(Seq(StructField("lowerCase", BinaryType))))
-    }
-  }
-
-  test("merge missing nullable fields from Metastore schema") {
-    // Standard case: Metastore schema contains additional nullable fields not present
-    // in the Parquet file schema.
-    assertResult(
-      StructType(Seq(
-        StructField("firstField", StringType, nullable = true),
-        StructField("secondField", StringType, nullable = true),
-        StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("firstfield", StringType, nullable = true),
-          StructField("secondfield", StringType, nullable = true),
-          StructField("thirdfield", StringType, nullable = true))),
-        StructType(Seq(
-          StructField("firstField", StringType, nullable = true),
-          StructField("secondField", StringType, nullable = true))))
-    }
-
-    // Merge should fail if the Metastore contains any additional fields that are not
-    // nullable.
-    assert(intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("firstfield", StringType, nullable = true),
-          StructField("secondfield", StringType, nullable = true),
-          StructField("thirdfield", StringType, nullable = false))),
-        StructType(Seq(
-          StructField("firstField", StringType, nullable = true),
-          StructField("secondField", StringType, nullable = true))))
-    }.getMessage.contains("detected conflicting schemas"))
-  }
-
   test("schema merging failure error message") {
     import testImplicits._
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f321c45e5c518996f659413bd568d6af6afdf0cf..cbf146966bcfa16eb0251c535d92b4acd10443ee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
+    requireTableExists(db, table)
+    val rawTable = getRawTable(db, table)
+    val withNewSchema = rawTable.copy(schema = schema)
+    // Add table metadata such as table schema, partition columns, etc. to table properties.
+    val updatedTable = withNewSchema.copy(
+      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
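+    // Attempt a Hive-compatible alter first; if the Hive client rejects the new schema, fall
+    // back to writing only the partition schema to Hive and rely on the case-sensitive schema
+    // stored in the table properties above.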
+    try {
+      client.alterTable(updatedTable)
+    } catch {
+      case NonFatal(e) =>
+        val warningMessage =
+          s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
+            "compatible way. Updating Hive metastore in Spark SQL specific format."
+        logWarning(warningMessage, e)
+        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+    }
+  }
+
   override def getTable(db: String, table: String): CatalogTable = withClient {
     restoreTableMetadata(getRawTable(db, table))
   }
@@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           "different from the schema when this table was created by Spark SQL" +
           s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
           "from Hive metastore which is not case preserving.")
-        hiveTable
+        hiveTable.copy(schemaPreservesCase = false)
       }
     } else {
-      hiveTable
+      hiveTable.copy(schemaPreservesCase = false)
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 82e519c994afc87ba321b082ab9086aebe50de2d..f93922073704ad951c4e732ed5586d475dffd3e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.hive
 
+import scala.util.control.NonFatal
+
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,6 +34,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
 /**
@@ -45,6 +49,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String)
 
+  import HiveMetastoreCatalog._
+
   private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
 
   def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
@@ -200,9 +206,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
 
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
+    val fileFormat = fileFormatClass.newInstance()
+
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
-
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
         Seq(metastoreRelation.hiveQlTable.getDataLocation)
       } else {
@@ -243,9 +250,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             }
           }
           val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-          val dataSchema =
-            StructType(metastoreSchema
-              .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+          val (dataSchema, updatedTable) =
+            inferIfNeeded(metastoreRelation, options, fileFormat, Option(fileIndex))
 
           val relation = HadoopFsRelation(
             location = fileIndex,
@@ -256,7 +263,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             options = options)(sparkSession = sparkSession)
 
           val created = LogicalRelation(relation,
-            catalogTable = Some(metastoreRelation.catalogTable))
+            catalogTable = Some(updatedTable))
           cachedDataSourceTables.put(tableIdentifier, created)
           created
         }
@@ -274,16 +281,17 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           bucketSpec,
           None)
         val logicalRelation = cached.getOrElse {
+          val (dataSchema, updatedTable) = inferIfNeeded(metastoreRelation, options, fileFormat)
           val created =
             LogicalRelation(
               DataSource(
                 sparkSession = sparkSession,
                 paths = rootPath.toString :: Nil,
-                userSpecifiedSchema = Some(metastoreRelation.schema),
+                userSpecifiedSchema = Option(dataSchema),
                 bucketSpec = bucketSpec,
                 options = options,
                 className = fileType).resolveRelation(),
-              catalogTable = Some(metastoreRelation.catalogTable))
+              catalogTable = Some(updatedTable))
 
           cachedDataSourceTables.put(tableIdentifier, created)
           created
@@ -295,6 +303,54 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
+  private def inferIfNeeded(
+      relation: MetastoreRelation,
+      options: Map[String, String],
+      fileFormat: FileFormat,
+      fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
+    val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
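+    // Only infer when the catalog schema did not preserve case (schemaPreservesCase is false)
+    // and the configured inference mode allows it.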
+    val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.catalogTable.schemaPreservesCase
+    val tableName = relation.catalogTable.identifier.unquotedString
+    if (shouldInfer) {
+      logInfo(s"Inferring case-sensitive schema for table $tableName (inference mode: " +
+        s"$inferenceMode)")
+      val fileIndex = fileIndexOpt.getOrElse {
+        val rootPath = new Path(relation.catalogTable.location)
+        new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
+      }
+
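+      // Infer the data schema from the files, then reconcile it with the metastore schema so
+      // metastore data types are kept while the case-sensitive field names come from the files.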
+      val inferredSchema = fileFormat
+        .inferSchema(
+          sparkSession,
+          options,
+          fileIndex.listFiles(Nil).flatMap(_.files))
+        .map(mergeWithMetastoreSchema(relation.catalogTable.schema, _))
+
+      inferredSchema match {
+        case Some(schema) =>
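+          // Persist the inferred schema back to the catalog only in INFER_AND_SAVE mode.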
+          if (inferenceMode == INFER_AND_SAVE) {
+            updateCatalogSchema(relation.catalogTable.identifier, schema)
+          }
+          (schema, relation.catalogTable.copy(schema = schema))
+        case None =>
+          logWarning(s"Unable to infer schema for table $tableName from file format " +
+            s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
+          (relation.catalogTable.schema, relation.catalogTable)
+      }
+    } else {
+      (relation.catalogTable.schema, relation.catalogTable)
+    }
+  }
+
+  private def updateCatalogSchema(identifier: TableIdentifier, schema: StructType): Unit = try {
+    val db = identifier.database.get
+    logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}")
+    sparkSession.sharedState.externalCatalog.alterTableSchema(db, identifier.table, schema)
+  } catch {
+    case NonFatal(ex) =>
+      logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
+  }
+
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
    * data source relations for better performance.
@@ -373,3 +429,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 }
+
+private[hive] object HiveMetastoreCatalog {
+  def mergeWithMetastoreSchema(
+      metastoreSchema: StructType,
+      inferredSchema: StructType): StructType = try {
+    // Find any nullable fields in the metastore schema that are missing from the inferred schema.
+    val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val missingNullables = metastoreFields
+      .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
+      .values
+      .filter(_.nullable)
+    // Merge missing nullable fields into the inferred schema and build a case-insensitive map.
+    val inferredFields = StructType(inferredSchema ++ missingNullables)
+      .map(f => f.name.toLowerCase -> f).toMap
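+    // Keep metastore column order and data types, but adopt the case-sensitive names from the
+    // inferred schema. A non-nullable metastore field missing from the inferred schema fails the
+    // lookup here and is reported as a conflict below.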
+    StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name).name)))
+  } catch {
+    case NonFatal(_) =>
+      val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive
+         | Metastore with the one inferred from the file format. Metastore schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Inferred schema:
+         |${inferredSchema.prettyJson}
+       """.stripMargin
+      throw new SparkException(msg)
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5a80c41938a65e602d214114228d71bb15262973
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => InferenceMode, _}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+  import HiveSchemaInferenceSuite._
+  import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+  private val client = externalCatalog.client
+
+  // Return a copy of the given schema with all field names converted to lower case.
+  private def lowerCaseSchema(schema: StructType): StructType = {
+    StructType(schema.map(f => f.copy(name = f.name.toLowerCase)))
+  }
+
+  // Create a Hive external test table containing the given field and partition column names.
+  // Returns a case-sensitive schema for the table.
+  private def setupExternalTable(
+      fileType: String,
+      fields: Seq[String],
+      partitionCols: Seq[String],
+      dir: File): StructType = {
+    // Treat all table fields as bigints...
+    val structFields = fields.map { field =>
+      StructField(
+        name = field,
+        dataType = LongType,
+        nullable = true,
+        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build())
+    }
+    // and all partition columns as ints
+    val partitionStructFields = partitionCols.map { field =>
+      StructField(
+        // Partition column case isn't preserved
+        name = field.toLowerCase,
+        dataType = IntegerType,
+        nullable = true,
+        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build())
+    }
+    val schema = StructType(structFields ++ partitionStructFields)
+
+    // Write some test data (partitioned if specified)
+    val writer = spark.range(NUM_RECORDS)
+      .selectExpr((fields ++ partitionCols).map("id as " + _): _*)
+      .write
+      .partitionBy(partitionCols: _*)
+      .mode("overwrite")
+    fileType match {
+      case ORC_FILE_TYPE =>
+        writer.orc(dir.getAbsolutePath)
+      case PARQUET_FILE_TYPE =>
+        writer.parquet(dir.getAbsolutePath)
+    }
+
+    // Create Hive external table with lowercased schema
+    val serde = HiveSerDe.sourceToSerDe(fileType).get
+    client.createTable(
+      CatalogTable(
+        identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          locationUri = Option(dir.getAbsolutePath),
+          inputFormat = serde.inputFormat,
+          outputFormat = serde.outputFormat,
+          serde = serde.serde,
+          compressed = false,
+          properties = Map("serialization.format" -> "1")),
+        schema = schema,
+        provider = Option("hive"),
+        partitionColumnNames = partitionCols.map(_.toLowerCase),
+        properties = Map.empty),
+      true)
+
+    // Add partition records (if specified)
+    if (partitionCols.nonEmpty) {
+      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+    }
+
+    // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
+    // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
+    // set (table needs to be obtained from client since HiveExternalCatalog filters these
+    // properties out).
+    assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
+    val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
+    assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty)
+    schema
+  }
+
+  private def withTestTables(
+      fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = {
+    // Test both a partitioned and unpartitioned Hive table
+    val tableFields = Seq(
+      (Seq("fieldOne"), Seq("partCol1", "partCol2")),
+      (Seq("fieldOne", "fieldTwo"), Seq.empty[String]))
+
+    tableFields.foreach { case (fields, partCols) =>
+      withTempDir { dir =>
+        val schema = setupExternalTable(fileType, fields, partCols, dir)
+        withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) }
+      }
+    }
+  }
+
+  private def withFileTypes(f: String => Unit): Unit =
+    Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f)
+
+  private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+      SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f)
+  }
+
+  private val inferenceKey = SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key
+
+  private def testFieldQuery(fields: Seq[String]): Unit = {
+    if (fields.nonEmpty) {
+      val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE ${Random.shuffle(fields).head} >= 0"
+      assert(spark.sql(query).count == NUM_RECORDS)
+    }
+  }
+
+  private def testTableSchema(expectedSchema: StructType): Unit = {
+    // Spark 2.1 doesn't add metadata for partition columns when the schema isn't read from the
+    // table properties, so strip all field metadata before making the comparison.
+    val tableSchema =
+      StructType(spark.table(TEST_TABLE_NAME).schema.map(_.copy(metadata = Metadata.empty)))
+    val expected =
+      StructType(expectedSchema.map(_.copy(metadata = Metadata.empty)))
+    assert(tableSchema == expected)
+  }
+
+  withFileTypes { fileType =>
+    test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") {
+      withInferenceMode(INFER_AND_SAVE) {
+        withTestTables(fileType) { (fields, partCols, schema) =>
+          testFieldQuery(fields)
+          testFieldQuery(partCols)
+          testTableSchema(schema)
+
+          // Verify the catalog table now contains the updated schema and properties
+          val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+          assert(catalogTable.schemaPreservesCase)
+          assert(catalogTable.schema == schema)
+          assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase))
+        }
+      }
+    }
+  }
+
+  withFileTypes { fileType =>
+    test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") {
+      withInferenceMode(INFER_ONLY) {
+        withTestTables(fileType) { (fields, partCols, schema) =>
+          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+          testFieldQuery(fields)
+          testFieldQuery(partCols)
+          testTableSchema(schema)
+          // Catalog table shouldn't be altered
+          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+        }
+      }
+    }
+  }
+
+  withFileTypes { fileType =>
+    test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") {
+      withInferenceMode(NEVER_INFER) {
+        withTestTables(fileType) { (fields, partCols, schema) =>
+          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+          // Only check the table schema as the test queries will break
+          testTableSchema(lowerCaseSchema(schema))
+          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+        }
+      }
+    }
+  }
+
+  test("mergeWithMetastoreSchema() should return expected results") {
+    // Field type conflict resolution
+    assertResult(
+      StructType(Seq(
+        StructField("lowerCase", StringType),
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("lowercase", StringType),
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // MetaStore schema is subset of parquet schema
+    assertResult(
+      StructType(Seq(
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Metastore schema contains additional non-nullable fields.
+    assert(intercept[Throwable] {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false),
+          StructField("lowerCase", BinaryType, nullable = false))),
+
+        StructType(Seq(
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }.getMessage.contains("Detected conflicting schemas"))
+
+    // Conflicting non-nullable field names
+    intercept[Throwable] {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(StructField("lower", StringType, nullable = false))),
+        StructType(Seq(StructField("lowerCase", BinaryType))))
+    }
+
+    // Check that merging missing nullable fields works as expected.
+    assertResult(
+      StructType(Seq(
+        StructField("firstField", StringType, nullable = true),
+        StructField("secondField", StringType, nullable = true),
+        StructField("thirdfield", StringType, nullable = true)))) {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("firstfield", StringType, nullable = true),
+          StructField("secondfield", StringType, nullable = true),
+          StructField("thirdfield", StringType, nullable = true))),
+        StructType(Seq(
+          StructField("firstField", StringType, nullable = true),
+          StructField("secondField", StringType, nullable = true))))
+    }
+
+    // Merge should fail if the Metastore contains any additional fields that are not
+    // nullable.
+    assert(intercept[Throwable] {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("firstfield", StringType, nullable = true),
+          StructField("secondfield", StringType, nullable = true),
+          StructField("thirdfield", StringType, nullable = false))),
+        StructType(Seq(
+          StructField("firstField", StringType, nullable = true),
+          StructField("secondField", StringType, nullable = true))))
+    }.getMessage.contains("Detected conflicting schemas"))
+
+    // Schema merge should maintain metastore order.
+    assertResult(
+      StructType(Seq(
+        StructField("first_field", StringType, nullable = true),
+        StructField("second_field", StringType, nullable = true),
+        StructField("third_field", StringType, nullable = true),
+        StructField("fourth_field", StringType, nullable = true),
+        StructField("fifth_field", StringType, nullable = true)))) {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("first_field", StringType, nullable = true),
+          StructField("second_field", StringType, nullable = true),
+          StructField("third_field", StringType, nullable = true),
+          StructField("fourth_field", StringType, nullable = true),
+          StructField("fifth_field", StringType, nullable = true))),
+        StructType(Seq(
+          StructField("fifth_field", StringType, nullable = true),
+          StructField("third_field", StringType, nullable = true),
+          StructField("second_field", StringType, nullable = true))))
+    }
+  }
+}
+
+object HiveSchemaInferenceSuite {
+  private val NUM_RECORDS = 10
+  private val DATABASE = "default"
+  private val TEST_TABLE_NAME = "test_table"
+  private val ORC_FILE_TYPE = "orc"
+  private val PARQUET_FILE_TYPE = "parquet"
+}