Commit bc7a3ec2 authored by Andrew Or, committed by Reynold Xin

[SPARK-13685][SQL] Rename catalog.Catalog to ExternalCatalog

## What changes were proposed in this pull request?

Today we have `analysis.Catalog` and `catalog.Catalog`. In the future the former will call the latter. When that happens, if both of them are still called `Catalog`, it will be very confusing. This patch renames the latter to `ExternalCatalog` because it is expected to talk to external systems.
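
A rough sketch of the intended layering after the rename (everything here except the `ExternalCatalog` name is illustrative, not an actual Spark API):

```scala
// The renamed interface: implementations talk to persistent, external
// systems such as Hive.
abstract class ExternalCatalog {
  def tableExists(db: String, table: String): Boolean
}

// The analyzer-side catalog keeps the `Catalog` name; in the future it is
// expected to delegate non-temporary lookups to an ExternalCatalog.
// `SessionAnalysisCatalog` and `lookupTable` are hypothetical names.
class SessionAnalysisCatalog(external: ExternalCatalog) {
  def lookupTable(db: String, table: String): Boolean =
    external.tableExists(db, table) // temporary tables would be checked first
}
```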

## How was this patch tested?

Jenkins.

Author: Andrew Or <andrew@databricks.com>

Closes #11526 from andrewor14/rename-catalog.
parent ee913e6e
Showing 34 additions and 27 deletions
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec

 /**
...
@@ -25,10 +25,14 @@ import org.apache.spark.sql.AnalysisException
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
  *
+ * This is a dummy implementation that does not require setting up external systems.
+ * It is intended for testing or exploration purposes only and should not be used
+ * in production.
+ *
  * All public methods should be synchronized for thread-safety.
  */
-class InMemoryCatalog extends Catalog {
-  import Catalog._
+class InMemoryCatalog extends ExternalCatalog {
+  import ExternalCatalog._

   private class TableDesc(var table: CatalogTable) {
     val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
...
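
The `TableDesc` wrapper visible above is the core of the in-memory design: each table carries its own mutable partition map, and the catalog synchronizes its public methods. A minimal, self-contained sketch of that pattern with simplified stand-in types (nothing here is the patch's actual API):

```scala
import scala.collection.mutable

// Each table descriptor owns a mutable map of its partitions; the catalog
// guards shared state with `synchronized`. All types are stand-ins.
class InMemorySketch {
  private case class Partition(spec: Map[String, String])

  private class TableDesc(val name: String) {
    val partitions = new mutable.HashMap[Map[String, String], Partition]
  }

  private val tables = new mutable.HashMap[String, TableDesc]

  def createTable(name: String): Unit = synchronized {
    tables.getOrElseUpdate(name, new TableDesc(name))
  }

  def addPartition(table: String, spec: Map[String, String]): Unit = synchronized {
    tables(table).partitions(spec) = Partition(spec)
  }
}
```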
@@ -26,12 +26,13 @@ import org.apache.spark.sql.AnalysisException
  * Interface for the system catalog (of columns, partitions, tables, and databases).
  *
  * This is only used for non-temporary items, and implementations must be thread-safe as they
- * can be accessed in multiple threads.
+ * can be accessed in multiple threads. This is an external catalog because it is expected to
+ * interact with external systems.
  *
  * Implementations should throw [[AnalysisException]] when table or database don't exist.
  */
-abstract class Catalog {
-  import Catalog._
+abstract class ExternalCatalog {
+  import ExternalCatalog._

   protected def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
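
The `requireDbExists` guard above is the interface's main validation hook. A minimal, self-contained sketch of how a concrete implementation might use it (the exception class and message text here are stand-ins, not Spark's actual API):

```scala
// Stand-in for org.apache.spark.sql.AnalysisException.
class AnalysisException(msg: String) extends Exception(msg)

abstract class ExternalCatalogSketch {
  def databaseExists(db: String): Boolean

  // Guard shared by all operations scoped to a database.
  protected def requireDbExists(db: String): Unit = {
    if (!databaseExists(db)) {
      throw new AnalysisException(s"Database $db does not exist")
    }
  }

  // A typical operation validates the database before doing any work.
  def dropTable(db: String, table: String): Unit = {
    requireDbExists(db)
    // ... backend-specific drop logic would go here
  }
}
```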
@@ -198,7 +199,9 @@ case class CatalogColumn(
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
  */
-case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat)
+case class CatalogTablePartition(
+    spec: ExternalCatalog.TablePartitionSpec,
+    storage: CatalogStorageFormat)

 /**
@@ -263,7 +266,7 @@ case class CatalogDatabase(
     properties: Map[String, String])

-object Catalog {
+object ExternalCatalog {
   /**
    * Specifications of a table partition. Mapping column name to column value.
    */
...
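
The companion object above defines the `TablePartitionSpec` alias used throughout this patch; as its doc comment says, a spec maps partition column names to column values. A small sketch, assuming the alias is `Map[String, String]` as in Spark's source (the column names below are illustrative):

```scala
object ExternalCatalogSketch {
  // Mapping from partition column name to column value.
  type TablePartitionSpec = Map[String, String]
}

object SpecExample extends App {
  // A spec identifying one partition of a table partitioned by (year, month):
  val spec: ExternalCatalogSketch.TablePartitionSpec =
    Map("year" -> "2016", "month" -> "03")
  println(spec("year")) // prints "2016"
}
```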
@@ -24,9 +24,9 @@ import org.apache.spark.sql.AnalysisException
 /**
- * A reasonable complete test suite (i.e. behaviors) for a [[Catalog]].
+ * A reasonable complete test suite (i.e. behaviors) for a [[ExternalCatalog]].
  *
- * Implementations of the [[Catalog]] interface can create test suites by extending this.
+ * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
  */
 abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   private lazy val storageFormat = CatalogStorageFormat(
@@ -45,7 +45,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
   protected def newUriForDatabase(): String = "uri"
   protected def resetState(): Unit = { }
-  protected def newEmptyCatalog(): Catalog
+  protected def newEmptyCatalog(): ExternalCatalog

   // Clear all state after each test
   override def afterEach(): Unit = {
@@ -68,7 +68,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   *  - part2
   *  - func1
   */
-  private def newBasicCatalog(): Catalog = {
+  private def newBasicCatalog(): ExternalCatalog = {
     val catalog = newEmptyCatalog()
     // When testing against a real catalog, the default database may already exist
     catalog.createDatabase(newDb("default"), ignoreIfExists = true)
@@ -104,7 +104,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
    * Note: Hive sets some random serde things, so we just compare the specs here.
    */
   private def catalogPartitionsEqual(
-      catalog: Catalog,
+      catalog: ExternalCatalog,
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition]): Boolean = {
...
@@ -19,5 +19,5 @@ package org.apache.spark.sql.catalyst.catalog

 /** Test suite for the [[InMemoryCatalog]]. */
 class InMemoryCatalogSuite extends CatalogTestCases {
-  override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
+  override protected def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
 }
@@ -33,8 +33,8 @@ import org.apache.spark.sql.hive.client.HiveClient
  * A persistent implementation of the system catalog using Hive.
  * All public methods must be synchronized for thread-safety.
  */
-private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
-  import Catalog._
+private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+  import ExternalCatalog._

   // Exceptions thrown by the hive client that we would like to wrap
   private val clientExceptions = Set(
...
@@ -132,7 +132,7 @@ private[hive] trait HiveClient {
   def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec]): Unit
+      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit

   /**
    * Rename one or many existing table partitions, assuming they exist.
@@ -140,8 +140,8 @@ private[hive] trait HiveClient {
   def renamePartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec],
-      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit

   /**
    * Alter one or more table partitions whose specs match the ones specified in `newParts`,
@@ -156,7 +156,7 @@ private[hive] trait HiveClient {
   final def getPartition(
       dbName: String,
       tableName: String,
-      spec: Catalog.TablePartitionSpec): CatalogTablePartition = {
+      spec: ExternalCatalog.TablePartitionSpec): CatalogTablePartition = {
     getPartitionOption(dbName, tableName, spec).getOrElse {
       throw new NoSuchPartitionException(dbName, tableName, spec)
     }
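
`getPartition` above is a thin adapter over the `Option`-returning lookup: callers that require the partition get an exception instead of a `None`. A self-contained sketch of the same pattern (the names and types below are stand-ins, not Spark's):

```scala
// Stand-in for Spark's NoSuchPartitionException.
case class NoSuchPartitionSketch(db: String, table: String) extends Exception

object PartitionLookup {
  // Option-returning lookup; a real client would query the metastore.
  def getPartitionOption(db: String, table: String): Option[String] = None

  // Strict variant: absence becomes an exception, mirroring getPartition.
  def getPartition(db: String, table: String): String =
    getPartitionOption(db, table).getOrElse {
      throw NoSuchPartitionSketch(db, table)
    }
}
```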
@@ -166,14 +166,14 @@ private[hive] trait HiveClient {
   final def getPartitionOption(
       db: String,
       table: String,
-      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = {
+      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = {
     getPartitionOption(getTable(db, table), spec)
   }

   /** Returns the specified partition or None if it does not exist. */
   def getPartitionOption(
       table: CatalogTable,
-      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition]
+      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition]

   /** Returns all partitions for the given table. */
   final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
...
@@ -366,7 +366,7 @@ private[hive] class HiveClientImpl(
   override def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
   }
@@ -374,8 +374,8 @@ private[hive] class HiveClientImpl(
   override def renamePartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec],
-      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
     require(specs.size == newSpecs.size, "number of old and new partition specs differ")
     val catalogTable = getTable(db, table)
     val hiveTable = toHiveTable(catalogTable)
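
The `require` above pins down the contract: old and new specs are matched positionally, one rename per pair. A runnable sketch of that pairing (the specs and the `println` are illustrative; the real implementation issues Hive alter-partition calls):

```scala
object RenameSketch extends App {
  val specs    = Seq(Map("ds" -> "2016-03-01"), Map("ds" -> "2016-03-02"))
  val newSpecs = Seq(Map("ds" -> "2016-04-01"), Map("ds" -> "2016-04-02"))

  require(specs.size == newSpecs.size, "number of old and new partition specs differ")

  // One rename per (old, new) pair, matched by position.
  specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
    println(s"rename $oldSpec -> $newSpec")
  }
}
```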
@@ -397,7 +397,7 @@ private[hive] class HiveClientImpl(
   override def getPartitionOption(
       table: CatalogTable,
-      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
+      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table)
     val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
     Option(hivePartition).map(fromHivePartition)
...
@@ -44,6 +44,6 @@ class HiveCatalogSuite extends CatalogTestCases {

   protected override def resetState(): Unit = client.reset()

-  protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client)
+  protected override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client)
 }