Skip to content
Snippets Groups Projects
Commit 8cba57a7 authored by gatorsmile's avatar gatorsmile Committed by Andrew Or
Browse files

[SPARK-14124][SQL][FOLLOWUP] Implement Database-related DDL Commands

#### What changes were proposed in this pull request?

First, a few test cases failed in mac OS X  because the property value of `java.io.tmpdir` does not include a trailing slash on some platform. Hive always removes the last trailing slash. For example, what I got in the web:
```
Win NT  --> C:\TEMP\
Win XP  --> C:\TEMP
Solaris --> /var/tmp/
Linux   --> /var/tmp
```
Second, a couple of test cases are added to verify if the commands work properly.

#### How was this patch tested?
Added a test case for it and correct the previous test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12081 from gatorsmile/mkdir.
parent 63db2bd2
No related branches found
No related tags found
No related merge requests found
......@@ -146,6 +146,10 @@ class SessionCatalog(
currentDb = db
}
/**
* Get the path for creating a non-default database when database location is not provided
* by users.
*/
def getDefaultDBPath(db: String): String = {
val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
new Path(new Path(conf.warehousePath), database + ".db").toString
......
......@@ -40,7 +40,10 @@ import org.apache.spark.sql.types._
* unless 'ifNotExists' is true.
* The syntax of using this command in SQL is:
* {{{
* CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
* CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
* [COMMENT database_comment]
* [LOCATION database_directory]
* [WITH DBPROPERTIES (property_name=property_value, ...)];
* }}}
*/
case class CreateDatabase(
......
......@@ -95,49 +95,81 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
private def appendTrailingSlash(path: String): String = {
if (!path.endsWith(File.separator)) path + File.separator else path
}
test("the qualified path of a database is stored in the catalog") {
val catalog = sqlContext.sessionState.catalog
val path = System.getProperty("java.io.tmpdir")
// The generated temp path is not qualified.
assert(!path.startsWith("file:/"))
sql(s"CREATE DATABASE db1 LOCATION '$path'")
val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
assert("file" === pathInCatalog.getScheme)
assert(path === pathInCatalog.getPath)
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) {
sql(s"CREATE DATABASE db2")
val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
withTempDir { tmpDir =>
val path = tmpDir.toString
// The generated temp path is not qualified.
assert(!path.startsWith("file:/"))
sql(s"CREATE DATABASE db1 LOCATION '$path'")
val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
assert("file" === pathInCatalog.getScheme)
assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath)
}
val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path
assert(expectedPath === pathInCatalog.getPath)
withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
sql(s"CREATE DATABASE db2")
val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
assert("file" === pathInCatalog.getScheme)
val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db"
assert(expectedPath === pathInCatalog.getPath)
}
sql("DROP DATABASE db1")
sql("DROP DATABASE db2")
sql("DROP DATABASE db1")
sql("DROP DATABASE db2")
}
}
test("Create/Drop Database") {
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
withTempDir { tmpDir =>
val path = tmpDir.toString
withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val databaseNames = Seq("db1", "`database`")
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expectedLocation =
"file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
expectedLocation,
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
} finally {
catalog.reset()
}
}
}
}
}
test("Create/Drop Database - location") {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
withTempDir { tmpDir =>
val path = tmpDir.toString
val dbPath = "file:" + path
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
sql(s"CREATE DATABASE $dbName Location '$path'")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expectedLocation =
"file:" + System.getProperty("java.io.tmpdir") +
File.separator + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
expectedLocation,
if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
......@@ -149,77 +181,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("Create Database - database already exists") {
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expectedLocation =
"file:" + System.getProperty("java.io.tmpdir") +
File.separator + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
expectedLocation,
Map.empty))
val message = intercept[AnalysisException] {
withTempDir { tmpDir =>
val path = tmpDir.toString
withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expectedLocation =
"file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
expectedLocation,
Map.empty))
val message = intercept[AnalysisException] {
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
}
}
}
}
}
test("Alter/Describe Database") {
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
withTempDir { tmpDir =>
val path = tmpDir.toString
withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
"file:" + System.getProperty("java.io.tmpdir") +
File.separator + s"$dbNameWithoutBackTicks.db"
sql(s"CREATE DATABASE $dbName")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "") :: Nil)
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
sql(s"CREATE DATABASE $dbName")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "") :: Nil)
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
}
}
}
}
......@@ -251,7 +284,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
// TODO: test drop database in restrict mode
test("drop non-empty database in restrict mode") {
val catalog = sqlContext.sessionState.catalog
val dbName = "db1"
sql(s"CREATE DATABASE $dbName")
// create a table in database
val tableIdent1 = TableIdentifier("tab1", Some(dbName))
createTable(catalog, tableIdent1)
// drop a non-empty database in Restrict mode
val message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName RESTRICT")
}.getMessage
assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
assert(catalog.listDatabases().contains(dbName))
sql(s"DROP DATABASE $dbName RESTRICT")
assert(!catalog.listDatabases().contains(dbName))
}
test("drop non-empty database in cascade mode") {
val catalog = sqlContext.sessionState.catalog
val dbName = "db1"
sql(s"CREATE DATABASE $dbName")
// create a table in database
val tableIdent1 = TableIdentifier("tab1", Some(dbName))
createTable(catalog, tableIdent1)
// drop a non-empty database in CASCADE mode
assert(catalog.listTables(dbName).contains(tableIdent1))
assert(catalog.listDatabases().contains(dbName))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.listDatabases().contains(dbName))
}
test("create table in default db") {
val catalog = sqlContext.sessionState.catalog
......
......@@ -20,21 +20,37 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import hiveContext.implicits._
override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
sqlContext.sessionState.catalog.reset()
} finally {
super.afterEach()
}
}
// check if the directory for recording the data of the table exists.
private def tableDirectoryExists(tableIdentifier: TableIdentifier): Boolean = {
private def tableDirectoryExists(
tableIdentifier: TableIdentifier,
dbPath: Option[String] = None): Boolean = {
val expectedTablePath =
hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
if (dbPath.isEmpty) {
hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
} else {
new Path(new Path(dbPath.get), tableIdentifier.table).toString
}
val filesystemPath = new Path(expectedTablePath)
val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf())
fs.exists(filesystemPath)
......@@ -56,7 +72,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
test("drop managed tables") {
test("drop managed tables in default database") {
withTempDir { tmpDir =>
val tabName = "tab1"
withTable(tabName) {
......@@ -83,7 +99,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
test("drop external data source table") {
test("drop external data source table in default database") {
withTempDir { tmpDir =>
val tabName = "tab1"
withTable(tabName) {
......@@ -365,4 +381,126 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
.exists(_.getString(0) == "# Detailed Table Information"))
}
}
private def createDatabaseWithLocation(tmpDir: File, dirExists: Boolean): Unit = {
val catalog = sqlContext.sessionState.catalog
val dbName = "db1"
val tabName = "tab1"
val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf())
withTable(tabName) {
if (dirExists) {
assert(tmpDir.listFiles.isEmpty)
} else {
assert(!fs.exists(new Path(tmpDir.toString)))
}
sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
val db1 = catalog.getDatabaseMetadata(dbName)
val dbPath = "file:" + tmpDir
assert(db1 == CatalogDatabase(
dbName,
"",
if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
Map.empty))
sql("USE db1")
sql(s"CREATE TABLE $tabName as SELECT 1")
assert(tableDirectoryExists(TableIdentifier(tabName), Option(tmpDir.toString)))
assert(tmpDir.listFiles.nonEmpty)
sql(s"DROP TABLE $tabName")
assert(tmpDir.listFiles.isEmpty)
sql(s"DROP DATABASE $dbName")
assert(!fs.exists(new Path(tmpDir.toString)))
}
}
test("create/drop database - location without pre-created directory") {
withTempPath { tmpDir =>
createDatabaseWithLocation(tmpDir, dirExists = false)
}
}
test("create/drop database - location with pre-created directory") {
withTempDir { tmpDir =>
createDatabaseWithLocation(tmpDir, dirExists = true)
}
}
private def appendTrailingSlash(path: String): String = {
if (!path.endsWith(File.separator)) path + File.separator else path
}
private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
withTempPath { tmpDir =>
val path = tmpDir.toString
withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
val dbName = "db1"
val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf())
val dbPath = new Path(path)
// the database directory does not exist
assert(!fs.exists(dbPath))
sql(s"CREATE DATABASE $dbName")
val catalog = sqlContext.sessionState.catalog
val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
val db1 = catalog.getDatabaseMetadata(dbName)
assert(db1 == CatalogDatabase(
dbName,
"",
expectedDBLocation,
Map.empty))
// the database directory was created
assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
sql(s"USE $dbName")
val tabName = "tab1"
assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
sql(s"CREATE TABLE $tabName as SELECT 1")
assert(tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
if (!tableExists) {
sql(s"DROP TABLE $tabName")
assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
}
val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}"
if (tableExists && !cascade) {
val message = intercept[AnalysisException] {
sql(sqlDropDatabase)
}.getMessage
assert(message.contains(s"Database $dbName is not empty. One or more tables exist."))
// the database directory was not removed
assert(fs.exists(new Path(expectedDBLocation)))
} else {
sql(sqlDropDatabase)
// the database directory was removed and the inclusive table directories are also removed
assert(!fs.exists(new Path(expectedDBLocation)))
}
}
}
}
test("drop database containing tables - CASCADE") {
dropDatabase(cascade = true, tableExists = true)
}
test("drop an empty database - CASCADE") {
dropDatabase(cascade = true, tableExists = false)
}
test("drop database containing tables - RESTRICT") {
dropDatabase(cascade = false, tableExists = true)
}
test("drop an empty database - RESTRICT") {
dropDatabase(cascade = false, tableExists = false)
}
test("drop default database") {
val message = intercept[AnalysisException] {
sql("DROP DATABASE default")
}.getMessage
assert(message.contains("Can not drop default database"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment