diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 81399db9bc070af43918dc863f713a648be72f0a..e9f04eecf8d706dc30ee59505346db068ae25a51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index cba4de34f2b44dc23e17d0dc905df1a2e66efdc0..f3fa7958db41b3b20befa7fd03fafaddd4aae77e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -25,10 +25,14 @@ import org.apache.spark.sql.AnalysisException
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
  *
+ * This is a dummy implementation that does not require setting up external systems.
+ * It is intended for testing or exploration purposes only and should not be used
+ * in production.
+ *
  * All public methods should be synchronized for thread-safety.
  */
-class InMemoryCatalog extends Catalog {
-  import Catalog._
+class InMemoryCatalog extends ExternalCatalog {
+  import ExternalCatalog._
 
   private class TableDesc(var table: CatalogTable) {
     val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dac5f023d1f58fcb7c310331bacc7d60bd827bb8..db34af3d26fc5f10ac5f664e815075807dc59aa2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -26,12 +26,13 @@
  * Interface for the system catalog (of columns, partitions, tables, and databases).
  *
  * This is only used for non-temporary items, and implementations must be thread-safe as they
- * can be accessed in multiple threads.
+ * can be accessed in multiple threads. This is an external catalog because it is expected to
+ * interact with external systems.
  *
  * Implementations should throw [[AnalysisException]] when table or database don't exist.
  */
-abstract class Catalog {
-  import Catalog._
+abstract class ExternalCatalog {
+  import ExternalCatalog._
 
   protected def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
@@ -198,7 +199,9 @@ case class CatalogColumn(
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
  */
-case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat)
+case class CatalogTablePartition(
+    spec: ExternalCatalog.TablePartitionSpec,
+    storage: CatalogStorageFormat)
 
 
 /**
@@ -263,7 +266,7 @@ case class CatalogDatabase(
     properties: Map[String, String])
 
 
-object Catalog {
+object ExternalCatalog {
   /**
    * Specifications of a table partition. Mapping column name to column value.
   */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index e0d1220d13e7c33a187451c559c40eb8ec43ccd5..b03ba81b50572fc9213e46d651884e3204de5798 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -24,9 +24,9 @@
 import org.apache.spark.sql.AnalysisException
 
 /**
- * A reasonable complete test suite (i.e. behaviors) for a [[Catalog]].
+ * A reasonable complete test suite (i.e. behaviors) for a [[ExternalCatalog]].
  *
- * Implementations of the [[Catalog]] interface can create test suites by extending this.
+ * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
  */
 abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   private lazy val storageFormat = CatalogStorageFormat(
@@ -45,7 +45,7 @@
   protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
   protected def newUriForDatabase(): String = "uri"
   protected def resetState(): Unit = { }
-  protected def newEmptyCatalog(): Catalog
+  protected def newEmptyCatalog(): ExternalCatalog
 
   // Clear all state after each test
   override def afterEach(): Unit = {
@@ -68,7 +68,7 @@
    *   - part2
    *   - func1
    */
-  private def newBasicCatalog(): Catalog = {
+  private def newBasicCatalog(): ExternalCatalog = {
     val catalog = newEmptyCatalog()
     // When testing against a real catalog, the default database may already exist
     catalog.createDatabase(newDb("default"), ignoreIfExists = true)
@@ -104,7 +104,7 @@
    * Note: Hive sets some random serde things, so we just compare the specs here.
    */
   private def catalogPartitionsEqual(
-      catalog: Catalog,
+      catalog: ExternalCatalog,
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition]): Boolean = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 871f0a0f46a220400520cf54b6c4eaeb26f28e43..9531758ffd59749ea89c76262d402eba5fcf6dd4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -19,5 +19,5 @@ package org.apache.spark.sql.catalyst.catalog
 
 /** Test suite for the [[InMemoryCatalog]]. */
 class InMemoryCatalogSuite extends CatalogTestCases {
-  override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
+  override protected def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
index 21b9cfb820eaa542829994d16be3aebf49a36c8c..5185e9aac05f037ab92c80d94940473462b7b062 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -33,8 +33,8 @@ import org.apache.spark.sql.hive.client.HiveClient
  * A persistent implementation of the system catalog using Hive.
  * All public methods must be synchronized for thread-safety.
  */
-private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
-  import Catalog._
+private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+  import ExternalCatalog._
 
   // Exceptions thrown by the hive client that we would like to wrap
   private val clientExceptions = Set(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6a0a089fd1f44274b8c9813b0c975711d1fd00db..b32aff25be68d0d2121e7621c973955bbf5ca902 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -132,7 +132,7 @@
   def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec]): Unit
+      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.
    */
@@ -140,8 +140,8 @@
   def renamePartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec],
-      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
 
   /**
    * Alter one or more table partitions whose specs match the ones specified in `newParts`,
@@ -156,7 +156,7 @@
   final def getPartition(
       dbName: String,
       tableName: String,
-      spec: Catalog.TablePartitionSpec): CatalogTablePartition = {
+      spec: ExternalCatalog.TablePartitionSpec): CatalogTablePartition = {
     getPartitionOption(dbName, tableName, spec).getOrElse {
       throw new NoSuchPartitionException(dbName, tableName, spec)
     }
@@ -166,14 +166,14 @@
   final def getPartitionOption(
       db: String,
       table: String,
-      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = {
+      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = {
     getPartitionOption(getTable(db, table), spec)
   }
 
   /** Returns the specified partition or None if it does not exist. */
   def getPartitionOption(
       table: CatalogTable,
-      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition]
+      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition]
 
   /** Returns all partitions for the given table. */
   final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5d62854c40c5d6d898b94b21fb8174ce07f48580..c1c8e631ee74088c5ca6d93a5e9c6b5ea61f95ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -366,7 +366,7 @@
   override def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
   }
@@ -374,8 +374,8 @@
   override def renamePartitions(
       db: String,
       table: String,
-      specs: Seq[Catalog.TablePartitionSpec],
-      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
     require(specs.size == newSpecs.size, "number of old and new partition specs differ")
     val catalogTable = getTable(db, table)
     val hiveTable = toHiveTable(catalogTable)
@@ -397,7 +397,7 @@
 
   override def getPartitionOption(
       table: CatalogTable,
-      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
+      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table)
     val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
     Option(hivePartition).map(fromHivePartition)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index f73e7e2351447d6c2a55b11e2fbeed9b239de089..f557abcd522e67383c9e045131f49397a955684a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -44,6 +44,6 @@
 
   protected override def resetState(): Unit = client.reset()
 
-  protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client)
+  protected override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client)
 
 }
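
Reviewer note: the renamed abstraction can be smoke-tested without any Hive setup through the
in-memory implementation. A minimal sketch follows; the CatalogDatabase field names other than
`properties` are assumptions for illustration (only `properties: Map[String, String]` is visible
in the interface.scala hunk above), not something this patch defines.

    import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, ExternalCatalog, InMemoryCatalog}

    object ExternalCatalogSmokeTest {
      def main(args: Array[String]): Unit = {
        // InMemoryCatalog is the dummy, test-only implementation per its new scaladoc.
        val catalog: ExternalCatalog = new InMemoryCatalog

        // Hypothetical field names, except `properties`, which the diff shows.
        val db = CatalogDatabase(
          name = "default",
          description = "example database",
          locationUri = "file:/tmp/external_catalog_example",
          properties = Map.empty)

        // Same call the shared test suite makes in newBasicCatalog().
        catalog.createDatabase(db, ignoreIfExists = true)
        assert(catalog.databaseExists("default"))
      }
    }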
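The test-suite side of the rename is mechanical because CatalogTestCases is a template: the
shared behaviors live in the abstract class, and each implementation supplies only the
newEmptyCatalog() factory (plus optional hooks such as resetState(), which defaults to a no-op
in the hunk above). A hypothetical third implementation would plug in the same way:

    import org.apache.spark.sql.catalyst.catalog.{CatalogTestCases, ExternalCatalog, InMemoryCatalog}

    /** Hypothetical suite, shown only to illustrate the factory-hook pattern. */
    class MyExternalCatalogSuite extends CatalogTestCases {
      // The single abstract member every implementation must provide.
      override protected def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
    }

This is why only the two factory overrides (InMemoryCatalogSuite, HiveCatalogSuite) and the
signatures inside CatalogTestCases itself change here, while every test body stays untouched.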