diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index c7d50869eaa0b166fdd84f33134e0e9257daf082..d2b5c53487652048959ce0f2e5e79f42f7b7c959 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -84,6 +84,7 @@ statement
     | ALTER VIEW tableIdentifier
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec      #setTableLocation
+    | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   #recoverPartitions
     | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   #dropTable
     | DROP VIEW (IF EXISTS)? tableIdentifier                           #dropTable
     | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
@@ -121,6 +122,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                                 #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
+    | MSCK REPAIR TABLE tableIdentifier                                #repairTable
     | op=(ADD | LIST) identifier .*?                                   #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
@@ -154,7 +156,6 @@ unsupportedHiveNativeCommands
     | kw1=UNLOCK kw2=DATABASE
     | kw1=CREATE kw2=TEMPORARY kw3=MACRO
     | kw1=DROP kw2=TEMPORARY kw3=MACRO
-    | kw1=MSCK kw2=REPAIR kw3=TABLE
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED
@@ -653,7 +654,7 @@ nonReserved
     | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
     | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
     | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
     | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
     | ASC | DESC | LIMIT | RENAME | SETS
     | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE
@@ -866,6 +867,7 @@ LOCK: 'LOCK';
 UNLOCK: 'UNLOCK';
 MSCK: 'MSCK';
 REPAIR: 'REPAIR';
+RECOVER: 'RECOVER';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2a452f4379afba94f928795a77fc904c9ef0efd1..9da2b5a254e2857ac239f1c5f5591b78ab863a34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -414,6 +414,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create a [[AlterTableRecoverPartitionsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE tablename
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      "MSCK REPAIR TABLE")
+  }
+
   /**
    * Convert a table property list into a key-value map.
    * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
@@ -784,6 +798,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.PURGE != null)
   }
 
+  /**
+   * Create an [[AlterTableDiscoverPartitionsCommand]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table RECOVER PARTITIONS;
+   * }}}
+   */
+  override def visitRecoverPartitions(
+      ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
+  }
+
   /**
    * Create an [[AlterTableSetLocationCommand]] command
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f0e49e65c459d146c2a863da11b829ab2fd1c791..8fa7615b97b18dee7c5849107c7684dcfa16ecb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,18 +17,23 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.GenSeq
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 
-
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
 
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+    tableName: TableIdentifier,
+    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    if (!catalog.tableExists(tableName)) {
+      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (catalog.isTemporaryTable(tableName)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd on temporary tables: $tableName")
+    }
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd on datasource tables: $tableName")
+    }
+    if (table.tableType != CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on external tables: $tableName")
+    }
+    if (!DDLUtils.isTablePartitioned(table)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
+    }
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
+    }
+
+    val root = new Path(table.storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
+    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    val partitionSpecsAndLocs = scanPartitions(
+      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      // inherit table storage format (possibly except for location)
+      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+    }
+    spark.sessionState.catalog.createPartitions(tableName,
+      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+    Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+      spark: SparkSession,
+      fs: FileSystem,
+      filter: PathFilter,
+      path: Path,
+      spec: TablePartitionSpec,
+      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.length == 0) {
+      return Seq(spec -> path)
+    }
+
+    val statuses = fs.listStatus(path)
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
+      val name = st.getPath.getName
+      if (st.isDirectory && name.contains("=")) {
+        val ps = name.split("=", 2)
+        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+        // TODO: Validate the value
+        val value = PartitioningUtils.unescapePathName(ps(1))
+        // comparing with case-insensitive, but preserve the case
+        if (columnName == partitionNames(0)) {
+          scanPartitions(
+            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+        } else {
+          logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
+          Seq()
+        }
+      } else {
+        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+          logWarning(s"ignore ${new Path(path, name)}")
+        }
+        Seq()
+      }
+    }
+  }
+}
+
 
 /**
  * A command that sets the location of a table or a partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e6fe9a73a1f303921e516c5bc31d4841fa31c026..3b1052619b63f58d5b964962d91491c3dcbc3301 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 044fa5fb9a111da54d9894897e6d6461484ff433..be1bccbd990a0e81b0431316c63bfd0bd72a972e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -540,6 +540,14 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
+  test("alter table: recover partitions") {
+    val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("table_name", None))
+    comparePlans(parsed, expected)
+  }
+
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ca9b210125b58873d19470786b60bc97866728af..53376c56f185845359fda90f226d1fa138d3c8bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: recover partitions (sequential)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+      testRecoverPartitions()
+    }
+  }
+
+  test("alter table: recover partition (parallel)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+      testRecoverPartitions()
+    }
+  }
+
+  private def testRecoverPartitions() {
+    val catalog = spark.sessionState.catalog
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+    }
+
+    val tableIdent = TableIdentifier("tab1")
+    createTable(catalog, tableIdent)
+    val part1 = Map("a" -> "1", "b" -> "5")
+    createTablePartition(catalog, part1, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4")) // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("alter table: add partition is not supported for views") {
     assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 69a6884c7aa6dcd6c7c3ddc0f8eb38bf3f50cb7b..54e27b6f73502b222ad93f6cba3a2eb509b2c38b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -499,8 +500,13 @@ class HiveDDLCommandSuite extends PlanTest {
     }
   }
 
-  test("MSCK repair table (not supported)") {
-    assertUnsupported("MSCK REPAIR TABLE tab1")
+  test("MSCK REPAIR table") {
+    val sql = "MSCK REPAIR TABLE tab1"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("tab1", None),
+      "MSCK REPAIR TABLE")
+    comparePlans(parsed, expected)
   }
 
   test("create table like") {