diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 70d7dd23d908afc6e84933364be1216f9d3e530e..172317c34659db7f5ee71a66fe55af3c651590ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -642,8 +642,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       if (stats.get.rowCount.isDefined) {
         statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString()
       }
+
+      // For data source tables and Hive serde tables created by Spark 2.1 or higher,
+      // the data schema is stored in the table properties.
+      val schema = restoreTableMetadata(rawTable).schema
+
       val colNameTypeMap: Map[String, DataType] =
-        rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap
+        schema.fields.map(f => (f.name, f.dataType)).toMap
       stats.get.colStats.foreach { case (colName, colStat) =>
         colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
           statsProperties += (columnStatKeyPropName(colName, k) -> v)
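
For context, a minimal sketch of the mismatch this hunk guards against, assuming a Hive-enabled SparkSession and an illustrative table name (neither is part of the patch). For a data source table that is not Hive compatible, the raw Hive-side schema is a dummy `col array<string>`, while the schema restored from the table properties carries the real columns, so the column-name-to-type map used when persisting column statistics has to be built from the restored schema:

```scala
// Sketch only: shows why column stats must be resolved against the restored
// schema rather than the raw Hive-side schema. Table name and session setup
// are illustrative assumptions, not part of the patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("schema-restore-sketch")
  .enableHiveSupport()
  .getOrCreate()

// A non-Hive-compatible data source table keeps its real schema in the table
// properties; the Hive metastore only sees a dummy `col array<string>` schema.
spark.sql("CREATE TABLE t (a INT, b INT) USING parquet OPTIONS (skipHiveMetadata true)")

// getTable goes through restoreTableMetadata, so the schema it returns has
// the real columns `a` and `b`.
val restored = spark.sharedState.externalCatalog.getTable("default", "t").schema
println(restored.fieldNames.mkString(", "))  // a, b

// This is the lookup the patched code builds before serializing column stats;
// built from the raw Hive schema it would miss `a` and `b` entirely.
val colNameTypeMap = restored.fields.map(f => (f.name, f.dataType)).toMap
```
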
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 84bcea30d61a6474bc2c1e169dab0006df592d06..36566bffb93355721ccf45b7a36d976df9ee94bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.io.{File, PrintWriter}
 
-import org.apache.hadoop.hive.common.StatsSetupConst
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
+import org.apache.hadoop.hive.common.StatsSetupConst
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
@@ -34,9 +35,16 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
+  private def dropMetadata(schema: StructType): StructType = {
+    val newFields = schema.fields.map { f =>
+      StructField(f.name, f.dataType, f.nullable, Metadata.empty)
+    }
+    StructType(newFields)
+  }
 
   test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
@@ -117,6 +125,71 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("analyze non hive compatible datasource tables") {
+    val table = "parquet_tab"
+    withTable(table) {
+      sql(
+        s"""
+          |CREATE TABLE $table (a int, b int)
+          |USING parquet
+          |OPTIONS (skipHiveMetadata true)
+        """.stripMargin)
+
+      // Verify that the schema stored in the Hive catalog is a dummy one used for
+      // data source tables. The actual schema is stored in the table properties.
+      val rawSchema = dropMetadata(hiveClient.getTable("default", table).schema)
+      val expectedRawSchema = new StructType()
+        .add("col", "array<string>")
+      assert(rawSchema == expectedRawSchema)
+
+      val actualSchema = spark.sharedState.externalCatalog.getTable("default", table).schema
+      val expectedActualSchema = new StructType()
+        .add("a", "int")
+        .add("b", "int")
+      assert(actualSchema == expectedActualSchema)
+
+      sql(s"INSERT INTO $table VALUES (1, 1)")
+      sql(s"INSERT INTO $table VALUES (2, 1)")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS a, b")
+      val fetchedStats0 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+      assert(fetchedStats0.get.colStats == Map(
+        "a" -> ColumnStat(2, Some(1), Some(2), 0, 4, 4),
+        "b" -> ColumnStat(1, Some(1), Some(1), 0, 4, 4)))
+    }
+  }
+
+  test("Analyze hive serde tables when schema is not same as schema in table properties") {
+
+    val table = "hive_serde"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (C1 INT, C2 STRING, C3 DOUBLE)")
+
+      // Verify that the table schema stored in the Hive catalog is
+      // different from the schema stored in the table properties.
+      val rawSchema = dropMetadata(hiveClient.getTable("default", table).schema)
+      val expectedRawSchema = new StructType()
+        .add("c1", "int")
+        .add("c2", "string")
+        .add("c3", "double")
+      assert(rawSchema == expectedRawSchema)
+
+      val actualSchema = spark.sharedState.externalCatalog.getTable("default", table).schema
+      val expectedActualSchema = new StructType()
+        .add("C1", "int")
+        .add("C2", "string")
+        .add("C3", "double")
+      assert(actualSchema == expectedActualSchema)
+
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS C1")
+      val fetchedStats1 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
+      assert(fetchedStats1.colStats == Map(
+        "C1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
+          avgLen = 4, maxLen = 4)))
+    }
+  }
+
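
The Hive serde test above can also be exercised end to end outside the suite; a minimal sketch follows, assuming a Hive-enabled SparkSession (the session setup is illustrative, the SQL statements mirror the test). Hive lower-cases the column names it stores, while the case-preserving schema lives in the table properties, so `ANALYZE ... FOR COLUMNS` needs the column types resolved against the restored schema:

```scala
// Sketch only: replays the mixed-case Hive serde scenario from the test above
// in a plain Hive-enabled SparkSession. Session setup is illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-serde-stats-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Hive stores the column names lower-cased (c1, c2, c3); the case-preserving
// schema (C1, C2, C3) is kept in the table properties.
spark.sql("CREATE TABLE hive_serde (C1 INT, C2 STRING, C3 DOUBLE)")
spark.sql("INSERT INTO TABLE hive_serde SELECT 1, 'a', 10.0")

// With this patch, column `C1` is resolved against the restored,
// case-preserving schema instead of the lower-cased Hive-side one.
spark.sql("ANALYZE TABLE hive_serde COMPUTE STATISTICS FOR COLUMNS C1")

// The persisted statistics can be read back through the external catalog.
val stats = spark.sharedState.externalCatalog.getTable("default", "hive_serde").stats
stats.foreach(s => println(s.colStats))
```
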
   test("SPARK-21079 - analyze table with location different than that of individual partitions") {
     val tableName = "analyzeTable_part"
     withTable(tableName) {