diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 978a328f3e2d06240a0b06fc59e9d432807cadd1..6d1b4d2b277f9aab9e8a190bab4c46de2452ee5c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -110,7 +110,10 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"),
+
+      // [SPARK-18949] [SQL] Add repairTable API to Catalog
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
     )
   }
 
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index a36d02e0db13444ca49d2a7946bded08f4ba7982..30c7a3fe4fe6018f18a5d0604bb15c330f879846 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -258,6 +258,11 @@ class Catalog(object):
         """Invalidate and refresh all the cached metadata of the given table."""
         self._jcatalog.refreshTable(tableName)
 
+    @since('2.1.1')
+    def recoverPartitions(self, tableName):
+        """Recover all the partitions of the given table and update the catalog."""
+        self._jcatalog.recoverPartitions(tableName)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index aecdda1c364983789bb2abe51a58cdcffed73b53..6b061f8ab2740653c8605384428424adcebe5369 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -300,6 +300,13 @@
    */
   def dropGlobalTempView(viewName: String): Boolean
 
+  /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @since 2.1.1
+   */
+  def recoverPartitions(tableName: String): Unit
+
   /**
    * Returns true if the table is currently cached in-memory.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6d984621ccca1c90b1f54479357356bdfbc08ca2..41ed9d71809e965361a53740fb6cc1cd49b7ede7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
 
@@ -393,6 +394,19 @@
     }
   }
 
+  /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @param tableName the name of the table to be repaired.
+   * @group ddl_ops
+   * @since 2.1.1
+   */
+  override def recoverPartitions(tableName: String): Unit = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sparkSession.sessionState.executePlan(
+      AlterTableRecoverPartitionsCommand(tableIdent)).toRdd
+  }
+
   /**
    * Returns true if the table is currently cached in-memory.
    *
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index c2ac0327607805d0b328cd15556d8bc29122a3a8..3f84cbdb1b09c506fa958aa8ed6c8130822c2098 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -70,7 +70,7 @@ class PartitionProviderCompatibilitySuite
       }
       withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
         verifyIsLegacyTable("test")
-        spark.sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         spark.sql("show partitions test").count()  // check we are a new table
 
         // sanity check table performance
@@ -90,7 +90,7 @@
         setupPartitionedDatasourceTable("test", dir)
         spark.sql("show partitions test").count()  // check we are a new table
         assert(spark.sql("select * from test").count() == 0)  // needs repair
-        spark.sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         assert(spark.sql("select * from test").count() == 5)
       }
     }
@@ -160,7 +160,7 @@ class PartitionProviderCompatibilitySuite
     withTable("test") {
       withTempDir { dir =>
        setupPartitionedDatasourceTable("test", dir)
-        sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         spark.sql(
           """insert overwrite table test
             |partition (partCol=1)
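For reference, a minimal usage sketch of the new API. The table name, schema, and path below are hypothetical; what is grounded in the patch is that `spark.catalog.recoverPartitions(tableName)` runs the same `AlterTableRecoverPartitionsCommand` that `MSCK REPAIR TABLE` / `ALTER TABLE ... RECOVER PARTITIONS` execute, so it can replace the SQL form as the test changes above do:

```scala
import org.apache.spark.sql.SparkSession

object RecoverPartitionsExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark 2.1.1+ build; enableHiveSupport() is only needed when the
    // session catalog is backed by a Hive metastore.
    val spark = SparkSession.builder()
      .appName("RecoverPartitionsExample")
      .enableHiveSupport()
      .getOrCreate()

    // A partitioned data source table whose partition directories may have been
    // written directly to the file system (name, schema, and path are hypothetical).
    spark.sql(
      """create table if not exists logs (value long, day int)
        |using parquet
        |options (path "/tmp/warehouse/logs")
        |partitioned by (day)""".stripMargin)

    // Programmatic equivalent of `MSCK REPAIR TABLE logs`: scan the table's
    // directory and add any partitions missing from the catalog.
    spark.catalog.recoverPartitions("logs")

    // The recovered partitions are now visible to the catalog.
    spark.sql("show partitions logs").show()

    spark.stop()
  }
}
```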