Commit 24c0c941 authored by gatorsmile, committed by Reynold Xin

[SPARK-18949][SQL] Add recoverPartitions API to Catalog

### What changes were proposed in this pull request?

Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and updating the catalog: `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Frankly, `MSCK` is very hard to remember, and it is not obvious what it stands for.)

With the new "Scalable Partition Handling" work, table repair becomes much more important: it is what makes the data in a newly created data source partitioned table visible.

Thus, this PR adds it to the Catalog interface. After this PR, users can repair a table with
```Scala
spark.catalog.recoverPartitions("testTable")
```
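
For context, here is a minimal end-to-end sketch of why this matters under the new partition handling (the table name, path, and schema below are illustrative, not part of this patch): a data source table created over existing partition directories starts with no partitions registered in the catalog, and `recoverPartitions` is what makes the data visible.
```Scala
// Write partition directories directly to the file system.
spark.range(10)
  .selectExpr("id", "id % 2 AS partCol")
  .write.partitionBy("partCol").parquet("/tmp/testTable")

// Create a partitioned data source table over that location.
spark.sql(
  """CREATE TABLE testTable (id BIGINT, partCol BIGINT)
    |USING parquet
    |OPTIONS (path '/tmp/testTable')
    |PARTITIONED BY (partCol)""".stripMargin)

spark.table("testTable").count()              // 0: partitions not yet in the catalog
spark.catalog.recoverPartitions("testTable")
spark.table("testTable").count()              // 10: partitions are now visible
```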

### How was this patch tested?
Modified the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16356 from gatorsmile/repairTable.
parent b2dd8ec6
```diff
@@ -37,7 +37,9 @@ object MimaExcludes {
   // Exclude rules for 2.2.x
   lazy val v22excludes = v21excludes ++ Seq(
     // [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"),
+    // [SPARK-18949] [SQL] Add repairTable API to Catalog
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
   )

   // Exclude rules for 2.1.x
```
```diff
@@ -258,6 +258,11 @@ class Catalog(object):
         """Invalidate and refresh all the cached metadata of the given table."""
         self._jcatalog.refreshTable(tableName)
 
+    @since('2.1.1')
+    def recoverPartitions(self, tableName):
+        """Recover all the partitions of the given table and update the catalog."""
+        self._jcatalog.recoverPartitions(tableName)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".
```
```diff
@@ -300,6 +300,13 @@ abstract class Catalog {
    */
   def dropGlobalTempView(viewName: String): Boolean
 
+  /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @since 2.1.1
+   */
+  def recoverPartitions(tableName: String): Unit
+
   /**
    * Returns true if the table is currently cached in-memory.
    *
```
```diff
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
```
```diff
@@ -393,6 +394,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }
 
+  /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @param tableName the name of the table to be repaired.
+   * @group ddl_ops
+   * @since 2.1.1
+   */
+  override def recoverPartitions(tableName: String): Unit = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sparkSession.sessionState.executePlan(
+      AlterTableRecoverPartitionsCommand(tableIdent)).toRdd
+  }
+
   /**
    * Returns true if the table is currently cached in-memory.
    *
```
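Since the implementation just plans the existing `AlterTableRecoverPartitionsCommand`, the new API call and the two SQL forms mentioned above share one code path; assuming a partitioned table named `testTable`, the following are interchangeable:
```Scala
spark.sql("MSCK REPAIR TABLE testTable")
spark.sql("ALTER TABLE testTable RECOVER PARTITIONS")
spark.catalog.recoverPartitions("testTable")  // new in this patch
```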
```diff
@@ -70,7 +70,7 @@ class PartitionProviderCompatibilitySuite
       }
       withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
         verifyIsLegacyTable("test")
-        spark.sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         spark.sql("show partitions test").count()  // check we are a new table
 
         // sanity check table performance
```
```diff
@@ -90,7 +90,7 @@ class PartitionProviderCompatibilitySuite
         setupPartitionedDatasourceTable("test", dir)
         spark.sql("show partitions test").count()  // check we are a new table
         assert(spark.sql("select * from test").count() == 0)  // needs repair
-        spark.sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         assert(spark.sql("select * from test").count() == 5)
       }
     }
```
```diff
@@ -160,7 +160,7 @@ class PartitionProviderCompatibilitySuite
     withTable("test") {
       withTempDir { dir =>
         setupPartitionedDatasourceTable("test", dir)
-        sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         spark.sql(
           """insert overwrite table test
             |partition (partCol=1)
```