diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9e6dfb7e9506ff2048426ed3d6e590087724cd63
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * An in-memory (ephemeral) implementation of the system catalog.
+ *
+ * All public methods should be synchronized for thread-safety.
+ */
+class InMemoryCatalog extends Catalog {
+
+  private class TableDesc(var table: Table) {
+    val partitions = new mutable.HashMap[String, TablePartition]
+  }
+
+  private class DatabaseDesc(var db: Database) {
+    val tables = new mutable.HashMap[String, TableDesc]
+    val functions = new mutable.HashMap[String, Function]
+  }
+
+  private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
+
+  private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
+    val regex = pattern.replaceAll("\\*", ".*").r
+    names.filter { name => regex.pattern.matcher(name).matches() }
+  }
+
+  private def existsFunction(db: String, funcName: String): Boolean = {
+    catalog(db).functions.contains(funcName)
+  }
+
+  private def existsTable(db: String, table: String): Boolean = {
+    catalog(db).tables.contains(table)
+  }
+
+  private def assertDbExists(db: String): Unit = {
+    if (!catalog.contains(db)) {
+      throw new AnalysisException(s"Database $db does not exist")
+    }
+  }
+
+  private def assertFunctionExists(db: String, funcName: String): Unit = {
+    assertDbExists(db)
+    if (!existsFunction(db, funcName)) {
+      throw new AnalysisException(s"Function $funcName does not exist in $db database")
+    }
+  }
+
+  private def assertTableExists(db: String, table: String): Unit = {
+    assertDbExists(db)
+    if (!existsTable(db, table)) {
+      throw new AnalysisException(s"Table $table does not exist in $db database")
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
+    if (catalog.contains(dbDefinition.name)) {
+      if (!ifNotExists) {
+        throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
+      }
+    } else {
+      catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
+    }
+  }
+
+  override def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = synchronized {
+    if (catalog.contains(db)) {
+      if (!cascade) {
+        // If cascade is false, make sure the database is empty.
+        if (catalog(db).tables.nonEmpty) {
+          throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
+        }
+        if (catalog(db).functions.nonEmpty) {
+          throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
+        }
+      }
+      // Remove the database.
+      catalog.remove(db)
+    } else {
+      if (!ignoreIfNotExists) {
+        throw new AnalysisException(s"Database $db does not exist")
+      }
+    }
+  }
+
+  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
+    assertDbExists(db)
+    assert(db == dbDefinition.name)
+    catalog(db).db = dbDefinition
+  }
+
+  override def getDatabase(db: String): Database = synchronized {
+    assertDbExists(db)
+    catalog(db).db
+  }
+
+  override def listDatabases(): Seq[String] = synchronized {
+    catalog.keySet.toSeq
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = synchronized {
+    filterPattern(listDatabases(), pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
+    : Unit = synchronized {
+    assertDbExists(db)
+    if (existsTable(db, tableDefinition.name)) {
+      if (!ifNotExists) {
+        throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+      }
+    } else {
+      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+    }
+  }
+
+  override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
+    : Unit = synchronized {
+    assertDbExists(db)
+    if (existsTable(db, table)) {
+      catalog(db).tables.remove(table)
+    } else {
+      if (!ignoreIfNotExists) {
+        throw new AnalysisException(s"Table $table does not exist in $db database")
+      }
+    }
+  }
+
+  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+    assertTableExists(db, oldName)
+    val oldDesc = catalog(db).tables(oldName)
+    oldDesc.table = oldDesc.table.copy(name = newName)
+    catalog(db).tables.put(newName, oldDesc)
+    catalog(db).tables.remove(oldName)
+  }
+
+  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
+    assertTableExists(db, table)
+    assert(table == tableDefinition.name)
+    catalog(db).tables(table).table = tableDefinition
+  }
+
+  override def getTable(db: String, table: String): Table = synchronized {
+    assertTableExists(db, table)
+    catalog(db).tables(table).table
+  }
+
+  override def listTables(db: String): Seq[String] = synchronized {
+    assertDbExists(db)
+    catalog(db).tables.keySet.toSeq
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = synchronized {
+    assertDbExists(db)
+    filterPattern(listTables(db), pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def alterPartition(db: String, table: String, part: TablePartition)
+    : Unit = synchronized {
+    throw new UnsupportedOperationException
+  }
+
+  override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
+    : Unit = synchronized {
+    throw new UnsupportedOperationException
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  override def createFunction(
+      db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
+    assertDbExists(db)
+
+    if (existsFunction(db, func.name)) {
+      if (!ifNotExists) {
+        throw new AnalysisException(s"Function ${func.name} already exists in $db database")
+      }
+    } else {
+      catalog(db).functions.put(func.name, func)
+    }
+  }
+
+  override def dropFunction(db: String, funcName: String): Unit = synchronized {
+    assertFunctionExists(db, funcName)
+    catalog(db).functions.remove(funcName)
+  }
+
+  override def alterFunction(db: String, funcName: String, funcDefinition: Function)
+    : Unit = synchronized {
+    assertFunctionExists(db, funcName)
+    if (funcName != funcDefinition.name) {
+      // Also a rename; remove the old one and add the new one back under the new name.
+      catalog(db).functions.remove(funcName)
+    }
+    catalog(db).functions.put(funcDefinition.name, funcDefinition)
+  }
+
+  override def getFunction(db: String, funcName: String): Function = synchronized {
+    assertFunctionExists(db, funcName)
+    catalog(db).functions(funcName)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
+    assertDbExists(db)
+    filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
+  }
+
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a6caf91f3304b5d9769b725a3a434f2ab171f812
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * Interface for the system catalog (of columns, partitions, tables, and databases).
+ *
+ * This is only used for non-temporary items, and implementations must be thread-safe as they
+ * can be accessed in multiple threads.
+ *
+ * Implementations should throw [[AnalysisException]] when a table or database does not exist.
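+ *
+ * A minimal usage sketch (illustrative only; the database name and values below are
+ * hypothetical, not part of this patch):
+ * {{{
+ *   val catalog: Catalog = new InMemoryCatalog
+ *   catalog.createDatabase(Database("db1", "test db", "uri", Map.empty), ifNotExists = true)
+ *   catalog.listDatabases()  // Seq("db1")
+ * }}}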
+ */
+abstract class Catalog {
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
+
+  def dropDatabase(
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit
+
+  def alterDatabase(db: String, dbDefinition: Database): Unit
+
+  def getDatabase(db: String): Database
+
+  def listDatabases(): Seq[String]
+
+  def listDatabases(pattern: String): Seq[String]
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+
+  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+
+  def renameTable(db: String, oldName: String, newName: String): Unit
+
+  def alterTable(db: String, table: String, tableDefinition: Table): Unit
+
+  def getTable(db: String, table: String): Table
+
+  def listTables(db: String): Seq[String]
+
+  def listTables(db: String, pattern: String): Seq[String]
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  // TODO: need more functions for partitioning.
+
+  def alterPartition(db: String, table: String, part: TablePartition): Unit
+
+  def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+
+  def dropFunction(db: String, funcName: String): Unit
+
+  def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+
+  def getFunction(db: String, funcName: String): Function
+
+  def listFunctions(db: String, pattern: String): Seq[String]
+
+}
+
+
+/**
+ * A function defined in the catalog.
+ *
+ * @param name name of the function
+ * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
+ */
+case class Function(
+    name: String,
+    className: String
+)
+
+
+/**
+ * Storage format, used to describe how a partition or a table is stored.
+ */
+case class StorageFormat(
+    locationUri: String,
+    inputFormat: String,
+    outputFormat: String,
+    serde: String,
+    serdeProperties: Map[String, String]
+)
+
+
+/**
+ * A column in a table.
+ */
+case class Column(
+    name: String,
+    dataType: String,
+    nullable: Boolean,
+    comment: String
+)
+
+
+/**
+ * A partition (Hive style) defined in the catalog.
+ *
+ * @param values values for the partition columns
+ * @param storage storage format of the partition
+ */
+case class TablePartition(
+    values: Seq[String],
+    storage: StorageFormat
+)
+
+
+/**
+ * A table defined in the catalog.
+ *
+ * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
+ * future once we have a better understanding of how we want to handle skewed columns.
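+ *
+ * A construction sketch (field values are illustrative only; `storage = null` mirrors the
+ * test helpers in this patch):
+ * {{{
+ *   Table("tbl", "", schema = Seq(Column("id", "int", nullable = true, comment = "")),
+ *     partitionColumns = Seq.empty, sortColumns = Seq.empty, storage = null, numBuckets = 0,
+ *     properties = Map.empty, tableType = "MANAGED_TABLE", createTime = 0, lastAccessTime = 0,
+ *     viewOriginalText = None, viewText = None)
+ * }}}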
+ */
+case class Table(
+    name: String,
+    description: String,
+    schema: Seq[Column],
+    partitionColumns: Seq[Column],
+    sortColumns: Seq[Column],
+    storage: StorageFormat,
+    numBuckets: Int,
+    properties: Map[String, String],
+    tableType: String,
+    createTime: Long,
+    lastAccessTime: Long,
+    viewOriginalText: Option[String],
+    viewText: Option[String]) {
+
+  require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
+    tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+}
+
+
+/**
+ * A database defined in the catalog.
+ */
+case class Database(
+    name: String,
+    description: String,
+    locationUri: String,
+    properties: Map[String, String]
+)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ab9d5ac8a20ebcfe0e058a06fd493e28bd585d38
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * A reasonably complete test suite (i.e. behaviors) for a [[Catalog]].
+ *
+ * Implementations of the [[Catalog]] interface can create test suites by extending this.
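+ *
+ * For example (the external catalog class named here is hypothetical):
+ * {{{
+ *   class MyCatalogSuite extends CatalogTestCases {
+ *     override protected def newEmptyCatalog(): Catalog = new MyCatalog
+ *   }
+ * }}}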
+ */
+abstract class CatalogTestCases extends SparkFunSuite {
+
+  protected def newEmptyCatalog(): Catalog
+
+  /**
+   * Creates a basic catalog, with the following structure:
+   *
+   * db1
+   * db2
+   *   - tbl1
+   *   - tbl2
+   *   - func1
+   */
+  private def newBasicCatalog(): Catalog = {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("db1"), ifNotExists = false)
+    catalog.createDatabase(newDb("db2"), ifNotExists = false)
+
+    catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    catalog
+  }
+
+  private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
+
+  private def newDb(name: String = "default"): Database =
+    Database(name, name + " description", "uri", Map.empty)
+
+  private def newTable(name: String): Table =
+    Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
+      None, None)
+
+  private def newFunc(name: String): Function = Function(name, "class.name")
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  test("basic create, drop and list databases") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb(), ifNotExists = false)
+    assert(catalog.listDatabases().toSet == Set("default"))
+
+    catalog.createDatabase(newDb("default2"), ifNotExists = false)
+    assert(catalog.listDatabases().toSet == Set("default", "default2"))
+  }
+
+  test("get database when a database exists") {
+    val db1 = newBasicCatalog().getDatabase("db1")
+    assert(db1.name == "db1")
+    assert(db1.description.contains("db1"))
+  }
+
+  test("get database should throw exception when the database does not exist") {
+    intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
+  }
+
+  test("list databases without pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+  }
+
+  test("list databases with pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listDatabases("db").toSet == Set.empty)
+    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases("*1").toSet == Set("db1"))
+    assert(catalog.listDatabases("db2").toSet == Set("db2"))
+  }
+
+  test("drop database") {
+    val catalog = newBasicCatalog()
+    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+    assert(catalog.listDatabases().toSet == Set("db2"))
+  }
+
+  test("drop database when the database is not empty") {
+    // Throw exception if there are functions left
+    val catalog1 = newBasicCatalog()
+    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+    intercept[AnalysisException] {
+      catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+
+    // Throw exception if there are tables left
+    val catalog2 = newBasicCatalog()
+    catalog2.dropFunction("db2", "func1")
+    intercept[AnalysisException] {
+      catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+
+    // When cascade is true, it should drop them
+    val catalog3 = newBasicCatalog()
+    catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+    assert(catalog3.listDatabases().toSet == Set("db1"))
+  }
+
+  test("drop database when the database does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {
+      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+    }
+
+    catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+  }
+
+  test("alter database") {
+    val catalog = newBasicCatalog()
+    catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
+    assert(catalog.getDatabase("db1").description == "new description")
+  }
+
+  test("alter database should throw exception when the database does not exist") {
+    intercept[AnalysisException] {
+      newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  test("drop table") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    assert(catalog.listTables("db2").toSet == Set("tbl2"))
+  }
+
+  test("drop table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+
+    // Should always throw exception when the database does not exist
+    intercept[AnalysisException] {
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+    }
+
+    intercept[AnalysisException] {
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+    }
+
+    // Should throw exception when the table does not exist, if ignoreIfNotExists is false
+    intercept[AnalysisException] {
+      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+    }
+
+    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+  }
+
+  test("rename table") {
+    val catalog = newBasicCatalog()
+
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    catalog.renameTable("db2", "tbl1", "tblone")
+    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+  }
+
+  test("rename table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {  // Throw exception when the database does not exist
+      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
+    }
+
+    intercept[AnalysisException] {  // Throw exception when the table does not exist
+      catalog.renameTable("db2", "unknown_table", "unknown_table")
+    }
+  }
+
+  test("alter table") {
+    val catalog = newBasicCatalog()
+    catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
+    assert(catalog.getTable("db2", "tbl1").createTime == 10)
+  }
+
+  test("alter table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {  // Throw exception when the database does not exist
+      catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+    }
+
+    intercept[AnalysisException] {  // Throw exception when the table does not exist
+      catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+    }
+  }
+
+  test("get table") {
+    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+  }
+
+  test("get table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getTable("unknown_db", "unknown_table")
+    }
+
+    intercept[AnalysisException] {
+      catalog.getTable("db2", "unknown_table")
+    }
+  }
+
+  test("list tables without pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db1").toSet == Set.empty)
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+  }
+
+  test("list tables with pattern") {
+    val catalog = newBasicCatalog()
+
+    // Test when database does not exist
+    intercept[AnalysisException] { catalog.listTables("unknown_db") }
+
+    assert(catalog.listTables("db1", "*").toSet == Set.empty)
+    assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
+    assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
+    assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  // TODO: Add test cases for partitions
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  // TODO: Add test cases for functions
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..871f0a0f46a220400520cf54b6c4eaeb26f28e43
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+/** Test suite for the [[InMemoryCatalog]]. */
+class InMemoryCatalogSuite extends CatalogTestCases {
+  override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
+}