diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index ef30ba0cdbf55d11587e6dbe7127285efa695f3a..b9542c717350501edbdf5ddd08b0f38fbc4ba305 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -128,6 +128,20 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         }.toMap
         CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
 
+      // DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
+      case Token("TOK_DROPDATABASE", Token(dbName, Nil) :: otherArgs) =>
+        // Example format:
+        //
+        //   TOK_DROPDATABASE
+        //   :- database_name
+        //   :- TOK_IFEXISTS
+        //   +- TOK_RESTRICT/TOK_CASCADE
+        val databaseName = unquoteString(dbName)
+        // The default is RESTRICT
+        val Seq(ifExists, _, cascade) = getClauses(Seq(
+          "TOK_IFEXISTS", "TOK_RESTRICT", "TOK_CASCADE"), otherArgs)
+        DropDatabase(databaseName, ifExists.isDefined, restrict = cascade.isEmpty)(node.source)
+
       // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
       // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
       case Token("TOK_CREATEFUNCTION", args) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 07c89afafb6b6b36f9ba3860b82110492c8f0685..373b557683f15225e6764fa246341954d74b777e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -52,6 +52,24 @@ case class CreateDatabase(
     props: Map[String, String])(sql: String)
   extends NativeDDLCommand(sql) with Logging
 
+/**
+ * Drop Database: Removes a database from the system.
+ *
+ * 'ifExists':
+ * - true, if database_name doesn't exist, no action
+ * - false (default), if database_name doesn't exist, an error is issued
+ * 'restrict':
+ * - true (default), the database cannot be dropped unless it is empty. The tables it
+ * contains must be dropped first.
+ * - false, Cascade mode: dependent objects are automatically dropped
+ * before the database is dropped.
+ */
+case class DropDatabase(
+    databaseName: String,
+    ifExists: Boolean,
+    restrict: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
 case class CreateFunction(
     functionName: String,
     alias: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 6f1eea273fafa5de3812f804938cac2f10214ec0..a33175aa60b5b5552e705568a51f8fe9ef4fd93c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -44,6 +44,63 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed, expected)
   }
 
+  test("drop database") {
+    val sql1 = "DROP DATABASE IF EXISTS database_name RESTRICT"
+    val sql2 = "DROP DATABASE IF EXISTS database_name CASCADE"
+    val sql3 = "DROP SCHEMA IF EXISTS database_name RESTRICT"
+    val sql4 = "DROP SCHEMA IF EXISTS database_name CASCADE"
+    // The default is restrict=true
+    val sql5 = "DROP DATABASE IF EXISTS database_name"
+    // The default is ifExists=false
+    val sql6 = "DROP DATABASE database_name"
+    val sql7 = "DROP DATABASE database_name CASCADE"
+
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val parsed4 = parser.parsePlan(sql4)
+    val parsed5 = parser.parsePlan(sql5)
+    val parsed6 = parser.parsePlan(sql6)
+    val parsed7 = parser.parsePlan(sql7)
+
+    val expected1 = DropDatabase(
+      "database_name",
+      ifExists = true,
+      restrict = true)(sql1)
+    val expected2 = DropDatabase(
+      "database_name",
+      ifExists = true,
+      restrict = false)(sql2)
+    val expected3 = DropDatabase(
+      "database_name",
+      ifExists = true,
+      restrict = true)(sql3)
+    val expected4 = DropDatabase(
+      "database_name",
+      ifExists = true,
+      restrict = false)(sql4)
+    val expected5 = DropDatabase(
+      "database_name",
+      ifExists = true,
+      restrict = true)(sql5)
+    val expected6 = DropDatabase(
+      "database_name",
+      ifExists = false,
+      restrict = true)(sql6)
+    val expected7 = DropDatabase(
+      "database_name",
+      ifExists = false,
+      restrict = false)(sql7)
+
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+    comparePlans(parsed5, expected5)
+    comparePlans(parsed6, expected6)
+    comparePlans(parsed7, expected7)
+  }
+
   test("create function") {
     val sql1 =
       """
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index e802d3dfc3866e5d2051ea06adac36d2df8230f2..6586b90377eb74bdfeb5faad97be361889f71136 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -102,7 +102,6 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
     "TOK_DESCDATABASE",
 
-    "TOK_DROPDATABASE",
     "TOK_DROPFUNCTION",
     "TOK_DROPINDEX",
     "TOK_DROPMACRO",