From 8ca6a82c1d04b0986d3063e3ee321698fc278992 Mon Sep 17 00:00:00 2001
From: Michael Allman <michael@videoamp.com>
Date: Tue, 6 Dec 2016 11:33:35 +0800
Subject: [PATCH] [SPARK-18572][SQL] Add a method `listPartitionNames` to
 `ExternalCatalog`

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-18572)

## What changes were proposed in this pull request?

Currently Spark answers the `SHOW PARTITIONS` command by fetching all of the table's partition metadata from the external catalog and constructing partition names therefrom. The Hive client has a `getPartitionNames` method which is many times faster for this purpose, with the performance improvement scaling with the number of partitions in a table.

To test the performance impact of this PR, I ran the `SHOW PARTITIONS` command on two Hive tables with large numbers of partitions. One table has ~17,800 partitions, and the other has ~95,000 partitions. For the purposes of this PR, I'll call the former table `table1` and the latter table `table2`.

I ran 5 trials for each table with before-and-after versions of this PR. The results are as follows:

Spark at bdc8153, `SHOW PARTITIONS table1`, times in seconds:
7.901
3.983
4.018
4.331
4.261

Spark at bdc8153, `SHOW PARTITIONS table2`:
(Timed out after 10 minutes with a `SocketTimeoutException`.)

Spark at this PR, `SHOW PARTITIONS table1`, times in seconds:
3.801
0.449
0.395
0.348
0.336

Spark at this PR, `SHOW PARTITIONS table2`, times in seconds:
5.184
1.63
1.474
1.519
1.41

Taking the best time from each set of trials, we get a 12x performance improvement for a table with ~17,800 partitions and at least a 426x improvement for a table with ~95,000 partitions. More significantly, the latter command doesn't even complete with the current code in master.

This is actually a patch we've been using in-house at VideoAmp since Spark 1.1. It's made all the difference in the practical usability of our largest tables. Even with tables with about 1,000 partitions there's a performance improvement of about 2-3x.

## How was this patch tested?

I added a unit test to `VersionsSuite` which tests that the Hive client's `getPartitionNames` method returns the correct number of partitions.

Author: Michael Allman <michael@videoamp.com>

Closes #15998 from mallman/spark-18572-list_partition_names.
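For illustration only, here is a minimal sketch of how the faster path is exercised from SQL. The `warehouse.events` table and the session setup are hypothetical; end users reach the new internal `SessionCatalog.listPartitionNames` method through the existing `SHOW PARTITIONS` command.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical Hive-backed table warehouse.events. With this patch, SHOW PARTITIONS
// resolves names via SessionCatalog.listPartitionNames -> ExternalCatalog.listPartitionNames
// -> HiveClient.getPartitionNames instead of fetching full partition metadata.
val spark = SparkSession.builder()
  .appName("show-partitions-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Each returned row is one partition name, e.g. "ds=2016-12-01/hr=00".
spark.sql("SHOW PARTITIONS warehouse.events").show(truncate = false)

// A partial partition spec is still supported and maps to the partialSpec argument.
spark.sql("SHOW PARTITIONS warehouse.events PARTITION (ds='2016-12-01')").show(truncate = false)
```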
(cherry picked from commit 772ddbeaa6fe5abf189d01246f57d295f9346fa3)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
---
 .../catalyst/catalog/ExternalCatalog.scala    | 26 +++++++++++-
 .../catalyst/catalog/InMemoryCatalog.scala    | 14 +++++++
 .../sql/catalyst/catalog/SessionCatalog.scala | 23 ++++++++++
 .../catalog/ExternalCatalogSuite.scala        | 25 +++++++++++
 .../catalog/SessionCatalogSuite.scala         | 39 +++++++++++++++++
 .../spark/sql/execution/command/tables.scala  | 12 +-----
 .../datasources/DataSourceStrategy.scala      | 22 +++++-----
 .../datasources/PartitioningUtils.scala       | 13 +++++-
 .../spark/sql/hive/HiveExternalCatalog.scala  | 42 +++++++++++++++++--
 .../spark/sql/hive/client/HiveClient.scala    | 10 +++++
 .../sql/hive/client/HiveClientImpl.scala      | 20 +++++++++
 .../spark/sql/hive/client/VersionsSuite.scala |  5 +++
 12 files changed, 221 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 259008f183..4b8cac8f32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -189,15 +189,37 @@ abstract class ExternalCatalog {
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition]
 
+  /**
+   * List the names of all partitions that belong to the specified table, assuming it exists.
+   *
+   * For a table with partition columns p1, p2, p3, each partition name is formatted as
+   * `p1=v1/p2=v2/p3=v3`. Each partition column name and value is an escaped path name, and can be
+   * decoded with the `ExternalCatalogUtils.unescapePathName` method.
+   *
+   * The returned sequence is sorted as strings.
+   *
+   * A partial partition spec may optionally be provided to filter the partitions returned, as
+   * described in the `listPartitions` method.
+   *
+   * @param db database name
+   * @param table table name
+   * @param partialSpec  partition spec
+   */
+  def listPartitionNames(
+      db: String,
+      table: String,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[String]
+
   /**
    * List the metadata of all partitions that belong to the specified table, assuming it exists.
    *
    * A partial partition spec may optionally be provided to filter the partitions returned.
    * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
    * then a partial spec of (a='1') will return the first two only.
+   *
    * @param db database name
    * @param table table name
-   * @param partialSpec partition spec
+   * @param partialSpec  partition spec
    */
   def listPartitions(
       db: String,
@@ -210,7 +232,7 @@ abstract class ExternalCatalog {
    *
    * @param db database name
    * @param table table name
-   * @param predicates partition-pruning predicates
+   * @param predicates  partition-pruning predicates
    */
   def listPartitionsByFilter(
       db: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 880a7a0dc4..a6bebe1a39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
 
@@ -488,6 +489,19 @@ class InMemoryCatalog(
     }
   }
 
+  override def listPartitionNames(
+      db: String,
+      table: String,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = synchronized {
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+
+    listPartitions(db, table, partialSpec).map { partition =>
+      partitionColumnNames.map { name =>
+        escapePathName(name) + "=" + escapePathName(partition.spec(name))
+      }.mkString("/")
+    }.sorted
+  }
+
   override def listPartitions(
       db: String,
       table: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index da3a2079f4..7a3d2097a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -748,6 +748,26 @@ class SessionCatalog(
     externalCatalog.getPartition(db, table, spec)
   }
 
+  /**
+   * List the names of all partitions that belong to the specified table, assuming it exists.
+   *
+   * A partial partition spec may optionally be provided to filter the partitions returned.
+   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
+   * then a partial spec of (a='1') will return the first two only.
+   */
+  def listPartitionNames(
+      tableName: TableIdentifier,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableName.table)
+    requireDbExists(db)
+    requireTableExists(TableIdentifier(table, Option(db)))
+    partialSpec.foreach { spec =>
+      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    }
+    externalCatalog.listPartitionNames(db, table, partialSpec)
+  }
+
   /**
    * List the metadata of all partitions that belong to the specified table, assuming it exists.
    *
@@ -762,6 +782,9 @@ class SessionCatalog(
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    partialSpec.foreach { spec =>
+      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    }
     externalCatalog.listPartitions(db, table, partialSpec)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 3b39f420af..00e663c324 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -346,6 +346,31 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
 
+  test("list partition names") {
+    val catalog = newBasicCatalog()
+    val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
+    catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false)
+
+    val partitionNames = catalog.listPartitionNames("db2", "tbl2")
+    assert(partitionNames == Seq("a=1/b=%25%3D", "a=1/b=2", "a=3/b=4"))
+  }
+
+  test("list partition names with partial partition spec") {
+    val catalog = newBasicCatalog()
+    val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
+    catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false)
+
+    val partitionNames1 = catalog.listPartitionNames("db2", "tbl2", Some(Map("a" -> "1")))
+    assert(partitionNames1 == Seq("a=1/b=%25%3D", "a=1/b=2"))
+
+    // Partial partition specs including "weird" partition values should use the unescaped values
+    val partitionNames2 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%=")))
+    assert(partitionNames2 == Seq("a=1/b=%25%3D"))
+
+    val partitionNames3 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%25%3D")))
+    assert(partitionNames3.isEmpty)
+  }
+
   test("list partitions with partial partition spec") {
     val catalog = newBasicCatalog()
     val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index f9c4b2687b..5cc772d8e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -878,6 +878,31 @@ class SessionCatalogSuite extends SparkFunSuite {
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
   }
 
+  test("list partition names") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4")
+    assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) ==
+      expectedPartitionNames)
+    // List partition names without explicitly specifying database
+    catalog.setCurrentDatabase("db2")
+    assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames)
+  }
+
+  test("list partition names with partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    assert(
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) ==
+        Seq("a=1/b=2"))
+  }
+
+  test("list partition names with invalid partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(Map("unknown" -> "unknown")))
+    }
+  }
+
   test("list partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalogPartitionsEqual(
@@ -887,6 +912,20 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
   }
 
+  test("list partitions with partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    assert(catalogPartitionsEqual(
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1))
+  }
+
+  test("list partitions with invalid partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.listPartitions(
+        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    }
+  }
+
   test("list partitions when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[NoSuchDatabaseException] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 57d66f1f14..5d507759d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -715,13 +715,6 @@ case class ShowPartitionsCommand(
     AttributeReference("partition", StringType, nullable = false)() :: Nil
   }
 
-  private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
-    partColNames.map { name =>
-      ExternalCatalogUtils.escapePathName(name) + "=" +
-        ExternalCatalogUtils.escapePathName(spec(name))
-    }.mkString(File.separator)
-  }
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -758,10 +751,7 @@ case class ShowPartitionsCommand(
       }
     }
 
-    val partNames = catalog.listPartitions(tableName, spec).map { p =>
-      getPartName(p.spec, table.partitionColumnNames)
-    }
-
+    val partNames = catalog.listPartitionNames(tableName, spec)
     partNames.map(Row(_))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4468dc58e4..03eed25176 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -161,8 +161,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
       insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
 
-    case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
+    case logical.InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), _, query, overwrite, false)
         if query.resolved && t.schema.sameType(query.schema) =>
 
       // Sanity checks
@@ -192,11 +192,19 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
       var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
 
+      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+        overwrite.staticPartitionKeys.map { case (k, v) =>
+          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+        }
+      } else {
+        Map.empty
+      }
+
       // When partitions are tracked by the catalog, compute all custom partition locations that
       // may be relevant to the insertion job.
       if (partitionsTrackedByCatalog) {
         val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
-          l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys))
+          l.catalogTable.get.identifier, Some(staticPartitionKeys))
         initialMatchingPartitions = matchingPartitions.map(_.spec)
         customPartitionLocations = getCustomPartitionLocations(
           t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
@@ -225,14 +233,6 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         t.location.refresh()
       }
 
-      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
-        overwrite.staticPartitionKeys.map { case (k, v) =>
-          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
-        }
-      } else {
-        Map.empty
-      }
-
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitionKeys,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index bf9f318780..bc290702dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -244,13 +244,22 @@ object PartitioningUtils {
 
   /**
    * Given a partition path fragment, e.g. `fieldOne=1/fieldTwo=2`, returns a parsed spec
-   * for that fragment, e.g. `Map(("fieldOne", "1"), ("fieldTwo", "2"))`.
+   * for that fragment as a `TablePartitionSpec`, e.g. `Map(("fieldOne", "1"), ("fieldTwo", "2"))`.
    */
   def parsePathFragment(pathFragment: String): TablePartitionSpec = {
+    parsePathFragmentAsSeq(pathFragment).toMap
+  }
+
+  /**
+   * Given a partition path fragment, e.g. `fieldOne=1/fieldTwo=2`, returns a parsed spec
+   * for that fragment as a `Seq[(String, String)]`, e.g.
+   * `Seq(("fieldOne", "1"), ("fieldTwo", "2"))`.
+   */
+  def parsePathFragmentAsSeq(pathFragment: String): Seq[(String, String)] = {
     pathFragment.split("/").map { kv =>
       val pair = kv.split("=", 2)
       (unescapePathName(pair(0)), unescapePathName(pair(1)))
-    }.toMap
+    }
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index c213e8e0b2..f67ddc9be1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -35,10 +35,12 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -812,9 +814,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     spec.map { case (k, v) => k.toLowerCase -> v }
   }
 
+  // Build a map from lower-cased partition column names to exact column names for a given table
+  private def buildLowerCasePartColNameMap(table: CatalogTable): Map[String, String] = {
+    val actualPartColNames = table.partitionColumnNames
+    actualPartColNames.map(colName => (colName.toLowerCase, colName)).toMap
+  }
+
   // Hive metastore is not case preserving and the column names of the partition specification we
   // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
   // partition columns.
+  private def restorePartitionSpec(
+      spec: TablePartitionSpec,
+      partColMap: Map[String, String]): TablePartitionSpec = {
+    spec.map { case (k, v) => partColMap(k.toLowerCase) -> v }
+  }
+
   private def restorePartitionSpec(
       spec: TablePartitionSpec,
       partCols: Seq[String]): TablePartitionSpec = {
@@ -927,13 +941,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   /**
    * Returns the partition names from hive metastore for a given table in a database.
    */
+  override def listPartitionNames(
+      db: String,
+      table: String,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient {
+    val catalogTable = getTable(db, table)
+    val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
+    val clientPartitionNames =
+      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
+    clientPartitionNames.map { partName =>
+      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
+      partSpec.map { case (partName, partValue) =>
+        partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
+      }.mkString("/")
+    }
+  }
+
+  /**
+   * Returns the partitions from hive metastore for a given table in a database.
+   */
   override def listPartitions(
       db: String,
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
-    val actualPartColNames = getTable(db, table).partitionColumnNames
+    val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
     client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, actualPartColNames))
+      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
     }
   }
 
@@ -954,10 +987,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     val partitionSchema = catalogTable.partitionSchema
+    val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
 
     if (predicates.nonEmpty) {
       val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
-        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
       }
       val boundPredicate =
         InterpretedPredicate.create(predicates.reduce(And).transform {
@@ -968,7 +1002,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
     } else {
       client.getPartitions(catalogTable).map { part =>
-        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 4c76932b61..8e7c871183 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -156,6 +156,16 @@ private[hive] trait HiveClient {
     }
   }
 
+  /**
+   * Returns the partition names for the given table that match the supplied partition spec.
+   * If no partition spec is specified, all partitions are returned.
+   *
+   * The returned sequence is sorted as strings.
+   */
+  def getPartitionNames(
+      table: CatalogTable,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[String]
+
   /** Returns the specified partition or None if it does not exist. */
   final def getPartitionOption(
       db: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bd840af5b1..db73596e5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -519,6 +519,26 @@ private[hive] class HiveClientImpl(
     client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
   }
 
+  /**
+   * Returns the partition names for the given table that match the supplied partition spec.
+   * If no partition spec is specified, all partitions are returned.
+   *
+   * The returned sequence is sorted as strings.
+   */
+  override def getPartitionNames(
+      table: CatalogTable,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withHiveState {
+    val hivePartitionNames =
+      partialSpec match {
+        case None =>
+          // -1 for result limit means "no limit/return all"
+          client.getPartitionNames(table.database, table.identifier.table, -1)
+        case Some(s) =>
+          client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
+      }
+    hivePartitionNames.asScala.sorted
+  }
+
   override def getPartitionOption(
       table: CatalogTable,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 16ae345de6..79e76b3134 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -254,6 +254,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
       "default", "src_part", partitions, ignoreIfExists = true)
   }
 
+  test(s"$version: getPartitionNames(catalogTable)") {
+    val partitionNames = (1 to testPartitionCount).map(key2 => s"key1=1/key2=$key2")
+    assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part")))
+  }
+
   test(s"$version: getPartitions(catalogTable)") {
     assert(testPartitionCount ==
       client.getPartitions(client.getTable("default", "src_part")).size)
-- 
GitLab