Skip to content
Snippets Groups Projects
Commit e2b3d236 authored by Herman van Hovell's avatar Herman van Hovell Committed by Reynold Xin
Browse files

[SPARK-20420][SQL] Add events to the external catalog

## What changes were proposed in this pull request?
It is often useful to be able to track changes to the `ExternalCatalog`. This PR makes the `ExternalCatalog` emit events when a catalog object is changed. Events are fired before and after the change.

The following events are fired per object:

- Database
  - CreateDatabasePreEvent: event fired before the database is created.
  - CreateDatabaseEvent: event fired after the database has been created.
  - DropDatabasePreEvent: event fired before the database is dropped.
  - DropDatabaseEvent: event fired after the database has been dropped.
- Table
  - CreateTablePreEvent: event fired before the table is created.
  - CreateTableEvent: event fired after the table has been created.
  - RenameTablePreEvent: event fired before the table is renamed.
  - RenameTableEvent: event fired after the table has been renamed.
  - DropTablePreEvent: event fired before the table is dropped.
  - DropTableEvent: event fired after the table has been dropped.
- Function
  - CreateFunctionPreEvent: event fired before the function is created.
  - CreateFunctionEvent: event fired after the function has been created.
  - RenameFunctionPreEvent: event fired before the function is renamed.
  - RenameFunctionEvent: event fired after the function has been renamed.
  - DropFunctionPreEvent: event fired before the function is dropped.
  - DropFunctionPreEvent: event fired after the function has been dropped.

The current events currently only contain the names of the object modified. We add more events, and more details at a later point.

A user can monitor changes to the external catalog by adding a listener to the Spark listener bus checking for `ExternalCatalogEvent`s using the `SparkListener.onOtherEvent` hook. A more direct approach is add listener directly to the `ExternalCatalog`.

