diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ff6303471e145e7ed0751fbca742f8fa4f86e779..eff420eb4c5ace69cc6c64539114b14d520637c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -146,6 +146,10 @@ class SessionCatalog(
     currentDb = db
   }
 
+  /**
+   * Get the path for creating a non-default database when database location is not provided
+   * by users.
+   */
   def getDefaultDBPath(db: String): String = {
     val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
     new Path(new Path(conf.warehousePath), database + ".db").toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index aa06c014fb0a2b6d2e5a020b4639ce0f23148f85..085bdaff4e03b55da2759f452b257135cf16ee24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -40,7 +40,10 @@ import org.apache.spark.sql.types._
  * unless 'ifNotExists' is true.
  * The syntax of using this command in SQL is:
  * {{{
- *    CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
+ *   CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
+ *     [COMMENT database_comment]
+ *     [LOCATION database_directory]
+ *     [WITH DBPROPERTIES (property_name=property_value, ...)];
  * }}}
  */
 case class CreateDatabase(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0ae099ecc2bddc719401c9f698427d0ea6807f3a..6085098a709e4c9d7bf549074e8364aa3730a3a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -95,49 +95,81 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
+  private def appendTrailingSlash(path: String): String = {
+    if (!path.endsWith(File.separator)) path + File.separator else path
+  }
+
   test("the qualified path of a database is stored in the catalog") {
     val catalog = sqlContext.sessionState.catalog
 
-    val path = System.getProperty("java.io.tmpdir")
-    // The generated temp path is not qualified.
-    assert(!path.startsWith("file:/"))
-    sql(s"CREATE DATABASE db1 LOCATION '$path'")
-    val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
-    assert("file" === pathInCatalog.getScheme)
-    assert(path === pathInCatalog.getPath)
-
-    withSQLConf(
-      SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) {
-      sql(s"CREATE DATABASE db2")
-      val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+    withTempDir { tmpDir =>
+      val path = tmpDir.toString
+      // The generated temp path is not qualified.
+      assert(!path.startsWith("file:/"))
+      sql(s"CREATE DATABASE db1 LOCATION '$path'")
+      val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
       assert("file" === pathInCatalog.getScheme)
-      assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath)
-    }
+      val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path
+      assert(expectedPath === pathInCatalog.getPath)
+
+      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+        sql(s"CREATE DATABASE db2")
+        val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+        assert("file" === pathInCatalog.getScheme)
+        val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db"
+        assert(expectedPath === pathInCatalog.getPath)
+      }
 
-    sql("DROP DATABASE db1")
-    sql("DROP DATABASE db2")
+      sql("DROP DATABASE db1")
+      sql("DROP DATABASE db2")
+    }
   }
 
   test("Create/Drop Database") {
-    withSQLConf(
-        SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
-      val catalog = sqlContext.sessionState.catalog
+    withTempDir { tmpDir =>
+      val path = tmpDir.toString
+      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+        val catalog = sqlContext.sessionState.catalog
+        val databaseNames = Seq("db1", "`database`")
+
+        databaseNames.foreach { dbName =>
+          try {
+            val dbNameWithoutBackTicks = cleanIdentifier(dbName)
 
-      val databaseNames = Seq("db1", "`database`")
+            sql(s"CREATE DATABASE $dbName")
+            val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+            val expectedLocation =
+              "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+            assert(db1 == CatalogDatabase(
+              dbNameWithoutBackTicks,
+              "",
+              expectedLocation,
+              Map.empty))
+            sql(s"DROP DATABASE $dbName CASCADE")
+            assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+          } finally {
+            catalog.reset()
+          }
+        }
+      }
+    }
+  }
 
