diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 83e01f95c06af16261df2ede7625008843e9e5a8..8408d765d4918489a36b1cdb29c73f769714df3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -81,10 +81,12 @@ object CatalogStorageFormat {
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
+ * @param parameters some parameters for the partition, for example, stats.
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
-    storage: CatalogStorageFormat)
+    storage: CatalogStorageFormat,
+    parameters: Map[String, String] = Map.empty)
 
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 3817f919f3a5a1e3ae1ef770aab700a6053368d5..53fb684eb5ce3fbc09ac0190b7c231a1e3333cd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.collection.GenSeq
+import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -422,6 +424,9 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+
+case class PartitionStatistics(numFiles: Int, totalSize: Long)
+
 /**
  * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
  * update the catalog.
@@ -435,6 +440,31 @@ case class AlterTableDropPartitionCommand(
 case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+
+  // These are list of statistics that can be collected quickly without requiring a scan of the data
+  // see https://github.com/apache/hive/blob/master/
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+  val NUM_FILES = "numFiles"
+  val TOTAL_SIZE = "totalSize"
+  val DDL_TIME = "transient_lastDdlTime"
+
+  private def getPathFilter(hadoopConf: Configuration): PathFilter = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    new PathFilter {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+          pathFilter == null || pathFilter.accept(path)
+        } else {
+          false
+        }
+      }
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
@@ -449,10 +479,6 @@ case class AlterTableRecoverPartitionsCommand(
       throw new AnalysisException(
         s"Operation not allowed: $cmd on datasource tables: $tableName")
     }
-    if (table.tableType != CatalogTableType.EXTERNAL) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on external tables: $tableName")
-    }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
@@ -463,19 +489,26 @@ case class AlterTableRecoverPartitionsCommand(
     }
 
     val root = new Path(table.storage.locationUri.get)
+    logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
-    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
-    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val pathFilter = getPathFilter(hadoopConf)
     val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
-    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
-      // inherit table storage format (possibly except for location)
-      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val total = partitionSpecsAndLocs.length
+    logInfo(s"Found $total partitions in $root")
+
+    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    } else {
+      GenMap.empty[String, PartitionStatistics]
     }
-    spark.sessionState.catalog.createPartitions(tableName,
-      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+    logInfo(s"Finished to gather the fast stats for all $total partitions.")
+
+    addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
 
@@ -487,15 +520,16 @@ case class AlterTableRecoverPartitionsCommand(
       filter: PathFilter,
       path: Path,
       spec: TablePartitionSpec,
-      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
-    if (partitionNames.length == 0) {
+      partitionNames: Seq[String],
+      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
 
-    val statuses = fs.listStatus(path)
-    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statuses = fs.listStatus(path, filter)
     val statusPar: GenSeq[FileStatus] =
       if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better parallelism later.
         val parArray = statuses.par
         parArray.tasksupport = evalTaskSupport
         parArray
@@ -510,21 +544,89 @@ case class AlterTableRecoverPartitionsCommand(
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
         // comparing with case-insensitive, but preserve the case
-        if (columnName == partitionNames(0)) {
-          scanPartitions(
-            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+        if (columnName == partitionNames.head) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
+            partitionNames.drop(1), threshold)
         } else {
-          logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
+          logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
           Seq()
         }
       } else {
-        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
-          logWarning(s"ignore ${new Path(path, name)}")
-        }
+        logWarning(s"ignore ${new Path(path, name)}")
         Seq()
       }
     }
   }
+
+  private def gatherPartitionStats(
+      spark: SparkSession,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      fs: FileSystem,
+      pathFilter: PathFilter,
+      threshold: Int): GenMap[String, PartitionStatistics] = {
+    if (partitionSpecsAndLocs.length > threshold) {
+      val hadoopConf = spark.sparkContext.hadoopConfiguration
+      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+
+      // Set the number of parallelism to prevent following file listing from generating many tasks
+      // in case of large #defaultParallelism.
+      val numParallelism = Math.min(serializedPaths.length,
+        Math.min(spark.sparkContext.defaultParallelism, 10000))
+      // gather the fast stats for all the partitions otherwise Hive metastore will list all the
+      // files for all the new partitions in sequential way, which is super slow.
+      logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
+      spark.sparkContext.parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
+          val pathFilter = getPathFilter(serializableConfiguration.value)
+          paths.map(new Path(_)).map{ path =>
+            val fs = path.getFileSystem(serializableConfiguration.value)
+            val statuses = fs.listStatus(path, pathFilter)
+            (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+          }
+        }.collectAsMap()
+    } else {
+      partitionSpecsAndLocs.map { case (_, location) =>
+        val statuses = fs.listStatus(location, pathFilter)
+        (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+      }.toMap
+    }
+  }
+
+  private def addPartitions(
+      spark: SparkSession,
+      table: CatalogTable,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      partitionStats: GenMap[String, PartitionStatistics]): Unit = {
+    val total = partitionSpecsAndLocs.length
+    var done = 0L
+    // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
+    // we should split them into smaller batches. Since Hive client is not thread safe, we cannot
+    // do this in parallel.
+    val batchSize = 100
+    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+      val now = System.currentTimeMillis() / 1000
+      val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map {
+          case PartitionStatistics(numFiles, totalSize) =>
+            // This two fast stat could prevent Hive metastore to list the files again.
+            Map(NUM_FILES -> numFiles.toString,
+              TOTAL_SIZE -> totalSize.toString,
+              // Workaround a bug in HiveMetastore that try to mutate a read-only parameters.
+              // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+              DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
+        // inherit table storage format (possibly except for location)
+        CatalogTablePartition(
+          spec,
+          table.storage.copy(locationUri = Some(location.toUri.toString)),
+          params)
+      }
+      spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+      done += parts.length
+      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+    }
+  }
 }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f2b1afd71adc64cf9fc2edd6214d46d20c6173be..91988270ada8df15c55c61f7f15dcdb3ed9159e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
+      .internal()
+      .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
+        " in parallel while repairing table partitions to avoid the sequential listing in Hive" +
+        " metastore.")
+      .booleanConf
+      .createWithDefault(true)
+
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -608,6 +616,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b343454b12d86b652649ac15f920bc460abd56fe..0073659a31541d45790837b8c97e6c8804fab7a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -824,13 +824,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("alter table: recover partitions (sequential)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
       testRecoverPartitions()
     }
   }
 
   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
       testRecoverPartitions()
     }
   }
@@ -853,7 +853,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
     fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
     // invalid
     fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
     fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
@@ -867,6 +874,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
     } finally {
       fs.delete(root, true)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 81d5a124e9d4aec3e848e60bcde215ce0f744ef2..b45ad30dcae410914081b16728e5e645d9a17a5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,6 +829,8 @@ private[hive] class HiveClientImpl(
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
         compressed = apiPartition.getSd.isCompressed,
         properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
-          .map(_.asScala.toMap).orNull))
+          .map(_.asScala.toMap).orNull),
+        parameters =
+          if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 41527fcd05154d599c043f2aeb807c4959fe8ec0..32387707612f432baf964023d226d9d78a1e6e44 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
         // Ignore this partition since it already exists and ignoreIfExists == true
@@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
           table,
           spec,
           location,
-          null, // partParams
+          params, // partParams
           null, // inputFormat
           null, // outputFormat
           -1: JInteger, // numBuckets
@@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
-    parts.foreach { s =>
+    parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      if (s.parameters.nonEmpty) {
+        addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+      }
     }
     hive.createPartitions(addPartitionDesc)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f00a99b6d0b3d61ddcdd4e5cf82e94bc25149f1c..9019333d76869a2c0dcb61066481a3b6d6b0097f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -378,6 +378,44 @@ class HiveDDLSuite
       expectedSerdeProps)
   }
 
+  test("MSCK REPAIR RABLE") {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1")
+    sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4")) // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("MSCK REPAIR TABLE tab1")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("drop table using drop view") {
     withTable("tab1") {
       sql("CREATE TABLE tab1(c1 int)")