Skip to content
Snippets Groups Projects
Commit a6483112 authored by Andrew Or's avatar Andrew Or Committed by Reynold Xin
Browse files

[SPARK-13079][SQL] Extend and implement InMemoryCatalog

This is a step towards consolidating `SQLContext` and `HiveContext`.

This patch extends the existing Catalog API added in #10982 to include methods for handling table partitions. In particular, a partition is identified by `PartitionSpec`, which is just a `Map[String, String]`. The Catalog is still not used by anything yet, but its API is now more or less complete and an implementation is fully tested.

About 200 lines are test code.

Author: Andrew Or <andrew@databricks.com>

Closes #11069 from andrewor14/catalog.
parent a8e2ba77
No related branches found
No related tags found
No related merge requests found
......@@ -28,9 +28,10 @@ import org.apache.spark.sql.AnalysisException
* All public methods should be synchronized for thread-safety.
*/
class InMemoryCatalog extends Catalog {
import Catalog._
private class TableDesc(var table: Table) {
val partitions = new mutable.HashMap[String, TablePartition]
val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
}
private class DatabaseDesc(var db: Database) {
......@@ -46,13 +47,20 @@ class InMemoryCatalog extends Catalog {
}
private def existsFunction(db: String, funcName: String): Boolean = {
assertDbExists(db)
catalog(db).functions.contains(funcName)
}
private def existsTable(db: String, table: String): Boolean = {
assertDbExists(db)
catalog(db).tables.contains(table)
}
private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
assertTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}
private def assertDbExists(db: String): Unit = {
if (!catalog.contains(db)) {
throw new AnalysisException(s"Database $db does not exist")
......@@ -60,16 +68,20 @@ class InMemoryCatalog extends Catalog {
}
private def assertFunctionExists(db: String, funcName: String): Unit = {
assertDbExists(db)
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function $funcName does not exists in $db database")
throw new AnalysisException(s"Function $funcName does not exist in $db database")
}
}
private def assertTableExists(db: String, table: String): Unit = {
assertDbExists(db)
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table $table does not exists in $db database")
throw new AnalysisException(s"Table $table does not exist in $db database")
}
}
private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
}
}
......@@ -77,9 +89,11 @@ class InMemoryCatalog extends Catalog {
// Databases
// --------------------------------------------------------------------------
override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
override def createDatabase(
dbDefinition: Database,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ifNotExists) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
}
} else {
......@@ -88,9 +102,9 @@ class InMemoryCatalog extends Catalog {
}
override def dropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized {
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized {
if (catalog.contains(db)) {
if (!cascade) {
// If cascade is false, make sure the database is empty.
......@@ -133,11 +147,13 @@ class InMemoryCatalog extends Catalog {
// Tables
// --------------------------------------------------------------------------
override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
: Unit = synchronized {
override def createTable(
db: String,
tableDefinition: Table,
ignoreIfExists: Boolean): Unit = synchronized {
assertDbExists(db)
if (existsTable(db, tableDefinition.name)) {
if (!ifNotExists) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
}
} else {
......@@ -145,8 +161,10 @@ class InMemoryCatalog extends Catalog {
}
}
override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
: Unit = synchronized {
override def dropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
assertDbExists(db)
if (existsTable(db, table)) {
catalog(db).tables.remove(table)
......@@ -190,14 +208,67 @@ class InMemoryCatalog extends Catalog {
// Partitions
// --------------------------------------------------------------------------
override def alterPartition(db: String, table: String, part: TablePartition)
: Unit = synchronized {
throw new UnsupportedOperationException
override def createPartitions(
db: String,
table: String,
parts: Seq[TablePartition],
ignoreIfExists: Boolean): Unit = synchronized {
assertTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfExists) {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
if (dupSpecs.nonEmpty) {
val dupSpecsStr = dupSpecs.mkString("\n===\n")
throw new AnalysisException(
s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
}
}
parts.foreach { p => existingParts.put(p.spec, p) }
}
override def dropPartitions(
db: String,
table: String,
partSpecs: Seq[PartitionSpec],
ignoreIfNotExists: Boolean): Unit = synchronized {
assertTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
if (missingSpecs.nonEmpty) {
val missingSpecsStr = missingSpecs.mkString("\n===\n")
throw new AnalysisException(
s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
}
}
partSpecs.foreach(existingParts.remove)
}
override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
: Unit = synchronized {
throw new UnsupportedOperationException
override def alterPartition(
db: String,
table: String,
spec: Map[String, String],
newPart: TablePartition): Unit = synchronized {
assertPartitionExists(db, table, spec)
val existingParts = catalog(db).tables(table).partitions
if (spec != newPart.spec) {
// Also a change in specs; remove the old one and add the new one back
existingParts.remove(spec)
}
existingParts.put(newPart.spec, newPart)
}
override def getPartition(
db: String,
table: String,
spec: Map[String, String]): TablePartition = synchronized {
assertPartitionExists(db, table, spec)
catalog(db).tables(table).partitions(spec)
}
override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
assertTableExists(db, table)
catalog(db).tables(table).partitions.values.toSeq
}
// --------------------------------------------------------------------------
......@@ -205,11 +276,12 @@ class InMemoryCatalog extends Catalog {
// --------------------------------------------------------------------------
override def createFunction(
db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
db: String,
func: Function,
ignoreIfExists: Boolean): Unit = synchronized {
assertDbExists(db)
if (existsFunction(db, func.name)) {
if (!ifNotExists) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Function $func already exists in $db database")
}
} else {
......@@ -222,14 +294,16 @@ class InMemoryCatalog extends Catalog {
catalog(db).functions.remove(funcName)
}
override def alterFunction(db: String, funcName: String, funcDefinition: Function)
: Unit = synchronized {
override def alterFunction(
db: String,
funcName: String,
funcDefinition: Function): Unit = synchronized {
assertFunctionExists(db, funcName)
if (funcName != funcDefinition.name) {
// Also a rename; remove the old one and add the new one back
catalog(db).functions.remove(funcName)
}
catalog(db).functions.put(funcName, funcDefinition)
catalog(db).functions.put(funcDefinition.name, funcDefinition)
}
override def getFunction(db: String, funcName: String): Function = synchronized {
......@@ -239,7 +313,6 @@ class InMemoryCatalog extends Catalog {
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
assertDbExists(db)
val regex = pattern.replaceAll("\\*", ".*").r
filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
}
......
......@@ -29,17 +29,15 @@ import org.apache.spark.sql.AnalysisException
* Implementations should throw [[AnalysisException]] when table or database don't exist.
*/
abstract class Catalog {
import Catalog._
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
def dropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
def alterDatabase(db: String, dbDefinition: Database): Unit
......@@ -71,11 +69,28 @@ abstract class Catalog {
// Partitions
// --------------------------------------------------------------------------
// TODO: need more functions for partitioning.
def createPartitions(
db: String,
table: String,
parts: Seq[TablePartition],
ignoreIfExists: Boolean): Unit
def alterPartition(db: String, table: String, part: TablePartition): Unit
def dropPartitions(
db: String,
table: String,
parts: Seq[PartitionSpec],
ignoreIfNotExists: Boolean): Unit
def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
def alterPartition(
db: String,
table: String,
spec: PartitionSpec,
newPart: TablePartition): Unit
def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
// TODO: support listing by pattern
def listPartitions(db: String, table: String): Seq[TablePartition]
// --------------------------------------------------------------------------
// Functions
......@@ -132,11 +147,11 @@ case class Column(
/**
* A partition (Hive style) defined in the catalog.
*
* @param values values for the partition columns
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
*/
case class TablePartition(
values: Seq[String],
spec: Catalog.PartitionSpec,
storage: StorageFormat
)
......@@ -176,3 +191,8 @@ case class Database(
locationUri: String,
properties: Map[String, String]
)
object Catalog {
type PartitionSpec = Map[String, String]
}
......@@ -27,6 +27,11 @@ import org.apache.spark.sql.AnalysisException
* Implementations of the [[Catalog]] interface can create test suites by extending this.
*/
abstract class CatalogTestCases extends SparkFunSuite {
private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map.empty[String, String])
private val part1 = TablePartition(Map[String, String]("a" -> "1"), storageFormat)
private val part2 = TablePartition(Map[String, String]("b" -> "2"), storageFormat)
private val part3 = TablePartition(Map[String, String]("c" -> "3"), storageFormat)
private val funcClass = "org.apache.spark.myFunc"
protected def newEmptyCatalog(): Catalog
......@@ -41,16 +46,16 @@ abstract class CatalogTestCases extends SparkFunSuite {
*/
private def newBasicCatalog(): Catalog = {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("db1"), ifNotExists = false)
catalog.createDatabase(newDb("db2"), ifNotExists = false)
catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
catalog
}
private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
private def newFunc(): Function = Function("funcname", funcClass)
private def newDb(name: String = "default"): Database =
Database(name, name + " description", "uri", Map.empty)
......@@ -59,7 +64,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
None, None)
private def newFunc(name: String): Function = Function(name, "class.name")
private def newFunc(name: String): Function = Function(name, funcClass)
// --------------------------------------------------------------------------
// Databases
......@@ -67,10 +72,10 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("basic create, drop and list databases") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb(), ifNotExists = false)
catalog.createDatabase(newDb(), ignoreIfExists = false)
assert(catalog.listDatabases().toSet == Set("default"))
catalog.createDatabase(newDb("default2"), ifNotExists = false)
catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
assert(catalog.listDatabases().toSet == Set("default", "default2"))
}
......@@ -253,11 +258,194 @@ abstract class CatalogTestCases extends SparkFunSuite {
// Partitions
// --------------------------------------------------------------------------
// TODO: Add tests cases for partitions
test("basic create and list partitions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
}
test("create partitions when database / table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
}
intercept[AnalysisException] {
catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
}
}
test("create partitions that already exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
}
catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
}
test("drop partitions") {
val catalog = newBasicCatalog()
assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
val catalog2 = newBasicCatalog()
assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
test("drop partitions when database / table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
}
intercept[AnalysisException] {
catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
}
}
test("drop partitions that do not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
}
catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
}
test("get partition") {
val catalog = newBasicCatalog()
assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
intercept[AnalysisException] {
catalog.getPartition("db2", "tbl1", part3.spec)
}
}
test("get partition when database / table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.getPartition("does_not_exist", "tbl1", part1.spec)
}
intercept[AnalysisException] {
catalog.getPartition("db2", "does_not_exist", part1.spec)
}
}
test("alter partitions") {
val catalog = newBasicCatalog()
val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
val partNewSpec = part1.copy(spec = Map("x" -> "10"))
// alter but keep spec the same
catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
// alter and change spec
catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
intercept[AnalysisException] {
catalog.getPartition("db2", "tbl2", part1.spec)
}
assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
}
test("alter partition when database / table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
}
intercept[AnalysisException] {
catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
}
}
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
// TODO: Add tests cases for functions
test("basic create and list functions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
}
test("create function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
}
}
test("create function that already exists") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
}
catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
}
test("drop function") {
val catalog = newBasicCatalog()
assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
catalog.dropFunction("db2", "func1")
assert(catalog.listFunctions("db2", "*").isEmpty)
}
test("drop function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropFunction("does_not_exist", "something")
}
}
test("drop function that does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropFunction("db2", "does_not_exist")
}
}
test("get function") {
val catalog = newBasicCatalog()
assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
intercept[AnalysisException] {
catalog.getFunction("db2", "does_not_exist")
}
}
test("get function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.getFunction("does_not_exist", "func1")
}
}
test("alter function") {
val catalog = newBasicCatalog()
assert(catalog.getFunction("db2", "func1").className == funcClass)
// alter func but keep name
catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
assert(catalog.getFunction("db2", "func1").className == "muhaha")
// alter func and change name
catalog.alterFunction("db2", "func1", newFunc("funcky"))
intercept[AnalysisException] {
catalog.getFunction("db2", "func1")
}
assert(catalog.getFunction("db2", "funcky").className == funcClass)
}
test("alter function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.alterFunction("does_not_exist", "func1", newFunc())
}
}
test("list functions") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment