diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4db3edb733a568d0bb1709e8a02fe1e48b21a29f..2ffe0ac9bc9822db7a94e975bda37bf6ec725636 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -46,7 +46,16 @@ object MimaExcludes { // [SPARK-16967] Move Mesos to Module ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"), // [SPARK-16240] ML persistence backward compatibility for LDA - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$") + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"), + // [SPARK-17717] Add Find and Exists methods to Catalog. + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findDatabase"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findTable"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findFunction"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findColumn"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists") ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 1aed245fdd332dc8528c212b6fa3cd84e5cb6a9e..b439022d227cc3d9d9356e425d37aba1bb40e6ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -101,6 +101,89 @@ abstract 
class Catalog { @throws[AnalysisException]("database or table does not exist") def listColumns(dbName: String, tableName: String): Dataset[Column] + /** + * Find the database with the specified name. This throws an AnalysisException when the database + * cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("database does not exist") + def findDatabase(dbName: String): Database + + /** + * Find the table with the specified name. This table can be a temporary table or a table in the + * current database. This throws an AnalysisException when the table cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("table does not exist") + def findTable(tableName: String): Table + + /** + * Find the table with the specified name in the specified database. This throws an + * AnalysisException when the table cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("database or table does not exist") + def findTable(dbName: String, tableName: String): Table + + /** + * Find the function with the specified name. This function can be a temporary function or a + * function in the current database. This throws an AnalysisException when the function cannot + * be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("function does not exist") + def findFunction(functionName: String): Function + + /** + * Find the function with the specified name in the specified database. This throws an + * AnalysisException when the function cannot be found. + * + * @since 2.1.0 + */ + @throws[AnalysisException]("database or function does not exist") + def findFunction(dbName: String, functionName: String): Function + + /** + * Check if the database with the specified name exists. + * + * @since 2.1.0 + */ + def databaseExists(dbName: String): Boolean + + /** + * Check if the table with the specified name exists. This can either be a temporary table or a + * table in the current database. 
+ * + * @since 2.1.0 + */ + def tableExists(tableName: String): Boolean + + /** + * Check if the table with the specified name exists in the specified database. + * + * @since 2.1.0 + */ + def tableExists(dbName: String, tableName: String): Boolean + + /** + * Check if the function with the specified name exists. This can either be a temporary function + * or a function in the current database. + * + * @since 2.1.0 + */ + def functionExists(functionName: String): Boolean + + /** + * Check if the function with the specified name exists in the specified database. + * + * @since 2.1.0 + */ + def functionExists(dbName: String, functionName: String): Boolean + /** * :: Experimental :: * Creates an external table from the given path and returns the corresponding DataFrame. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index f2525357658993b6e852990794072294e0feecd7..a1087edd03fdfd95ec486cf38770659129313ff6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -23,10 +23,10 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table} -import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog} +import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import 
org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.types.StructType @@ -69,15 +69,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def listDatabases(): Dataset[Database] = { val databases = sessionCatalog.listDatabases().map { dbName => - val metadata = sessionCatalog.getDatabaseMetadata(dbName) - new Database( - name = metadata.name, - description = metadata.description, - locationUri = metadata.locationUri) + makeDatabase(sessionCatalog.getDatabaseMetadata(dbName)) } CatalogImpl.makeDataset(databases, sparkSession) } + private def makeDatabase(metadata: CatalogDatabase): Database = { + new Database( + name = metadata.name, + description = metadata.description, + locationUri = metadata.locationUri) + } + /** * Returns a list of tables in the current database. * This includes all temporary tables. @@ -94,18 +97,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { override def listTables(dbName: String): Dataset[Table] = { requireDatabaseExists(dbName) val tables = sessionCatalog.listTables(dbName).map { tableIdent => - val isTemp = tableIdent.database.isEmpty - val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent)) - new Table( - name = tableIdent.identifier, - database = metadata.flatMap(_.identifier.database).orNull, - description = metadata.flatMap(_.comment).orNull, - tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"), - isTemporary = isTemp) + makeTable(tableIdent, tableIdent.database.isEmpty) } CatalogImpl.makeDataset(tables, sparkSession) } + private def makeTable(tableIdent: TableIdentifier, isTemp: Boolean): Table = { + val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent)) + new Table( + name = tableIdent.identifier, + database = metadata.flatMap(_.identifier.database).orNull, + description = metadata.flatMap(_.comment).orNull, + tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"), + 
isTemporary = isTemp) + } + /** * Returns a list of functions registered in the current database. * This includes all temporary functions @@ -121,18 +127,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { @throws[AnalysisException]("database does not exist") override def listFunctions(dbName: String): Dataset[Function] = { requireDatabaseExists(dbName) - val functions = sessionCatalog.listFunctions(dbName).map { case (funcIdent, _) => - val metadata = sessionCatalog.lookupFunctionInfo(funcIdent) - new Function( - name = funcIdent.identifier, - database = funcIdent.database.orNull, - description = null, // for now, this is always undefined - className = metadata.getClassName, - isTemporary = funcIdent.database.isEmpty) + val functions = sessionCatalog.listFunctions(dbName).map { case (functIdent, _) => + makeFunction(functIdent) } CatalogImpl.makeDataset(functions, sparkSession) } + private def makeFunction(funcIdent: FunctionIdentifier): Function = { + val metadata = sessionCatalog.lookupFunctionInfo(funcIdent) + new Function( + name = funcIdent.identifier, + database = funcIdent.database.orNull, + description = null, // for now, this is always undefined + className = metadata.getClassName, + isTemporary = funcIdent.database.isEmpty) + } + /** * Returns a list of columns for the given table in the current database. */ @@ -167,6 +177,100 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { CatalogImpl.makeDataset(columns, sparkSession) } + /** + * Find the database with the specified name. This throws an [[AnalysisException]] when no + * [[Database]] can be found. + */ + override def findDatabase(dbName: String): Database = { + if (sessionCatalog.databaseExists(dbName)) { + makeDatabase(sessionCatalog.getDatabaseMetadata(dbName)) + } else { + throw new AnalysisException(s"The specified database $dbName does not exist.") + } + } + + /** + * Find the table with the specified name. 
This table can be a temporary table or a table in the + * current database. This throws an [[AnalysisException]] when no [[Table]] can be found. + */ + override def findTable(tableName: String): Table = { + findTable(null, tableName) + } + + /** + * Find the table with the specified name in the specified database. This throws an + * [[AnalysisException]] when no [[Table]] can be found. + */ + override def findTable(dbName: String, tableName: String): Table = { + val tableIdent = TableIdentifier(tableName, Option(dbName)) + val isTemporary = sessionCatalog.isTemporaryTable(tableIdent) + if (isTemporary || sessionCatalog.tableExists(tableIdent)) { + makeTable(tableIdent, isTemporary) + } else { + throw new AnalysisException(s"The specified table $tableIdent does not exist.") + } + } + + /** + * Find the function with the specified name. This function can be a temporary function or a + * function in the current database. This throws an [[AnalysisException]] when no [[Function]] + * can be found. + */ + override def findFunction(functionName: String): Function = { + findFunction(null, functionName) + } + + /** + * Find the function with the specified name in the specified database. This throws an + * [[AnalysisException]] when no [[Function]] can be found. + */ + override def findFunction(dbName: String, functionName: String): Function = { + val functionIdent = FunctionIdentifier(functionName, Option(dbName)) + if (sessionCatalog.functionExists(functionIdent)) { + makeFunction(functionIdent) + } else { + throw new AnalysisException(s"The specified function $functionIdent does not exist.") + } + } + + /** + * Check if the database with the specified name exists. + */ + override def databaseExists(dbName: String): Boolean = { + sessionCatalog.databaseExists(dbName) + } + + /** + * Check if the table with the specified name exists. This can either be a temporary table or a + * table in the current database. 
+ */ + override def tableExists(tableName: String): Boolean = { + tableExists(null, tableName) + } + + /** + * Check if the table with the specified name exists in the specified database. + */ + override def tableExists(dbName: String, tableName: String): Boolean = { + val tableIdent = TableIdentifier(tableName, Option(dbName)) + sessionCatalog.isTemporaryTable(tableIdent) || sessionCatalog.tableExists(tableIdent) + } + + /** + * Check if the function with the specified name exists. This can either be a temporary function + * or a function in the current database. + */ + override def functionExists(functionName: String): Boolean = { + functionExists(null, functionName) + } + + /** + * Check if the function with the specified name exists in the specified database. + */ + override def functionExists(dbName: String, functionName: String): Boolean = { + sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName))) + } + /** * :: Experimental :: * Creates an external table from the given path and returns the corresponding DataFrame. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 3dc67ffafb048c17fc066823232eec190abed3c2..783bf77f86b469bb9362f77b4e5b46efb29b271a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -340,6 +340,124 @@ class CatalogSuite } } + test("find database") { + intercept[AnalysisException](spark.catalog.findDatabase("db10")) + withTempDatabase { db => + assert(spark.catalog.findDatabase(db).name === db) + } + } + + test("find table") { + withTempDatabase { db => + withTable(s"tbl_x", s"$db.tbl_y") { + // Try to find non existing tables. 
+ intercept[AnalysisException](spark.catalog.findTable("tbl_x")) + intercept[AnalysisException](spark.catalog.findTable("tbl_y")) + intercept[AnalysisException](spark.catalog.findTable(db, "tbl_y")) + + // Create objects. + createTempTable("tbl_x") + createTable("tbl_y", Some(db)) + + // Find a temporary table + assert(spark.catalog.findTable("tbl_x").name === "tbl_x") + + // Find a qualified table + assert(spark.catalog.findTable(db, "tbl_y").name === "tbl_y") + + // Find an unqualified table using the current database + intercept[AnalysisException](spark.catalog.findTable("tbl_y")) + spark.catalog.setCurrentDatabase(db) + assert(spark.catalog.findTable("tbl_y").name === "tbl_y") + } + } + } + + test("find function") { + withTempDatabase { db => + withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) { + // Try to find non existing functions. + intercept[AnalysisException](spark.catalog.findFunction("fn1")) + intercept[AnalysisException](spark.catalog.findFunction("fn2")) + intercept[AnalysisException](spark.catalog.findFunction(db, "fn2")) + + // Create objects. + createTempFunction("fn1") + createFunction("fn2", Some(db)) + + // Find a temporary function + assert(spark.catalog.findFunction("fn1").name === "fn1") + + // Find a qualified function + assert(spark.catalog.findFunction(db, "fn2").name === "fn2") + + // Find an unqualified function using the current database + intercept[AnalysisException](spark.catalog.findFunction("fn2")) + spark.catalog.setCurrentDatabase(db) + assert(spark.catalog.findFunction("fn2").name === "fn2") + } + } + } + + test("database exists") { + assert(!spark.catalog.databaseExists("db10")) + createDatabase("db10") + assert(spark.catalog.databaseExists("db10")) + dropDatabase("db10") + } + + test("table exists") { + withTempDatabase { db => + withTable(s"tbl_x", s"$db.tbl_y") { + // Try to find non existing tables. 
+ assert(!spark.catalog.tableExists("tbl_x")) + assert(!spark.catalog.tableExists("tbl_y")) + assert(!spark.catalog.tableExists(db, "tbl_y")) + + // Create objects. + createTempTable("tbl_x") + createTable("tbl_y", Some(db)) + + // Find a temporary table + assert(spark.catalog.tableExists("tbl_x")) + + // Find a qualified table + assert(spark.catalog.tableExists(db, "tbl_y")) + + // Find an unqualified table using the current database + assert(!spark.catalog.tableExists("tbl_y")) + spark.catalog.setCurrentDatabase(db) + assert(spark.catalog.tableExists("tbl_y")) + } + } + } + + test("function exists") { + withTempDatabase { db => + withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) { + // Try to find non existing functions. + assert(!spark.catalog.functionExists("fn1")) + assert(!spark.catalog.functionExists("fn2")) + assert(!spark.catalog.functionExists(db, "fn2")) + + // Create objects. + createTempFunction("fn1") + createFunction("fn2", Some(db)) + + // Find a temporary function + assert(spark.catalog.functionExists("fn1")) + + // Find a qualified function + assert(spark.catalog.functionExists(db, "fn2")) + + // Find an unqualified function using the current database + assert(!spark.catalog.functionExists("fn2")) + spark.catalog.setCurrentDatabase(db) + assert(spark.catalog.functionExists("fn2")) + } + } + } + // TODO: add tests for the rest of them }