Skip to content
Snippets Groups Projects
Commit e9131ec2 authored by gatorsmile's avatar gatorsmile Committed by Wenchen Fan
Browse files

[SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent...

[SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent Table/Function/Partitions by Rename

#### What changes were proposed in this pull request?
So far, in the implementation of InMemoryCatalog, we do not check if the new/destination table/function/partition exists or not. Thus, we just silently remove the existent table/function/partition.

This PR is to detect them and issue an appropriate exception.

#### How was this patch tested?
Added the related test cases. They also verify if HiveExternalCatalog also detects these errors.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12960 from gatorsmile/renameInMemoryCatalog.
parent 454ba4d6
No related branches found
No related tags found
No related merge requests found
......@@ -59,6 +59,13 @@ class InMemoryCatalog extends ExternalCatalog {
}
}
private def requireFunctionNotExists(db: String, funcName: String): Unit = {
if (functionExists(db, funcName)) {
throw new AnalysisException(
s"Function already exists: '$funcName' exists in database '$db'")
}
}
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new AnalysisException(
......@@ -66,10 +73,34 @@ class InMemoryCatalog extends ExternalCatalog {
}
}
private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!partitionExists(db, table, spec)) {
private def requireTableNotExists(db: String, table: String): Unit = {
if (tableExists(db, table)) {
throw new AnalysisException(
s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
s"Table or view exists: '$table' exists in database '$db'")
}
}
private def requirePartitionsExist(
db: String,
table: String,
specs: Seq[TablePartitionSpec]): Unit = {
specs foreach { s =>
if (!partitionExists(db, table, s)) {
throw new AnalysisException(
s"Partition not found: database '$db' table '$table' does not contain: '$s'")
}
}
}
private def requirePartitionsNotExist(
db: String,
table: String,
specs: Seq[TablePartitionSpec]): Unit = {
specs foreach { s =>
if (partitionExists(db, table, s)) {
throw new AnalysisException(
s"Partition exists: database '$db' table '$table' already contains: '$s'")
}
}
}
......@@ -171,6 +202,7 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
......@@ -272,6 +304,8 @@ class InMemoryCatalog extends ExternalCatalog {
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
requirePartitionsExist(db, table, specs)
requirePartitionsNotExist(db, table, newSpecs)
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
val existingParts = catalog(db).tables(table).partitions
......@@ -284,8 +318,8 @@ class InMemoryCatalog extends ExternalCatalog {
db: String,
table: String,
parts: Seq[CatalogTablePartition]): Unit = synchronized {
requirePartitionsExist(db, table, parts.map(p => p.spec))
parts.foreach { p =>
requirePartitionExists(db, table, p.spec)
catalog(db).tables(table).partitions.put(p.spec, p)
}
}
......@@ -294,7 +328,7 @@ class InMemoryCatalog extends ExternalCatalog {
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = synchronized {
requirePartitionExists(db, table, spec)
requirePartitionsExist(db, table, Seq(spec))
catalog(db).tables(table).partitions(spec)
}
......@@ -330,6 +364,7 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
......
......@@ -198,6 +198,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
test("rename table when destination table already exists") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.renameTable("db2", "tbl1", "tbl2")
}
}
test("alter table") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
......@@ -356,6 +363,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
test("rename partitions when the new partition already exists") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part2.spec))
}
}
test("alter partitions") {
val catalog = newBasicCatalog()
try {
......@@ -480,6 +494,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
test("rename function when new function already exists") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2", Some("db2")))
intercept[AnalysisException] {
catalog.renameFunction("db2", "func1", "func2")
}
}
test("list functions") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment