diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 12af9e0c3261123ad21e2687af8634fc87579cc4..8008fcd639f148a35008c4d8dc2512889ff7c9cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -331,7 +331,7 @@ class SessionCatalog(
   def loadPartition(
       name: TableIdentifier,
       loadPath: String,
-      partition: TablePartitionSpec,
+      spec: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
@@ -340,8 +340,9 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -693,6 +694,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -711,6 +713,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(specs)
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
@@ -731,6 +734,8 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+    requireNonEmptyValueInPartitionSpec(specs)
+    requireNonEmptyValueInPartitionSpec(newSpecs)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -749,6 +754,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -762,6 +768,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.getPartition(db, table, spec)
   }
 
@@ -781,6 +788,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitionNames(db, table, partialSpec)
   }
@@ -801,6 +809,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitions(db, table, partialSpec)
   }
@@ -819,6 +828,19 @@ class SessionCatalog(
     externalCatalog.listPartitionsByFilter(db, table, predicates)
   }
 
+  /**
+   * Verify that none of the input partition specs contains an empty partition column value.
+   */
+  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+    specs.foreach { s =>
+      if (s.values.exists(_.isEmpty)) {
+        val spec = s.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+  }
+
   /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
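For context on the hunk above: `requireNonEmptyValueInPartitionSpec` is the single guard that every partition entry point in `SessionCatalog` now calls before delegating to the external catalog. Below is a minimal, self-contained sketch of its behavior, not the Spark source itself: the local `TablePartitionSpec` alias mirrors Catalyst's `Map[String, String]`, and a plain `IllegalArgumentException` stands in for Spark's `AnalysisException` to keep the sketch dependency-free.

```scala
// Standalone sketch of the guard added above; not the Spark source itself.
object PartitionSpecCheck {
  // In Catalyst, TablePartitionSpec is an alias for Map[String, String].
  type TablePartitionSpec = Map[String, String]

  // Reject any spec carrying an empty partition column value, e.g. the spec
  // produced by `ALTER TABLE t DROP PARTITION (partCol1='')`.
  def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
    specs.foreach { s =>
      if (s.values.exists(_.isEmpty)) {
        val spec = s.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]")
        // Spark throws AnalysisException here; IllegalArgumentException keeps
        // this sketch free of Spark dependencies.
        throw new IllegalArgumentException(
          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "4"))) // passes
    requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "")))  // throws
  }
}
```

Running `main` makes the second call fail with the same message the test suites below assert on.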
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 91f464b47aae2d16df98a7b2dadb52199c4b02f6..acf3bcfdaa955b2c37346e3880e5719fea2da57c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
     CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
   lazy val partWithUnknownColumns =
     CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+  lazy val partWithEmptyValue =
+    CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ae93dffbab8d227c630cfcd87c614da94a859e76..7a7de25acb07013241665c6fefc58a289276d873 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("drop partitions") {
@@ -722,6 +729,16 @@ class SessionCatalogSuite extends PlanTest {
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
         "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue.spec, part1.spec),
+        ignoreIfNotExists = false,
+        purge = false,
+        retainData = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("get partition") {
@@ -767,6 +784,11 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("rename partitions") {
@@ -834,6 +856,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("alter partitions") {
@@ -893,6 +922,11 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partition names") {
@@ -914,10 +948,24 @@ class SessionCatalogSuite extends PlanTest {
 
   test("list partition names with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
       catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(Map("unknown" -> "unknown")))
+        Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions") {
@@ -937,10 +985,22 @@ class SessionCatalogSuite extends PlanTest {
 
   test("list partitions with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.listPartitions(
-        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    var e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions when database/table does not exist") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5c0e2f6ec49413b97ec85ffdf09251e08b4a1cb7..9a6144c5e3cc8193dc51408a255026f9087fe2da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -471,6 +471,7 @@ private[hive] class HiveClientImpl(
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
@@ -545,6 +546,7 @@ private[hive] class HiveClientImpl(
           // -1 for result limit means "no limit/return all"
           client.getPartitionNames(table.database, table.identifier.table, -1)
         case Some(s) =>
+          assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
           client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
       }
     hivePartitionNames.asScala.sorted
@@ -568,7 +570,9 @@ private[hive] class HiveClientImpl(
     val hiveTable = toHiveTable(table)
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
-      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+      case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
+        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
     }
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
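Why the same check is repeated as an `assert` in `HiveClientImpl`: by the time a spec reaches the Hive client it may be a partial spec, and the Hive metastore appears to treat an empty value as an unset column, which turns the spec into a match-all filter; that is how `ALTER TABLE ... DROP PARTITION (partCol1='')` could drop every partition (SPARK-19129). The sketch below illustrates that failure mode under that assumption; `matches` is a hypothetical stand-in for the metastore's partial-spec matching, not Spark or Hive code.

```scala
// Sketch of the SPARK-19129 failure mode, assuming the metastore silently
// ignores empty values in a partial spec. `matches` is hypothetical.
object EmptySpecFailureMode {
  // A partition matches when its spec is a superset of the partial spec,
  // as the comment in dropPartitions above describes.
  def matches(partialSpec: Map[String, String], part: Map[String, String]): Boolean =
    partialSpec.forall { case (k, v) => part.get(k).contains(v) }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Map("b" -> "1", "c" -> "1"), Map("b" -> "1", "c" -> "2"))

    // If the empty value is dropped, the effective spec is empty and every
    // partition matches -- i.e. the whole table would be dropped.
    val effective = Map("c" -> "").filter { case (_, v) => v.nonEmpty } // Map()
    assert(partitions.forall(p => matches(effective, p)))

    // The new asserts reject such a spec before it can reach the metastore.
    assert(!Map("c" -> "").values.forall(_.nonEmpty))
  }
}
```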
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e3f166724968471c397d057a5383131d4581b2ee..ef62be39cd3495d4e33b865a0c07d2337cb05380 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -247,6 +247,16 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-19129: drop partition with a empty string will drop the whole table") {
+    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+    val e = intercept[AnalysisException] {
+      spark.sql("alter table partitionedTable drop partition(partCol1='')")
+    }.getMessage
+    assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+      "partition column value"))
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>