diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 5a8c4e7610fff09edbb2cf1071b6c0e6b5035907..1965144e811974d6a68f16bba31ad99be5b5ebb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -91,12 +91,14 @@ object CatalogStorageFormat {
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
- * @param parameters some parameters for the partition, for example, stats.
+ * @param parameters some parameters for the partition
+ * @param stats optional statistics (number of rows, total size, etc.)
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
     storage: CatalogStorageFormat,
-    parameters: Map[String, String] = Map.empty) {
+    parameters: Map[String, String] = Map.empty,
+    stats: Option[CatalogStatistics] = None) {
 
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
@@ -106,6 +108,7 @@ case class CatalogTablePartition(
     if (parameters.nonEmpty) {
       map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
     }
+    stats.foreach(s => map.put("Partition Statistics", s.simpleString))
     map
   }
 
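As a quick illustration of the new field, here is a minimal sketch (not part of the patch) that builds a partition with statistics and renders it the way `DESC EXTENDED ... PARTITION` does; the `CatalogStatistics` arguments follow its existing definition:

```scala
import org.apache.spark.sql.catalyst.catalog.{
  CatalogStatistics, CatalogStorageFormat, CatalogTablePartition}

// A partition carrying freshly computed statistics.
val part = CatalogTablePartition(
  spec = Map("ds" -> "2017-08-01", "hr" -> "10"),
  storage = CatalogStorageFormat.empty,
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(1067), rowCount = Some(BigInt(3)))))

// toLinkedHashMap now emits a "Partition Statistics" entry, e.g. "1067 bytes, 3 rows".
part.toLinkedHashMap.foreach { case (k, v) => println(s"$k: $v") }
```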
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d4414b6f78ca2470f028aaa19c3e371d3a42b4b0..8379e740a0717e57aa8b6d78ea6bbc182acc921b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]], an [[AnalyzePartitionCommand]],
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing a table or a set of partitions:
    * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
    * }}}
+   *
    * Example SQL for analyzing columns:
    * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
    * }}}
    */
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec != null) {
-      logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}")
+    if (ctx.identifier != null &&
+        ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+      throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
     }
-    if (ctx.identifier != null) {
-      if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-        throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
+
+    val table = visitTableIdentifier(ctx.tableIdentifier)
+    if (ctx.identifierSeq() == null) {
+      if (ctx.partitionSpec != null) {
+        AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
+          noscan = ctx.identifier != null)
+      } else {
+        AnalyzeTableCommand(table, noscan = ctx.identifier != null)
       }
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-    } else if (ctx.identifierSeq() == null) {
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false)
     } else {
+      if (ctx.partitionSpec != null) {
+        logWarning("Partition specification is ignored when collecting column statistics: " +
+          ctx.partitionSpec.getText)
+      }
       AnalyzeColumnCommand(
-        visitTableIdentifier(ctx.tableIdentifier),
+        table,
         visitIdentifierSeq(ctx.identifierSeq()))
     }
   }
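To see the new dispatch in action, a sketch using the session parser (assuming an active `SparkSession` named `spark`); each statement parses to the command named in the comment:

```scala
val parser = spark.sessionState.sqlParser

// Whole-table statistics -> AnalyzeTableCommand
parser.parsePlan("ANALYZE TABLE t COMPUTE STATISTICS NOSCAN")

// Per-partition statistics -> AnalyzePartitionCommand
parser.parsePlan("ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS")

// Column statistics -> AnalyzeColumnCommand (a PARTITION spec is ignored with a warning)
parser.parsePlan("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value")
```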
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5b54b2270b5ecc7c49ace11c2c3823769b45688e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+
+/**
+ * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all partitions with
+ * matching values for the specified columns are processed.
+ *
+ * If `partitionSpec` mentions an unknown partition column, an `AnalysisException` is raised.
+ *
+ * By default, both the total number of rows and the total size in bytes are calculated.
+ * When `noscan` is `true`, only the total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+    tableIdent: TableIdentifier,
+    partitionSpec: Map[String, Option[String]],
+    noscan: Boolean = true) extends RunnableCommand {
+
+  private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
+    val normalizedPartitionSpec =
+      PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames,
+        table.identifier.quotedString, conf.resolver)
+
+    // Report an error if the partition columns in the partition specification do not form
+    // a prefix of the list of partition columns defined in the table schema.
+    val isNotSpecified =
+      table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty)
+    if (isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))) {
+      val tableId = table.identifier
+      val schemaColumns = table.partitionColumnNames.mkString(",")
+      val specColumns = normalizedPartitionSpec.keys.mkString(",")
+      throw new AnalysisException("The list of partition columns with values " +
+        s"in partition specification for table '${tableId.table}' " +
+        s"in database '${tableId.database.get}' is not a prefix of the list of " +
+        "partition columns defined in the table schema. " +
+        s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
+    }
+
+    val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
+    if (filteredSpec.isEmpty) {
+      None
+    } else {
+      Some(filteredSpec)
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+    }
+
+    val partitionValueSpec = getPartitionSpec(tableMeta)
+
+    val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+    if (partitions.isEmpty) {
+      if (partitionValueSpec.isDefined) {
+        throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
+      } else {
+        // The user requested to analyze all partitions of a table which has no partitions;
+        // return normally, since there is nothing to do.
+        return Seq.empty[Row]
+      }
+    }
+
+    // Compute statistics for individual partitions
+    val rowCounts: Map[TablePartitionSpec, BigInt] =
+      if (noscan) {
+        Map.empty
+      } else {
+        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+      }
+
+    // Update the metastore if newly computed statistics are different from those
+    // recorded in the metastore.
+    val newPartitions = partitions.flatMap { p =>
+      val newTotalSize = CommandUtils.calculateLocationSize(
+        sessionState, tableMeta.identifier, p.storage.locationUri)
+      val newRowCount = rowCounts.get(p.spec)
+      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+      newStats.map(_ => p.copy(stats = newStats))
+    }
+
+    if (newPartitions.nonEmpty) {
+      sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+      sparkSession: SparkSession,
+      tableMeta: CatalogTable,
+      partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = {
+    val filter = if (partitionValueSpec.isDefined) {
+      val filters = partitionValueSpec.get.map {
+        case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
+      }
+      filters.reduce(And)
+    } else {
+      Literal.TrueLiteral
+    }
+
+    val tableDf = sparkSession.table(tableMeta.identifier)
+    val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))
+
+    val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
+
+    df.collect().map { r =>
+      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
+      val count = BigInt(r.getLong(partitionColumns.size))
+      (spec, count)
+    }.toMap
+  }
+}
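The prefix rule enforced by `getPartitionSpec` can be illustrated standalone. The sketch below mirrors the `isNotSpecified` logic above for a hypothetical table partitioned by `(a, b, c)`:

```scala
val partitionColumnNames = Seq("a", "b", "c")

def isPrefixSpec(spec: Map[String, Option[String]]): Boolean = {
  // true at position i when the i-th partition column has no value in the spec
  val isNotSpecified = partitionColumnNames.map(spec.getOrElse(_, None).isEmpty)
  // invalid if a column with a value follows a column without one
  !isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))
}

isPrefixSpec(Map("a" -> Some("a1")))                    // true:  (a) is a prefix
isPrefixSpec(Map("a" -> Some("a1"), "b" -> Some("10"))) // true:  (a, b) is a prefix
isPrefixSpec(Map("b" -> Some("10")))                    // false: b has a value but a does not
isPrefixSpec(Map("a" -> None, "b" -> Some("10")))       // false: a is mentioned but unset
```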
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index cba147c35dd9938b772c11496452dc45bbace5ce..04715bd314d4d1ba5cdd320ab8a74c78f7781214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 
 
 /**
@@ -37,31 +37,15 @@ case class AnalyzeTableCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+
+    // Compute stats for the whole table
     val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val newRowCount =
+      if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
 
-    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
-    val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-    var newStats: Option[CatalogStatistics] = None
-    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
-      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-    }
-    // We only set rowCount when noscan is false, because otherwise:
-    // 1. when total size is not changed, we don't need to alter the table;
-    // 2. when total size is changed, `oldRowCount` becomes invalid.
-    // This is to make sure that we only record the right statistics.
-    if (!noscan) {
-      val newRowCount = sparkSession.table(tableIdentWithDB).count()
-      if (newRowCount >= 0 && newRowCount != oldRowCount) {
-        newStats = if (newStats.isDefined) {
-          newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
-        } else {
-          Some(CatalogStatistics(
-            sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
-        }
-      }
-    }
     // Update the metastore if the above statistics of the table are different from those
     // recorded in the metastore.
+    val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
       // Refresh the cached data source table in the catalog.
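A hedged usage sketch of the simplified command (assuming an active `spark` session; the table name `sales` is hypothetical); the computed statistics land in `CatalogTable.stats` and can be read back through the session catalog:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")        // total size + row count
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS NOSCAN") // total size only

val stats = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("sales")).stats
stats.foreach(s => println(s"size=${s.sizeInBytes}, rows=${s.rowCount}"))
```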
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index de45be85220e9d759ded403b482a85363d2a0aaf..b22958d59336c1d9ed98334d673157b1b1d58517 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.internal.SessionState
 
 
@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
     size
   }
 
+  def compareAndGetNewStats(
+      oldStats: Option[CatalogStatistics],
+      newTotalSize: BigInt,
+      newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
+    val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(-1L)
+    val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+    var newStats: Option[CatalogStatistics] = None
+    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+    }
+    // We only set rowCount when a fresh count was computed (noscan is false), because otherwise:
+    // 1. when total size is not changed, we don't need to alter the table;
+    // 2. when total size is changed, `oldRowCount` becomes invalid.
+    // This is to make sure that we only record the right statistics.
+    if (newRowCount.isDefined) {
+      if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) {
+        newStats = if (newStats.isDefined) {
+          newStats.map(_.copy(rowCount = newRowCount))
+        } else {
+          Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount))
+        }
+      }
+    }
+    newStats
+  }
 }
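The update-only-on-change contract of `compareAndGetNewStats` in a nutshell (values chosen purely for illustration):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.execution.command.CommandUtils

val oldStats = Some(CatalogStatistics(sizeInBytes = BigInt(2000), rowCount = Some(BigInt(500))))

// Size unchanged, no fresh row count: nothing to persist.
CommandUtils.compareAndGetNewStats(oldStats, BigInt(2000), None)               // None
// Size changed, NOSCAN: new size only; the stale row count is dropped.
CommandUtils.compareAndGetNewStats(oldStats, BigInt(4000), None)               // Some(size = 4000)
// Full scan with a changed row count: both fields are recorded.
CommandUtils.compareAndGetNewStats(oldStats, BigInt(4000), Some(BigInt(1000))) // Some(size = 4000, rows = 1000)
```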
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql
new file mode 100644
index 0000000000000000000000000000000000000000..f4239da9062763aa97a6db87199b671c380c8ec0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql
@@ -0,0 +1,34 @@
+CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet
+    PARTITIONED BY (ds, hr);
+
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10)
+VALUES ('k1', 100), ('k2', 200), ('k3', 300);
+
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11)
+VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401);
+
+INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5)
+VALUES ('k1', 102), ('k2', 202);
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+
+-- Collect stats for a single partition
+ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+
+-- Collect stats for 2 partitions
+ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
+
+-- Collect stats for all partitions
+ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
+DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5);
+
+-- DROP TEST TABLES/VIEWS
+DROP TABLE t;
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
new file mode 100644
index 0000000000000000000000000000000000000000..51dac111029e800a20451ad9edb5c018104ce4f5
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
@@ -0,0 +1,244 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 15
+
+
+-- !query 0
+CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet
+    PARTITIONED BY (ds, hr)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10)
+VALUES ('k1', 100), ('k2', 200), ('k3', 300)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11)
+VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401)
+-- !query 2 schema
+struct<>
+-- !query 2 output
+
+
+
+-- !query 3
+INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5)
+VALUES ('k1', 102), ('k2', 202)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 4 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 4 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 5
+ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS
+-- !query 5 schema
+struct<>
+-- !query 5 output
+
+
+
+-- !query 6
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 6 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 6 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+Partition Statistics	1067 bytes, 3 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 7
+ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 8 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 8 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+Partition Statistics	1067 bytes, 3 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 9
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11)
+-- !query 9 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 9 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=11]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11	                    
+Partition Statistics	1080 bytes, 4 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 10
+ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 11 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 11 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+Partition Statistics	1067 bytes, 3 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 12
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11)
+-- !query 12 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 12 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=11]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11	                    
+Partition Statistics	1080 bytes, 4 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 13
+DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5)
+-- !query 13 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 13 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-09-01, hr=5]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5	                    
+Partition Statistics	1054 bytes, 2 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 14
+DROP TABLE t
+-- !query 14 schema
+struct<>
+-- !query 14 output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index d238c76fbeeffff631c04826e38317133d66eb09..fa7a866f4d551833150025056038c413ec2c78eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -259,17 +259,33 @@ class SparkSqlParserSuite extends AnalysisTest {
     assertEqual("analyze table t compute statistics noscan",
       AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
     assertEqual("analyze table t partition (a) compute statistics nOscAn",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+      AnalyzePartitionCommand(TableIdentifier("t"), Map("a" -> None), noscan = true))
 
-    // Partitions specified - we currently parse them but don't do anything with it
+    // Partitions specified
     assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
     assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> Some("2008-04-09"))))
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
+    assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> None, "hr" -> Some("11"))))
     assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
+        partitionSpec = Map("ds" -> None, "hr" -> None)))
     assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> None, "hr" -> None)))
 
     intercept("analyze table t compute statistics xxxx",
       "Expected `NOSCAN` instead of `xxxx`")
@@ -282,6 +298,11 @@ class SparkSqlParserSuite extends AnalysisTest {
 
     assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value",
       AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
+
+    // Partition specified - should be ignored
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
+      "COMPUTE STATISTICS FOR COLUMNS key, value",
+      AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
   }
 
   test("query organization") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index e9d48f95aa905e70d43e24b7ac73ad00744f9e6b..547447b31f0a1dbb1ce8e16dbf88991a7ea45338 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -639,26 +639,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
 
-    // convert table statistics to properties so that we can persist them through hive client
-    val statsProperties = new mutable.HashMap[String, String]()
-    if (stats.isDefined) {
-      statsProperties += STATISTICS_TOTAL_SIZE -> stats.get.sizeInBytes.toString()
-      if (stats.get.rowCount.isDefined) {
-        statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString()
-      }
-
-      // For datasource tables and hive serde tables created by spark 2.1 or higher,
-      // the data schema is stored in the table properties.
-      val schema = restoreTableMetadata(rawTable).schema
+    // For datasource tables and hive serde tables created by spark 2.1 or higher,
+    // the data schema is stored in the table properties.
+    val schema = restoreTableMetadata(rawTable).schema
 
-      val colNameTypeMap: Map[String, DataType] =
-        schema.fields.map(f => (f.name, f.dataType)).toMap
-      stats.get.colStats.foreach { case (colName, colStat) =>
-        colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-          statsProperties += (columnStatKeyPropName(colName, k) -> v)
-        }
+    // convert table statistics to properties so that we can persist them through hive client
+    var statsProperties =
+      if (stats.isDefined) {
+        statsToProperties(stats.get, schema)
+      } else {
+        new mutable.HashMap[String, String]()
       }
-    }
 
     val oldTableNonStatsProps = rawTable.properties.filterNot(_._1.startsWith(STATISTICS_PREFIX))
     val updatedTable = rawTable.copy(properties = oldTableNonStatsProps ++ statsProperties)
@@ -704,36 +695,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val version: String = table.properties.getOrElse(CREATED_SPARK_VERSION, "2.2 or prior")
 
     // Restore Spark's statistics from information in Metastore.
-    val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
-
-    // Currently we have two sources of statistics: one from Hive and the other from Spark.
-    // In our design, if Spark's statistics is available, we respect it over Hive's statistics.
-    if (statsProps.nonEmpty) {
-      val colStats = new mutable.HashMap[String, ColumnStat]
-
-      // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
-      // but given the number of columns it usually not enormous, this is probably OK as a start.
-      // If we want to map this a linear operation, we'd need a stronger contract between the
-      // naming convention used for serialization.
-      table.schema.foreach { field =>
-        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
-          // If "version" field is defined, then the column stat is defined.
-          val keyPrefix = columnStatKeyPropName(field.name, "")
-          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
-            (k.drop(keyPrefix.length), v)
-          }
-
-          ColumnStat.fromMap(table.identifier.table, field, colStatMap).foreach {
-            colStat => colStats += field.name -> colStat
-          }
-        }
-      }
-
-      table = table.copy(
-        stats = Some(CatalogStatistics(
-          sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
-          rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
-          colStats = colStats.toMap)))
+    val restoredStats =
+      statsFromProperties(table.properties, table.identifier.table, table.schema)
+    if (restoredStats.isDefined) {
+      table = table.copy(stats = restoredStats)
     }
 
     // Get the original table properties as defined by the user.
@@ -1037,17 +1002,92 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     currentFullPath
   }
 
+  private def statsToProperties(
+      stats: CatalogStatistics,
+      schema: StructType): Map[String, String] = {
+
+    var statsProperties: Map[String, String] =
+      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+    if (stats.rowCount.isDefined) {
+      statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
+    }
+
+    val colNameTypeMap: Map[String, DataType] =
+      schema.fields.map(f => (f.name, f.dataType)).toMap
+    stats.colStats.foreach { case (colName, colStat) =>
+      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
+        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+      }
+    }
+
+    statsProperties
+  }
+
+  private def statsFromProperties(
+      properties: Map[String, String],
+      table: String,
+      schema: StructType): Option[CatalogStatistics] = {
+
+    val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    if (statsProps.isEmpty) {
+      None
+    } else {
+
+      val colStats = new mutable.HashMap[String, ColumnStat]
+
+      // For each column, recover its column stats. Note that this is currently an O(n^2)
+      // operation, but given that the number of columns is usually not enormous, this is
+      // probably OK as a start. If we want to make this a linear operation, we'd need a
+      // stronger contract for the naming convention used for serialization.
+      schema.foreach { field =>
+        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
+          // If "version" field is defined, then the column stat is defined.
+          val keyPrefix = columnStatKeyPropName(field.name, "")
+          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
+            (k.drop(keyPrefix.length), v)
+          }
+
+          ColumnStat.fromMap(table, field, colStatMap).foreach {
+            colStat => colStats += field.name -> colStat
+          }
+        }
+      }
+
+      Some(CatalogStatistics(
+        sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
+        rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+        colStats = colStats.toMap))
+    }
+  }
+
   override def alterPartitions(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
     val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+
+    val rawTable = getRawTable(db, table)
+
+    // For datasource tables and hive serde tables created by spark 2.1 or higher,
+    // the data schema is stored in the table properties.
+    val schema = restoreTableMetadata(rawTable).schema
+
+    // convert partition statistics to properties so that we can persist them through the Hive API
+    val withStatsProps = lowerCasedParts.map { p =>
+      if (p.stats.isDefined) {
+        val statsProperties = statsToProperties(p.stats.get, schema)
+        p.copy(parameters = p.parameters ++ statsProperties)
+      } else {
+        p
+      }
+    }
+
     // Note: Before altering table partitions in Hive, you *must* set the current database
     // to the one that contains the table of interest. Otherwise you will end up with the
     // most helpful error message ever: "Unable to alter partition. alter is not possible."
     // See HIVE-2742 for more detail.
     client.setCurrentDatabase(db)
-    client.alterPartitions(db, table, lowerCasedParts)
+    client.alterPartitions(db, table, withStatsProps)
   }
 
   override def getPartition(
@@ -1055,7 +1095,34 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
     val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    restorePartitionMetadata(part, getTable(db, table))
+  }
+
+  /**
+   * Restores partition metadata from the partition properties.
+   *
+   * Reads partition-level statistics from the partition properties, puts them
+   * into [[CatalogTablePartition#stats]], and removes those special entries
+   * from the partition properties.
+   */
+  private def restorePartitionMetadata(
+      partition: CatalogTablePartition,
+      table: CatalogTable): CatalogTablePartition = {
+    val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames)
+
+    // Restore Spark's statistics from information in Metastore.
+    // Note: partition-level statistics were introduced in 2.3.
+    val restoredStats =
+      statsFromProperties(partition.parameters, table.identifier.table, table.schema)
+    if (restoredStats.isDefined) {
+      partition.copy(
+        spec = restoredSpec,
+        stats = restoredStats,
+        parameters = partition.parameters.filterNot {
+          case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
+    } else {
+      partition.copy(spec = restoredSpec)
+    }
   }
 
   /**
@@ -1066,7 +1133,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
     client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+      restorePartitionMetadata(part, getTable(db, table))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5e5c0a2a5078cfae67c148f17c0feffed9a22a65..995280e0e9416a2d6e2e6c30ff73f10fe8436956 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,6 +21,7 @@ import java.io.{File, PrintStream}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -960,6 +961,7 @@ private[hive] object HiveClientImpl {
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
+    tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
     new HivePartition(ht, tpart)
   }
 
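A note on the `setParameters` change: converting an immutable Scala map with `asJava` yields a read-only `java.util.Map`, and Hive code may mutate partition parameters in place, so a mutable copy is presumably the safe choice. A minimal illustration of the difference:

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable

val params = Map("k" -> "v")

val readOnly = params.asJava                        // wrapper over the immutable map
// readOnly.put("x", "y")                           // would throw UnsupportedOperationException

val writable = mutable.Map(params.toSeq: _*).asJava // independent, mutable copy
writable.put("x", "y")                              // fine
```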
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 71cf79c473b461621a4713da533125ba8ca06c73..dc6140756d519df21d130da6dc11f9dfff35e22a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.StringUtils
@@ -256,6 +257,259 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("analyze single partition") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): CatalogStatistics = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats.get
+    }
+
+    def createPartition(ds: String, query: String): Unit = {
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+    }
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      createPartition("2010-01-01", "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+      createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN")
+
+      assert(queryStats("2010-01-01").rowCount === None)
+      assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+      assert(queryStats("2010-01-02").rowCount === None)
+      assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS")
+
+      assert(queryStats("2010-01-01").rowCount.get === 500)
+      assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+      assert(queryStats("2010-01-02").rowCount.get === 2*500)
+      assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+    }
+  }
+
+  test("analyze a set of partitions") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+      val tableId = TableIdentifier(tableName)
+      val partition =
+        spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr))
+      partition.stats
+    }
+
+    def assertPartitionStats(
+        ds: String,
+        hr: String,
+        rowCount: Option[BigInt],
+        sizeInBytes: BigInt): Unit = {
+      val stats = queryStats(ds, hr).get
+      assert(stats.rowCount === rowCount)
+      assert(stats.sizeInBytes === sizeInBytes)
+    }
+
+    def createPartition(ds: String, hr: Int, query: String): Unit = {
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query")
+    }
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+
+      createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 11,
+        "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assert(queryStats("2010-01-02", "10") === None)
+      assert(queryStats("2010-01-02", "11") === None)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000)
+    }
+  }
+
+  test("analyze all partitions") {
+    val tableName = "analyzeTable_part"
+
+    def assertPartitionStats(
+        ds: String,
+        hr: String,
+        rowCount: Option[BigInt],
+        sizeInBytes: BigInt): Unit = {
+      val stats = spark.sessionState.catalog.getPartition(TableIdentifier(tableName),
+        Map("ds" -> ds, "hr" -> hr)).stats.get
+      assert(stats.rowCount === rowCount)
+      assert(stats.sizeInBytes === sizeInBytes)
+    }
+
+    def createPartition(ds: String, hr: Int, query: String): Unit = {
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query")
+    }
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+
+      createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 11,
+        "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS NOSCAN")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000)
+    }
+  }
+
+  test("analyze partitions for an empty table") {
+    val tableName = "analyzeTable_part"
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      // make sure there is no exception
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS NOSCAN")
+
+      // make sure there is no exception
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS")
+    }
+  }
+
+  test("analyze partitions case sensitivity") {
+    val tableName = "analyzeTable_part"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src")
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS")
+      }
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        val message = intercept[AnalysisException] {
+          sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS")
+        }.getMessage
+        assert(message.contains(
+          s"DS is not a valid partition column in table `default`.`${tableName.toLowerCase}`"))
+      }
+    }
+  }
+
+  test("analyze partial partition specifications") {
+
+    val tableName = "analyzeTable_part"
+
+    def assertAnalysisException(partitionSpec: String): Unit = {
+      val message = intercept[AnalysisException] {
+        sql(s"ANALYZE TABLE $tableName $partitionSpec COMPUTE STATISTICS")
+      }.getMessage
+      assert(message.contains("The list of partition columns with values " +
+        s"in partition specification for table '${tableName.toLowerCase}' in database 'default' " +
+        "is not a prefix of the list of partition columns defined in the table schema"))
+    }
+
+    withTable(tableName) {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (key STRING, value STRING)
+           |PARTITIONED BY (a STRING, b INT, c STRING)
+         """.stripMargin)
+
+      sql(s"INSERT INTO TABLE $tableName PARTITION (a='a1', b=10, c='c1') SELECT * FROM src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (a='a1') COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (a='a1', b=10) COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (A='a1', b=10) COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (b=10, a='a1') COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (b=10, A='a1') COMPUTE STATISTICS")
+
+      assertAnalysisException("PARTITION (b=10)")
+      assertAnalysisException("PARTITION (a, b=10)")
+      assertAnalysisException("PARTITION (b=10, c='c1')")
+      assertAnalysisException("PARTITION (a, b=10, c='c1')")
+      assertAnalysisException("PARTITION (c='c1')")
+      assertAnalysisException("PARTITION (a, b, c='c1')")
+      assertAnalysisException("PARTITION (a='a1', c='c1')")
+      assertAnalysisException("PARTITION (a='a1', b, c='c1')")
+    }
+  }
+
+  test("analyze non-existent partition") {
+
+    def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = {
+      val message = intercept[AnalysisException] {
+        sql(analyzeCommand)
+      }.getMessage
+      assert(message.contains(errorMessage))
+    }
+
+    val tableName = "analyzeTable_part"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src")
+
+      assertAnalysisException(
+        s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS",
+        s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`"
+      )
+
+      assertAnalysisException(
+        s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS",
+        s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`"
+      )
+
+      intercept[NoSuchPartitionException] {
+        sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS")
+      }
+    }
+  }
+
   test("test table-level statistics for hive tables created in HiveExternalCatalog") {
     val textTable = "textTable"
     withTable(textTable) {