diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index dd8e46da4555ad973c29e8872ae324ac7047664f..a5cf7196b21e39627735892d09a79d8d49c20c50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -327,7 +327,7 @@ class SessionCatalog(
   def loadPartition(
       name: TableIdentifier,
       loadPath: String,
-      partition: TablePartitionSpec,
+      spec: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean): Unit = {
@@ -335,8 +335,9 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
+      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -676,6 +677,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -694,6 +696,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(specs)
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
@@ -714,6 +717,8 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+    requireNonEmptyValueInPartitionSpec(specs)
+    requireNonEmptyValueInPartitionSpec(newSpecs)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -732,6 +737,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -745,6 +751,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.getPartition(db, table, spec)
   }
 
@@ -764,6 +771,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitionNames(db, table, partialSpec)
   }
@@ -784,6 +792,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitions(db, table, partialSpec)
   }
@@ -802,6 +811,19 @@ class SessionCatalog(
     externalCatalog.listPartitionsByFilter(db, table, predicates)
   }
 
+  /**
+   * Verify that none of the input partition specs contains an empty partition column value.
+   */
+  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+    specs.foreach { s =>
+      if (s.values.exists(_.isEmpty)) {
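+        // Render the offending spec for the error message,
+        // e.g. Map("a" -> "3", "b" -> "") becomes "[a=3, b=]".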
+        val spec = s.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+  }
+
   /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 9d20602ef81cd9c61ad6c89be7a51923ec2d58cd..59b52651a9fbd44434128734a68edbf5b29a60df 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
     CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
   lazy val partWithUnknownColumns =
     CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+  lazy val partWithEmptyValue =
+    CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 5cc772d8e9a1eb67b1a5e5593acacbfc4903f6fe..41ec40512cb5786b9b12b5fb53f22fd5adcc2523 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -608,6 +608,13 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("drop partitions") {
@@ -705,6 +712,16 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
         "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue.spec, part1.spec),
+        ignoreIfNotExists = false,
+        purge = false,
+        retainData = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("get partition") {
@@ -750,6 +767,11 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("rename partitions") {
@@ -817,6 +839,13 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("alter partitions") {
@@ -876,6 +905,11 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partition names") {
@@ -897,10 +931,24 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list partition names with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
       catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(Map("unknown" -> "unknown")))
+        Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions") {
@@ -920,10 +968,22 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list partitions with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.listPartitions(
-        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    var e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions when database/table does not exist") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index e0f71560f33028a3a6f6749222075c95eed4faa8..a9ca1a42495137150674a99e96c47a1b3ea2cb8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -464,6 +464,7 @@ private[hive] class HiveClientImpl(
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
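+        // An empty value in a drop spec would make Hive drop every partition in the
+        // table (SPARK-19129), so fail fast if one slips past SessionCatalog's checks.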
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' contains an empty value")
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
@@ -538,6 +539,7 @@ private[hive] class HiveClientImpl(
           // -1 for result limit means "no limit/return all"
           client.getPartitionNames(table.database, table.identifier.table, -1)
         case Some(s) =>
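+          // Same guard as above: a partial spec with an empty value must not reach Hive.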
+          assert(s.values.forall(_.nonEmpty), s"partition spec '$s' contains an empty value")
           client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
       }
     hivePartitionNames.asScala.sorted
@@ -561,7 +563,9 @@ private[hive] class HiveClientImpl(
     val hiveTable = toHiveTable(table)
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
-      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+      case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' contains an empty value")
+        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
     }
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 8b3421953025919c32d7fa2099b63ceeed194d16..3b9437da372c23cdae479e17d3135d5519459ffb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -245,6 +245,16 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-19129: drop partition with a empty string will drop the whole table") {
+    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+    val e = intercept[AnalysisException] {
+      spark.sql("alter table partitionedTable drop partition(partCol1='')")
+    }.getMessage
+    assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+      "partition column value"))
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>