Commit be7a2fc0 authored by Reynold Xin

[SPARK-13078][SQL] API and test cases for internal catalog

This pull request creates an internal catalog API. The creation of this API is the first step towards consolidating SQLContext and HiveContext. I envision we will have two different implementations in Spark 2.0: (1) a simple in-memory implementation, and (2) an implementation based on the current HiveClient (ClientWrapper).

I took a look at Hive's internal metastore interface and implementation, and then created this API based on it. I believe this is the minimal set of methods needed to achieve all the required functionality.

Author: Reynold Xin <rxin@databricks.com>

Closes #10982 from rxin/SPARK-13078.
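
For illustration, a minimal sketch of how a caller might exercise the API introduced below. It uses only names defined in this patch; the null storage value mirrors the placeholder used by the test helpers.

import org.apache.spark.sql.catalyst.catalog._

object CatalogUsageSketch {
  def main(args: Array[String]): Unit = {
    val catalog: Catalog = new InMemoryCatalog

    // Create a database, then a table inside it.
    catalog.createDatabase(
      Database("db1", "example database", "uri", Map.empty), ifNotExists = false)

    val table = Table("tbl1", "", Seq.empty, Seq.empty, Seq.empty, storage = null,
      numBuckets = 0, properties = Map.empty, tableType = "MANAGED_TABLE",
      createTime = 0L, lastAccessTime = 0L, viewOriginalText = None, viewText = None)
    catalog.createTable("db1", table, ignoreIfExists = false)

    // "*" is the only wildcard recognized by the list* pattern arguments.
    println(catalog.listTables("db1", "tbl*")) // e.g. ArrayBuffer(tbl1)
  }
}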
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog

import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
/**
 * An in-memory (ephemeral) implementation of the system catalog.
 *
 * All public methods should be synchronized for thread-safety.
 */
class InMemoryCatalog extends Catalog {

  private class TableDesc(var table: Table) {
    val partitions = new mutable.HashMap[String, TablePartition]
  }

  private class DatabaseDesc(var db: Database) {
    val tables = new mutable.HashMap[String, TableDesc]
    val functions = new mutable.HashMap[String, Function]
  }

  private val catalog = new mutable.HashMap[String, DatabaseDesc]
  private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
    // Only "*" is treated as a wildcard (it becomes ".*"); any other regex
    // metacharacters in the pattern are passed through to the regex unescaped.
    val regex = pattern.replaceAll("\\*", ".*").r
    names.filter { name => regex.pattern.matcher(name).matches() }
  }

  private def existsFunction(db: String, funcName: String): Boolean = {
    catalog(db).functions.contains(funcName)
  }

  private def existsTable(db: String, table: String): Boolean = {
    catalog(db).tables.contains(table)
  }

  private def assertDbExists(db: String): Unit = {
    if (!catalog.contains(db)) {
      throw new AnalysisException(s"Database $db does not exist")
    }
  }

  private def assertFunctionExists(db: String, funcName: String): Unit = {
    assertDbExists(db)
    if (!existsFunction(db, funcName)) {
      throw new AnalysisException(s"Function $funcName does not exist in database $db")
    }
  }

  private def assertTableExists(db: String, table: String): Unit = {
    assertDbExists(db)
    if (!existsTable(db, table)) {
      throw new AnalysisException(s"Table $table does not exist in database $db")
    }
  }
  // --------------------------------------------------------------------------
  // Databases
  // --------------------------------------------------------------------------

  override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
    if (catalog.contains(dbDefinition.name)) {
      if (!ifNotExists) {
        throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
      }
    } else {
      catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
    }
  }

  override def dropDatabase(
      db: String,
      ignoreIfNotExists: Boolean,
      cascade: Boolean): Unit = synchronized {
    if (catalog.contains(db)) {
      if (!cascade) {
        // If cascade is false, make sure the database is empty.
        if (catalog(db).tables.nonEmpty) {
          throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
        }
        if (catalog(db).functions.nonEmpty) {
          throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
        }
      }
      // Remove the database.
      catalog.remove(db)
    } else {
      if (!ignoreIfNotExists) {
        throw new AnalysisException(s"Database $db does not exist")
      }
    }
  }

  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
    assertDbExists(db)
    assert(db == dbDefinition.name)
    catalog(db).db = dbDefinition
  }

  override def getDatabase(db: String): Database = synchronized {
    assertDbExists(db)
    catalog(db).db
  }

  override def listDatabases(): Seq[String] = synchronized {
    catalog.keySet.toSeq
  }

  override def listDatabases(pattern: String): Seq[String] = synchronized {
    filterPattern(listDatabases(), pattern)
  }
  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  override def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean)
    : Unit = synchronized {
    assertDbExists(db)
    if (existsTable(db, tableDefinition.name)) {
      if (!ignoreIfExists) {
        throw new AnalysisException(s"Table ${tableDefinition.name} already exists in database $db")
      }
    } else {
      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
    }
  }

  override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
    : Unit = synchronized {
    assertDbExists(db)
    if (existsTable(db, table)) {
      catalog(db).tables.remove(table)
    } else {
      if (!ignoreIfNotExists) {
        throw new AnalysisException(s"Table $table does not exist in database $db")
      }
    }
  }

  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
    assertTableExists(db, oldName)
    val oldDesc = catalog(db).tables(oldName)
    oldDesc.table = oldDesc.table.copy(name = newName)
    catalog(db).tables.put(newName, oldDesc)
    catalog(db).tables.remove(oldName)
  }

  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
    assertTableExists(db, table)
    assert(table == tableDefinition.name)
    catalog(db).tables(table).table = tableDefinition
  }

  override def getTable(db: String, table: String): Table = synchronized {
    assertTableExists(db, table)
    catalog(db).tables(table).table
  }

  override def listTables(db: String): Seq[String] = synchronized {
    assertDbExists(db)
    catalog(db).tables.keySet.toSeq
  }

  override def listTables(db: String, pattern: String): Seq[String] = synchronized {
    assertDbExists(db)
    filterPattern(listTables(db), pattern)
  }
  // --------------------------------------------------------------------------
  // Partitions
  // --------------------------------------------------------------------------

  override def alterPartition(db: String, table: String, part: TablePartition)
    : Unit = synchronized {
    throw new UnsupportedOperationException
  }

  override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
    : Unit = synchronized {
    throw new UnsupportedOperationException
  }
  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  override def createFunction(
      db: String, func: Function, ignoreIfExists: Boolean): Unit = synchronized {
    assertDbExists(db)
    if (existsFunction(db, func.name)) {
      if (!ignoreIfExists) {
        throw new AnalysisException(s"Function ${func.name} already exists in database $db")
      }
    } else {
      catalog(db).functions.put(func.name, func)
    }
  }

  override def dropFunction(db: String, funcName: String): Unit = synchronized {
    assertFunctionExists(db, funcName)
    catalog(db).functions.remove(funcName)
  }

  override def alterFunction(db: String, funcName: String, funcDefinition: Function)
    : Unit = synchronized {
    assertFunctionExists(db, funcName)
    if (funcName != funcDefinition.name) {
      // This is also a rename; remove the old entry and add the new one back in
      // under the new name.
      catalog(db).functions.remove(funcName)
    }
    catalog(db).functions.put(funcDefinition.name, funcDefinition)
  }

  override def getFunction(db: String, funcName: String): Function = synchronized {
    assertFunctionExists(db, funcName)
    catalog(db).functions(funcName)
  }

  override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
    assertDbExists(db)
    filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
  }
}
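
A quick illustration of the pattern semantics used by the list* methods above (a sketch, assuming the classes in this patch are on the classpath; only "*" is a wildcard, and a pattern without one must match the full name):

val catalog = new InMemoryCatalog
catalog.createDatabase(Database("db1", "", "uri", Map.empty), ifNotExists = false)
catalog.createDatabase(Database("db2", "", "uri", Map.empty), ifNotExists = false)

catalog.listDatabases("db*") // db1, db2
catalog.listDatabases("*1")  // db1
catalog.listDatabases("db")  // empty: "db" matches neither name in full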
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.AnalysisException
/**
 * Interface for the system catalog (of columns, partitions, tables, and databases).
 *
 * This is only used for non-temporary items, and implementations must be thread-safe, as they
 * can be accessed from multiple threads.
 *
 * Implementations should throw [[AnalysisException]] when a table or database does not exist.
 */
abstract class Catalog {

  // --------------------------------------------------------------------------
  // Databases
  // --------------------------------------------------------------------------

  def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit

  def dropDatabase(
      db: String,
      ignoreIfNotExists: Boolean,
      cascade: Boolean): Unit

  def alterDatabase(db: String, dbDefinition: Database): Unit

  def getDatabase(db: String): Database

  def listDatabases(): Seq[String]

  def listDatabases(pattern: String): Seq[String]

  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit

  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit

  def renameTable(db: String, oldName: String, newName: String): Unit

  def alterTable(db: String, table: String, tableDefinition: Table): Unit

  def getTable(db: String, table: String): Table

  def listTables(db: String): Seq[String]

  def listTables(db: String, pattern: String): Seq[String]

  // --------------------------------------------------------------------------
  // Partitions
  // --------------------------------------------------------------------------

  // TODO: need more functions for partitioning.

  def alterPartition(db: String, table: String, part: TablePartition): Unit

  def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit

  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit

  def dropFunction(db: String, funcName: String): Unit

  def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit

  def getFunction(db: String, funcName: String): Function

  def listFunctions(db: String, pattern: String): Seq[String]
}
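
// A sketch of the error contract described above; any conforming implementation
// should behave the same way:
//
//   val catalog: Catalog = new InMemoryCatalog
//   catalog.getTable("no_such_db", "tbl")                            // throws AnalysisException
//   catalog.dropTable("no_such_db", "tbl", ignoreIfNotExists = true) // still throws:
//   // ignoreIfNotExists applies to a missing table, not a missing database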
/**
 * A function defined in the catalog.
 *
 * @param name name of the function
 * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
 */
case class Function(
  name: String,
  className: String
)

/**
 * Storage format, used to describe how a partition or a table is stored.
 */
case class StorageFormat(
  locationUri: String,
  inputFormat: String,
  outputFormat: String,
  serde: String,
  serdeProperties: Map[String, String]
)

/**
 * A column in a table.
 */
case class Column(
  name: String,
  dataType: String,
  nullable: Boolean,
  comment: String
)

/**
 * A partition (Hive style) defined in the catalog.
 *
 * @param values values for the partition columns
 * @param storage storage format of the partition
 */
case class TablePartition(
  values: Seq[String],
  storage: StorageFormat
)

/**
 * A table defined in the catalog.
 *
 * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
 * future once we have a better understanding of how we want to handle skewed columns.
 */
case class Table(
  name: String,
  description: String,
  schema: Seq[Column],
  partitionColumns: Seq[Column],
  sortColumns: Seq[Column],
  storage: StorageFormat,
  numBuckets: Int,
  properties: Map[String, String],
  tableType: String,
  createTime: Long,
  lastAccessTime: Long,
  viewOriginalText: Option[String],
  viewText: Option[String]) {

  require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
    tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
}

/**
 * A database defined in the catalog.
 */
case class Database(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)
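
The require in Table makes construction fail fast for unrecognized table types. For example, given the definitions above:

// Throws IllegalArgumentException: requirement failed
Table("t", "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty,
  "TEMPORARY_TABLE", 0, 0, None, None)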
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
/**
 * A reasonably complete test suite (i.e. a set of behaviors) for a [[Catalog]].
 *
 * Implementations of the [[Catalog]] interface can create test suites by extending this.
 */
abstract class CatalogTestCases extends SparkFunSuite {
  protected def newEmptyCatalog(): Catalog

  /**
   * Creates a basic catalog, with the following structure:
   *
   * db1
   * db2
   *   - tbl1
   *   - tbl2
   *   - func1
   */
  private def newBasicCatalog(): Catalog = {
    val catalog = newEmptyCatalog()
    catalog.createDatabase(newDb("db1"), ifNotExists = false)
    catalog.createDatabase(newDb("db2"), ifNotExists = false)
    catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
    catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
    catalog
  }

  private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")

  private def newDb(name: String = "default"): Database =
    Database(name, name + " description", "uri", Map.empty)

  private def newTable(name: String): Table =
    Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
      None, None)

  private def newFunc(name: String): Function = Function(name, "class.name")
  // --------------------------------------------------------------------------
  // Databases
  // --------------------------------------------------------------------------

  test("basic create, drop and list databases") {
    val catalog = newEmptyCatalog()
    catalog.createDatabase(newDb(), ifNotExists = false)
    assert(catalog.listDatabases().toSet == Set("default"))

    catalog.createDatabase(newDb("default2"), ifNotExists = false)
    assert(catalog.listDatabases().toSet == Set("default", "default2"))
  }

  test("get database when a database exists") {
    val db1 = newBasicCatalog().getDatabase("db1")
    assert(db1.name == "db1")
    assert(db1.description.contains("db1"))
  }

  test("get database should throw exception when the database does not exist") {
    intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
  }

  test("list databases without pattern") {
    val catalog = newBasicCatalog()
    assert(catalog.listDatabases().toSet == Set("db1", "db2"))
  }

  test("list databases with pattern") {
    val catalog = newBasicCatalog()
    assert(catalog.listDatabases("db").toSet == Set.empty)
    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
    assert(catalog.listDatabases("*1").toSet == Set("db1"))
    assert(catalog.listDatabases("db2").toSet == Set("db2"))
  }

  test("drop database") {
    val catalog = newBasicCatalog()
    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
    assert(catalog.listDatabases().toSet == Set("db2"))
  }

  test("drop database when the database is not empty") {
    // Throw exception if there are functions left
    val catalog1 = newBasicCatalog()
    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
    intercept[AnalysisException] {
      catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
    }

    // Throw exception if there are tables left
    val catalog2 = newBasicCatalog()
    catalog2.dropFunction("db2", "func1")
    intercept[AnalysisException] {
      catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
    }

    // When cascade is true, it should drop them
    val catalog3 = newBasicCatalog()
    catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
    assert(catalog3.listDatabases().toSet == Set("db1"))
  }

  test("drop database when the database does not exist") {
    val catalog = newBasicCatalog()
    intercept[AnalysisException] {
      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
    }
    catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
  }

  test("alter database") {
    val catalog = newBasicCatalog()
    catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
    assert(catalog.getDatabase("db1").description == "new description")
  }

  test("alter database should throw exception when the database does not exist") {
    intercept[AnalysisException] {
      newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
    }
  }
  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  test("drop table") {
    val catalog = newBasicCatalog()
    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
    assert(catalog.listTables("db2").toSet == Set("tbl2"))
  }

  test("drop table when database / table does not exist") {
    val catalog = newBasicCatalog()

    // Should always throw exception when the database does not exist
    intercept[AnalysisException] {
      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
    }
    intercept[AnalysisException] {
      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
    }

    // Should throw exception when the table does not exist, if ignoreIfNotExists is false
    intercept[AnalysisException] {
      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
    }
    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
  }

  test("rename table") {
    val catalog = newBasicCatalog()
    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
    catalog.renameTable("db2", "tbl1", "tblone")
    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
  }

  test("rename table when database / table does not exist") {
    val catalog = newBasicCatalog()
    intercept[AnalysisException] { // Throw exception when the database does not exist
      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
    }
    intercept[AnalysisException] { // Throw exception when the table does not exist
      catalog.renameTable("db2", "unknown_table", "unknown_table")
    }
  }

  test("alter table") {
    val catalog = newBasicCatalog()
    catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
    assert(catalog.getTable("db2", "tbl1").createTime == 10)
  }

  test("alter table when database / table does not exist") {
    val catalog = newBasicCatalog()
    intercept[AnalysisException] { // Throw exception when the database does not exist
      catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
    }
    intercept[AnalysisException] { // Throw exception when the table does not exist
      catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
    }
  }

  test("get table") {
    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
  }

  test("get table when database / table does not exist") {
    val catalog = newBasicCatalog()
    intercept[AnalysisException] {
      catalog.getTable("unknown_db", "unknown_table")
    }
    intercept[AnalysisException] {
      catalog.getTable("db2", "unknown_table")
    }
  }

  test("list tables without pattern") {
    val catalog = newBasicCatalog()
    assert(catalog.listTables("db1").toSet == Set.empty)
    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
  }

  test("list tables with pattern") {
    val catalog = newBasicCatalog()

    // Test when database does not exist
    intercept[AnalysisException] { catalog.listTables("unknown_db") }

    assert(catalog.listTables("db1", "*").toSet == Set.empty)
    assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
    assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
    assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
  }
  // --------------------------------------------------------------------------
  // Partitions
  // --------------------------------------------------------------------------

  // TODO: Add test cases for partitions

  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  // TODO: Add test cases for functions
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog

/** Test suite for the [[InMemoryCatalog]]. */
class InMemoryCatalogSuite extends CatalogTestCases {
  override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
}
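
The ClientWrapper-based implementation envisioned in the commit message should be able to reuse the same test cases. A hypothetical sketch (HiveCatalog is not part of this patch; the name is illustrative only):

class HiveCatalogSuite extends CatalogTestCases {
  // Hypothetical: would construct the planned HiveClient/ClientWrapper-backed catalog.
  override protected def newEmptyCatalog(): Catalog = new HiveCatalog
}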