+  test("Create/Drop Database - location") {
+    val catalog = sqlContext.sessionState.catalog
+    val databaseNames = Seq("db1", "`database`")
+    withTempDir { tmpDir =>
+      val path = tmpDir.toString
+      val dbPath = "file:" + path
       databaseNames.foreach { dbName =>
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-
-          sql(s"CREATE DATABASE $dbName")
+          sql(s"CREATE DATABASE $dbName Location '$path'")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-          val expectedLocation =
-            "file:" + System.getProperty("java.io.tmpdir") +
-              File.separator + s"$dbNameWithoutBackTicks.db"
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
-            expectedLocation,
+            if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
             Map.empty))
           sql(s"DROP DATABASE $dbName CASCADE")
           assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -149,77 +181,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("Create Database - database already exists") {
-    withSQLConf(
-      SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
-      val catalog = sqlContext.sessionState.catalog
-      val databaseNames = Seq("db1", "`database`")
-
-      databaseNames.foreach { dbName =>
-        try {
-          val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-          sql(s"CREATE DATABASE $dbName")
-          val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-          val expectedLocation =
-            "file:" + System.getProperty("java.io.tmpdir") +
-              File.separator + s"$dbNameWithoutBackTicks.db"
-          assert(db1 == CatalogDatabase(
-            dbNameWithoutBackTicks,
-            "",
-            expectedLocation,
-            Map.empty))
-
-          val message = intercept[AnalysisException] {
+    withTempDir { tmpDir =>
+      val path = tmpDir.toString
+      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+        val catalog = sqlContext.sessionState.catalog
+        val databaseNames = Seq("db1", "`database`")
+
+        databaseNames.foreach { dbName =>
+          try {
+            val dbNameWithoutBackTicks = cleanIdentifier(dbName)
             sql(s"CREATE DATABASE $dbName")
-          }.getMessage
-          assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
-        } finally {
-          catalog.reset()
+            val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+            val expectedLocation =
+              "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+            assert(db1 == CatalogDatabase(
+              dbNameWithoutBackTicks,
+              "",
+              expectedLocation,
+              Map.empty))
+
+            val message = intercept[AnalysisException] {
+              sql(s"CREATE DATABASE $dbName")
+            }.getMessage
+            assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+          } finally {
+            catalog.reset()
+          }
         }
       }
     }
   }
 
   test("Alter/Describe Database") {
-    withSQLConf(
-      SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
-      val catalog = sqlContext.sessionState.catalog
-      val databaseNames = Seq("db1", "`database`")
+    withTempDir { tmpDir =>
+      val path = tmpDir.toString
+      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+        val catalog = sqlContext.sessionState.catalog
+        val databaseNames = Seq("db1", "`database`")
 
-      databaseNames.foreach { dbName =>
-        try {
-          val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-          val location =
-            "file:" + System.getProperty("java.io.tmpdir") +
-              File.separator + s"$dbNameWithoutBackTicks.db"
-
-          sql(s"CREATE DATABASE $dbName")
-
-          checkAnswer(
-            sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
-            Row("Database Name", dbNameWithoutBackTicks) ::
-              Row("Description", "") ::
-              Row("Location", location) ::
-              Row("Properties", "") :: Nil)
-
-          sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
-
-          checkAnswer(
-            sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
-            Row("Database Name", dbNameWithoutBackTicks) ::
-              Row("Description", "") ::
-              Row("Location", location) ::
-              Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
-
-          sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
-
-          checkAnswer(
-            sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
-            Row("Database Name", dbNameWithoutBackTicks) ::
-              Row("Description", "") ::
-              Row("Location", location) ::
-              Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
-        } finally {
-          catalog.reset()
+        databaseNames.foreach { dbName =>
+          try {
+            val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+            val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+
+            sql(s"CREATE DATABASE $dbName")
+
+            checkAnswer(
+              sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+              Row("Database Name", dbNameWithoutBackTicks) ::
+                Row("Description", "") ::
+                Row("Location", location) ::
+                Row("Properties", "") :: Nil)
+
+            sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+
+            checkAnswer(
+              sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+              Row("Database Name", dbNameWithoutBackTicks) ::
+                Row("Description", "") ::
+                Row("Location", location) ::
+                Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
+
+            sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+
+            checkAnswer(
+              sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+              Row("Database Name", dbNameWithoutBackTicks) ::
+                Row("Description", "") ::
+                Row("Location", location) ::
+                Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+          } finally {
+            catalog.reset()
+          }
         }
       }
     }
@@ -251,7 +284,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  // TODO: test drop database in restrict mode
+  test("drop non-empty database in restrict mode") {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    sql(s"CREATE DATABASE $dbName")
+
+    // create a table in database
+    val tableIdent1 = TableIdentifier("tab1", Some(dbName))
+    createTable(catalog, tableIdent1)
+
+    // drop a non-empty database in Restrict mode
+    val message = intercept[AnalysisException] {
+      sql(s"DROP DATABASE $dbName RESTRICT")
+    }.getMessage
+    assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+
+    catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
+
+    assert(catalog.listDatabases().contains(dbName))
+    sql(s"DROP DATABASE $dbName RESTRICT")
+    assert(!catalog.listDatabases().contains(dbName))
+  }
+
+  test("drop non-empty database in cascade mode") {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    sql(s"CREATE DATABASE $dbName")
+
+    // create a table in database
+    val tableIdent1 = TableIdentifier("tab1", Some(dbName))
+    createTable(catalog, tableIdent1)
+
+    // drop a non-empty database in CASCADE mode
+    assert(catalog.listTables(dbName).contains(tableIdent1))
+    assert(catalog.listDatabases().contains(dbName))
+    sql(s"DROP DATABASE $dbName CASCADE")
+    assert(!catalog.listDatabases().contains(dbName))
+  }
 
   test("create table in default db") {
     val catalog = sqlContext.sessionState.catalog
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 373d1a1e0ebc1b367a7321ce8c8bd40cd0d6736c..d55ddb251d00d79409bff3cf1327b4777a183b3a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -20,21 +20,37 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 
 import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
-class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class HiveDDLSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
   import hiveContext.implicits._
 
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      sqlContext.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
   // check if the directory for recording the data of the table exists.
-  private def tableDirectoryExists(tableIdentifier: TableIdentifier): Boolean = {
+  private def tableDirectoryExists(
+      tableIdentifier: TableIdentifier,
+      dbPath: Option[String] = None): Boolean = {
     val expectedTablePath =
-      hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+      if (dbPath.isEmpty) {
+        hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+      } else {
+        new Path(new Path(dbPath.get), tableIdentifier.table).toString
+      }
     val filesystemPath = new Path(expectedTablePath)
     val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf())
     fs.exists(filesystemPath)
@@ -56,7 +72,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("drop managed tables") {
+  test("drop managed tables in default database") {
     withTempDir { tmpDir =>
       val tabName = "tab1"
       withTable(tabName) {
@@ -83,7 +99,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("drop external data source table") {
+  test("drop external data source table in default database") {
     withTempDir { tmpDir =>
       val tabName = "tab1"
       withTable(tabName) {
@@ -365,4 +381,126 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           .exists(_.getString(0) == "# Detailed Table Information"))
     }
   }
+
+  private def createDatabaseWithLocation(tmpDir: File, dirExists: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    val tabName = "tab1"
+    val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf())
+    withTable(tabName) {
+      if (dirExists) {
+        assert(tmpDir.listFiles.isEmpty)
+      } else {
+        assert(!fs.exists(new Path(tmpDir.toString)))
+      }
+      sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
+      val db1 = catalog.getDatabaseMetadata(dbName)
+      val dbPath = "file:" + tmpDir
+      assert(db1 == CatalogDatabase(
+        dbName,
+        "",
+        if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
+        Map.empty))
+      sql("USE db1")
+
+      sql(s"CREATE TABLE $tabName as SELECT 1")
+      assert(tableDirectoryExists(TableIdentifier(tabName), Option(tmpDir.toString)))
+
+      assert(tmpDir.listFiles.nonEmpty)
+      sql(s"DROP TABLE $tabName")
+
+      assert(tmpDir.listFiles.isEmpty)
+      sql(s"DROP DATABASE $dbName")
+      assert(!fs.exists(new Path(tmpDir.toString)))
+    }
+  }
+
+  test("create/drop database - location without pre-created directory") {
+    withTempPath { tmpDir =>
+      createDatabaseWithLocation(tmpDir, dirExists = false)
+    }
+  }
+
+  test("create/drop database - location with pre-created directory") {
+    withTempDir { tmpDir =>
+      createDatabaseWithLocation(tmpDir, dirExists = true)
+    }
+  }
+
+  private def appendTrailingSlash(path: String): String = {
+    if (!path.endsWith(File.separator)) path + File.separator else path
+  }
+
+  private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
+    withTempPath { tmpDir =>
+      val path = tmpDir.toString
+      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+        val dbName = "db1"
+        val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf())
+        val dbPath = new Path(path)
+        // the database directory does not exist
+        assert(!fs.exists(dbPath))
+
+        sql(s"CREATE DATABASE $dbName")
+        val catalog = sqlContext.sessionState.catalog
+        val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
+        val db1 = catalog.getDatabaseMetadata(dbName)
+        assert(db1 == CatalogDatabase(
+          dbName,
+          "",
+          expectedDBLocation,
+          Map.empty))
+        // the database directory was created
+        assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
+        sql(s"USE $dbName")
+
+        val tabName = "tab1"
+        assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+        sql(s"CREATE TABLE $tabName as SELECT 1")
+        assert(tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+
+        if (!tableExists) {
+          sql(s"DROP TABLE $tabName")
+          assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+        }
+
+        val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}"
+        if (tableExists && !cascade) {
+          val message = intercept[AnalysisException] {
+            sql(sqlDropDatabase)
+          }.getMessage
+          assert(message.contains(s"Database $dbName is not empty. One or more tables exist."))
+          // the database directory was not removed
+          assert(fs.exists(new Path(expectedDBLocation)))
+        } else {
+          sql(sqlDropDatabase)
+          // the database directory and all table directories inside it were removed
+          assert(!fs.exists(new Path(expectedDBLocation)))
+        }
+      }
+    }
+  }
+
+  test("drop database containing tables - CASCADE") {
+    dropDatabase(cascade = true, tableExists = true)
+  }
+
+  test("drop an empty database - CASCADE") {
+    dropDatabase(cascade = true, tableExists = false)
+  }
+
+  test("drop database containing tables - RESTRICT") {
+    dropDatabase(cascade = false, tableExists = true)
+  }
+
+  test("drop an empty database - RESTRICT") {
+    dropDatabase(cascade = false, tableExists = false)
+  }
+
+  test("drop default database") {
+    val message = intercept[AnalysisException] {
+      sql("DROP DATABASE default")
+    }.getMessage
+    assert(message.contains("Can not drop default database"))
+  }
 }