## How was this patch tested?
Added the `ExternalCatalogEventSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17710 from hvanhovell/SPARK-20420.
parent 48d760d0
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog ...@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ListenerBus
/** /**
* Interface for the system catalog (of functions, partitions, tables, and databases). * Interface for the system catalog (of functions, partitions, tables, and databases).
...@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType ...@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
* *
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/ */
abstract class ExternalCatalog { abstract class ExternalCatalog
extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
import CatalogTypes.TablePartitionSpec import CatalogTypes.TablePartitionSpec
protected def requireDbExists(db: String): Unit = { protected def requireDbExists(db: String): Unit = {
...@@ -61,9 +63,22 @@ abstract class ExternalCatalog { ...@@ -61,9 +63,22 @@ abstract class ExternalCatalog {
// Databases // Databases
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
val db = dbDefinition.name
postToAll(CreateDatabasePreEvent(db))
doCreateDatabase(dbDefinition, ignoreIfExists)
postToAll(CreateDatabaseEvent(db))
}
protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
postToAll(DropDatabasePreEvent(db))
doDropDatabase(db, ignoreIfNotExists, cascade)
postToAll(DropDatabaseEvent(db))
}
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/** /**
* Alter a database whose name matches the one specified in `dbDefinition`, * Alter a database whose name matches the one specified in `dbDefinition`,
...@@ -88,11 +103,39 @@ abstract class ExternalCatalog { ...@@ -88,11 +103,39 @@ abstract class ExternalCatalog {
// Tables // Tables
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val db = tableDefinition.database
val name = tableDefinition.identifier.table
postToAll(CreateTablePreEvent(db, name))
doCreateTable(tableDefinition, ignoreIfExists)
postToAll(CreateTableEvent(db, name))
}
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
def renameTable(db: String, oldName: String, newName: String): Unit final def dropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
postToAll(DropTablePreEvent(db, table))
doDropTable(db, table, ignoreIfNotExists, purge)
postToAll(DropTableEvent(db, table))
}
protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit
final def renameTable(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameTablePreEvent(db, oldName, newName))
doRenameTable(db, oldName, newName)
postToAll(RenameTableEvent(db, oldName, newName))
}
protected def doRenameTable(db: String, oldName: String, newName: String): Unit
/** /**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming * Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
...@@ -269,11 +312,30 @@ abstract class ExternalCatalog { ...@@ -269,11 +312,30 @@ abstract class ExternalCatalog {
// Functions // Functions
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
def createFunction(db: String, funcDefinition: CatalogFunction): Unit final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
val name = funcDefinition.identifier.funcName
postToAll(CreateFunctionPreEvent(db, name))
doCreateFunction(db, funcDefinition)
postToAll(CreateFunctionEvent(db, name))
}
def dropFunction(db: String, funcName: String): Unit protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit
def renameFunction(db: String, oldName: String, newName: String): Unit final def dropFunction(db: String, funcName: String): Unit = {
postToAll(DropFunctionPreEvent(db, funcName))
doDropFunction(db, funcName)
postToAll(DropFunctionEvent(db, funcName))
}
protected def doDropFunction(db: String, funcName: String): Unit
final def renameFunction(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameFunctionPreEvent(db, oldName, newName))
doRenameFunction(db, oldName, newName)
postToAll(RenameFunctionEvent(db, oldName, newName))
}
protected def doRenameFunction(db: String, oldName: String, newName: String): Unit
def getFunction(db: String, funcName: String): CatalogFunction def getFunction(db: String, funcName: String): CatalogFunction
...@@ -281,4 +343,9 @@ abstract class ExternalCatalog { ...@@ -281,4 +343,9 @@ abstract class ExternalCatalog {
def listFunctions(db: String, pattern: String): Seq[String] def listFunctions(db: String, pattern: String): Seq[String]
override protected def doPostEvent(
listener: ExternalCatalogEventListener,
event: ExternalCatalogEvent): Unit = {
listener.onEvent(event)
}
} }
...@@ -98,7 +98,7 @@ class InMemoryCatalog( ...@@ -98,7 +98,7 @@ class InMemoryCatalog(
// Databases // Databases
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
override def createDatabase( override protected def doCreateDatabase(
dbDefinition: CatalogDatabase, dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized { ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) { if (catalog.contains(dbDefinition.name)) {
...@@ -119,7 +119,7 @@ class InMemoryCatalog( ...@@ -119,7 +119,7 @@ class InMemoryCatalog(
} }
} }
override def dropDatabase( override protected def doDropDatabase(
db: String, db: String,
ignoreIfNotExists: Boolean, ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized { cascade: Boolean): Unit = synchronized {
...@@ -180,7 +180,7 @@ class InMemoryCatalog( ...@@ -180,7 +180,7 @@ class InMemoryCatalog(
// Tables // Tables
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
override def createTable( override protected def doCreateTable(
tableDefinition: CatalogTable, tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized { ignoreIfExists: Boolean): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined) assert(tableDefinition.identifier.database.isDefined)
...@@ -221,7 +221,7 @@ class InMemoryCatalog( ...@@ -221,7 +221,7 @@ class InMemoryCatalog(
} }
} }
override def dropTable( override protected def doDropTable(
db: String, db: String,
table: String, table: String,
ignoreIfNotExists: Boolean, ignoreIfNotExists: Boolean,
...@@ -264,7 +264,10 @@ class InMemoryCatalog( ...@@ -264,7 +264,10 @@ class InMemoryCatalog(
} }
} }
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { override protected def doRenameTable(
db: String,
oldName: String,
newName: String): Unit = synchronized {
requireTableExists(db, oldName) requireTableExists(db, oldName)
requireTableNotExists(db, newName) requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName) val oldDesc = catalog(db).tables(oldName)
...@@ -565,18 +568,21 @@ class InMemoryCatalog( ...@@ -565,18 +568,21 @@ class InMemoryCatalog(
// Functions // Functions
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db) requireDbExists(db)
requireFunctionNotExists(db, func.identifier.funcName) requireFunctionNotExists(db, func.identifier.funcName)
catalog(db).functions.put(func.identifier.funcName, func) catalog(db).functions.put(func.identifier.funcName, func)
} }
override def dropFunction(db: String, funcName: String): Unit = synchronized { override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
requireFunctionExists(db, funcName) requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName) catalog(db).functions.remove(funcName)
} }
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { override protected def doRenameFunction(
db: String,
oldName: String,
newName: String): Unit = synchronized {
requireFunctionExists(db, oldName) requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName) requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db))) val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.scheduler.SparkListenerEvent
/**
* Event emitted by the external catalog when it is modified. Events are either fired before or
* after the modification (the event should document this).
*/
trait ExternalCatalogEvent extends SparkListenerEvent
/**
* Listener interface for external catalog modification events.
*/
trait ExternalCatalogEventListener {
def onEvent(event: ExternalCatalogEvent): Unit
}
/**
* Event fired when a database is create or dropped.
*/
trait DatabaseEvent extends ExternalCatalogEvent {
/**
* Database of the object that was touched.
*/
val database: String
}
/**
* Event fired before a database is created.
*/
case class CreateDatabasePreEvent(database: String) extends DatabaseEvent
/**
* Event fired after a database has been created.
*/
case class CreateDatabaseEvent(database: String) extends DatabaseEvent
/**
* Event fired before a database is dropped.
*/
case class DropDatabasePreEvent(database: String) extends DatabaseEvent
/**
* Event fired after a database has been dropped.
*/
case class DropDatabaseEvent(database: String) extends DatabaseEvent
/**
* Event fired when a table is created, dropped or renamed.
*/
trait TableEvent extends DatabaseEvent {
/**
* Name of the table that was touched.
*/
val name: String
}
/**
* Event fired before a table is created.
*/
case class CreateTablePreEvent(database: String, name: String) extends TableEvent
/**
* Event fired after a table has been created.
*/
case class CreateTableEvent(database: String, name: String) extends TableEvent
/**
* Event fired before a table is dropped.
*/
case class DropTablePreEvent(database: String, name: String) extends TableEvent
/**
* Event fired after a table has been dropped.
*/
case class DropTableEvent(database: String, name: String) extends TableEvent
/**
* Event fired before a table is renamed.
*/
case class RenameTablePreEvent(
database: String,
name: String,
newName: String)
extends TableEvent
/**
* Event fired after a table has been renamed.
*/
case class RenameTableEvent(
database: String,
name: String,
newName: String)
extends TableEvent
/**
* Event fired when a function is created, dropped or renamed.
*/
trait FunctionEvent extends DatabaseEvent {
/**
* Name of the function that was touched.
*/
val name: String
}
/**
* Event fired before a function is created.
*/
case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent
/**
* Event fired after a function has been created.
*/
case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent
/**
* Event fired before a function is dropped.
*/
case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent
/**
* Event fired after a function has been dropped.
*/
case class DropFunctionEvent(database: String, name: String) extends FunctionEvent
/**
* Event fired before a function is renamed.
*/
case class RenameFunctionPreEvent(
database: String,
name: String,
newName: String)
extends FunctionEvent
/**
* Event fired after a function has been renamed.
*/
case class RenameFunctionEvent(
database: String,
name: String,
newName: String)
extends FunctionEvent
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog
import java.net.URI
import java.nio.file.{Files, Path}
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.types.StructType
/**
* Test Suite for external catalog events
*/
class ExternalCatalogEventSuite extends SparkFunSuite {
protected def newCatalog: ExternalCatalog = new InMemoryCatalog()
private def testWithCatalog(
name: String)(
f: (ExternalCatalog, Seq[ExternalCatalogEvent] => Unit) => Unit): Unit = test(name) {
val catalog = newCatalog
val recorder = mutable.Buffer.empty[ExternalCatalogEvent]
catalog.addListener(new ExternalCatalogEventListener {
override def onEvent(event: ExternalCatalogEvent): Unit = {
recorder += event
}
})
f(catalog, (expected: Seq[ExternalCatalogEvent]) => {
val actual = recorder.clone()
recorder.clear()
assert(expected === actual)
})
}
private def createDbDefinition(uri: URI): CatalogDatabase = {
CatalogDatabase(name = "db5", description = "", locationUri = uri, Map.empty)
}
private def createDbDefinition(): CatalogDatabase = {
createDbDefinition(preparePath(Files.createTempDirectory("db_")))
}
private def preparePath(path: Path): URI = path.normalize().toUri
testWithCatalog("database") { (catalog, checkEvents) =>
// CREATE
val dbDefinition = createDbDefinition()
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
catalog.createDatabase(dbDefinition, ignoreIfExists = true)
checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
intercept[AnalysisException] {
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
}
checkEvents(CreateDatabasePreEvent("db5") :: Nil)
// DROP
intercept[AnalysisException] {
catalog.dropDatabase("db4", ignoreIfNotExists = false, cascade = false)
}
checkEvents(DropDatabasePreEvent("db4") :: Nil)
catalog.dropDatabase("db5", ignoreIfNotExists = false, cascade = false)
checkEvents(DropDatabasePreEvent("db5") :: DropDatabaseEvent("db5") :: Nil)
catalog.dropDatabase("db4", ignoreIfNotExists = true, cascade = false)
checkEvents(DropDatabasePreEvent("db4") :: DropDatabaseEvent("db4") :: Nil)
}
testWithCatalog("table") { (catalog, checkEvents) =>
val path1 = Files.createTempDirectory("db_")
val path2 = Files.createTempDirectory(path1, "tbl_")
val uri1 = preparePath(path1)
val uri2 = preparePath(path2)
// CREATE
val dbDefinition = createDbDefinition(uri1)
val storage = CatalogStorageFormat.empty.copy(
locationUri = Option(uri2))
val tableDefinition = CatalogTable(
identifier = TableIdentifier("tbl1", Some("db5")),
tableType = CatalogTableType.MANAGED,
storage = storage,
schema = new StructType().add("id", "long"))
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
catalog.createTable(tableDefinition, ignoreIfExists = false)
checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil)
catalog.createTable(tableDefinition, ignoreIfExists = true)
checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil)
intercept[AnalysisException] {
catalog.createTable(tableDefinition, ignoreIfExists = false)
}
checkEvents(CreateTablePreEvent("db5", "tbl1") :: Nil)
// RENAME
catalog.renameTable("db5", "tbl1", "tbl2")
checkEvents(
RenameTablePreEvent("db5", "tbl1", "tbl2") ::
RenameTableEvent("db5", "tbl1", "tbl2") :: Nil)
intercept[AnalysisException] {
catalog.renameTable("db5", "tbl1", "tbl2")
}
checkEvents(RenameTablePreEvent("db5", "tbl1", "tbl2") :: Nil)
// DROP
intercept[AnalysisException] {
catalog.dropTable("db5", "tbl1", ignoreIfNotExists = false, purge = true)
}
checkEvents(DropTablePreEvent("db5", "tbl1") :: Nil)
catalog.dropTable("db5", "tbl2", ignoreIfNotExists = false, purge = true)
checkEvents(DropTablePreEvent("db5", "tbl2") :: DropTableEvent("db5", "tbl2") :: Nil)
catalog.dropTable("db5", "tbl2", ignoreIfNotExists = true, purge = true)
checkEvents(DropTablePreEvent("db5", "tbl2") :: DropTableEvent("db5", "tbl2") :: Nil)
}
testWithCatalog("function") { (catalog, checkEvents) =>
// CREATE
val dbDefinition = createDbDefinition()
val functionDefinition = CatalogFunction(
identifier = FunctionIdentifier("fn7", Some("db5")),
className = "",
resources = Seq.empty)
val newIdentifier = functionDefinition.identifier.copy(funcName = "fn4")
val renamedFunctionDefinition = functionDefinition.copy(identifier = newIdentifier)
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
catalog.createFunction("db5", functionDefinition)
checkEvents(CreateFunctionPreEvent("db5", "fn7") :: CreateFunctionEvent("db5", "fn7") :: Nil)
intercept[AnalysisException] {
catalog.createFunction("db5", functionDefinition)
}
checkEvents(CreateFunctionPreEvent("db5", "fn7") :: Nil)
// RENAME
catalog.renameFunction("db5", "fn7", "fn4")
checkEvents(
RenameFunctionPreEvent("db5", "fn7", "fn4") ::
RenameFunctionEvent("db5", "fn7", "fn4") :: Nil)
intercept[AnalysisException] {
catalog.renameFunction("db5", "fn7", "fn4")
}
checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil)
// DROP
intercept[AnalysisException] {
catalog.dropFunction("db5", "fn7")
}
checkEvents(DropFunctionPreEvent("db5", "fn7") :: Nil)
catalog.dropFunction("db5", "fn4")
checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", "fn4") :: Nil)
}
}
...@@ -109,6 +109,13 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { ...@@ -109,6 +109,13 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
} }
} }
// Make sure we propagate external catalog events to the spark listener bus
externalCatalog.addListener(new ExternalCatalogEventListener {
override def onEvent(event: ExternalCatalogEvent): Unit = {
sparkContext.listenerBus.post(event)
}
})
/** /**
* A manager for global temporary views. * A manager for global temporary views.
*/ */
......
...@@ -141,13 +141,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat ...@@ -141,13 +141,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Databases // Databases
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
override def createDatabase( override protected def doCreateDatabase(
dbDefinition: CatalogDatabase, dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withClient { ignoreIfExists: Boolean): Unit = withClient {
client.createDatabase(dbDefinition, ignoreIfExists) client.createDatabase(dbDefinition, ignoreIfExists)
} }
override def dropDatabase( override protected def doDropDatabase(
db: String, db: String,
ignoreIfNotExists: Boolean, ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = withClient { cascade: Boolean): Unit = withClient {
...@@ -194,7 +194,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat ...@@ -194,7 +194,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Tables // Tables
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
override def createTable( override protected def doCreateTable(
tableDefinition: CatalogTable, tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = withClient { ignoreIfExists: Boolean): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined) assert(tableDefinition.identifier.database.isDefined)
...@@ -456,7 +456,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat ...@@ -456,7 +456,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
} }
} }
override def dropTable( override protected def doDropTable(
db: String, db: String,
table: String, table: String,
ignoreIfNotExists: Boolean, ignoreIfNotExists: Boolean,
...@@ -465,7 +465,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat ...@@ -465,7 +465,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.dropTable(db, table, ignoreIfNotExists, purge) client.dropTable(db, table, ignoreIfNotExists, purge)
} }
override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { override protected def doRenameTable(
db: String,
oldName: String,
newName: String): Unit = withClient {
val rawTable = getRawTable(db, oldName) val rawTable = getRawTable(db, oldName)
// Note that Hive serde tables don't use path option in storage properties to store the value // Note that Hive serde tables don't use path option in storage properties to store the value
...@@ -1056,7 +1059,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat ...@@ -1056,7 +1059,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Functions // Functions
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
override def createFunction( override protected def doCreateFunction(
db: String, db: String,
funcDefinition: CatalogFunction): Unit = withClient { funcDefinition: CatalogFunction): Unit = withClient {
requireDbExists(db) requireDbExists(db)
...@@ -1069,12 +1072,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat ...@@ -1069,12 +1072,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier)) client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
} }
override def dropFunction(db: String, name: String): Unit = withClient { override protected def doDropFunction(db: String, name: String): Unit = withClient {
requireFunctionExists(db, name) requireFunctionExists(db, name)
client.dropFunction(db, name) client.dropFunction(db, name)
} }
override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient { override protected def doRenameFunction(
db: String,
oldName: String,
newName: String): Unit = withClient {
requireFunctionExists(db, oldName) requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName) requireFunctionNotExists(db, newName)
client.renameFunction(db, oldName, newName) client.renameFunction(db, oldName, newName)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment