diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 2642d9395ba88e6b4536e4b1552e8bdfb04c1955..26871259c6b6e27a3e7712e1500cdc85bbb76201 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -115,7 +115,10 @@ object DataType {
     name match {
       case "decimal" => DecimalType.USER_DEFAULT
       case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
-      case other => nonDecimalNameToType(other)
+      case other => nonDecimalNameToType.getOrElse(
+        other,
+        throw new IllegalArgumentException(
+          s"Failed to convert the JSON string '$name' to a data type."))
     }
   }
 
@@ -164,6 +167,10 @@ object DataType {
     ("sqlType", v: JValue),
     ("type", JString("udt"))) =>
         new PythonUserDefinedType(parseDataType(v), pyClass, serialized)
+
+    case other =>
+      throw new IllegalArgumentException(
+        s"Failed to convert the JSON string '${compact(render(other))}' to a data type.")
   }
 
   private def parseStructField(json: JValue): StructField = json match {
@@ -179,6 +186,9 @@ object DataType {
     ("nullable", JBool(nullable)),
     ("type", dataType: JValue)) =>
       StructField(name, parseDataType(dataType), nullable)
+    case other =>
+      throw new IllegalArgumentException(
+        s"Failed to convert the JSON string '${compact(render(other))}' to a field.")
   }
 
   protected[types] def buildFormattedString(
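
Note on the DataType.scala changes above: a bare map apply such as nonDecimalNameToType(other) fails with Scala's opaque java.util.NoSuchElementException ("key not found: abcd"), while getOrElse with a throwing default raises a descriptive IllegalArgumentException instead. A minimal, self-contained sketch of that pattern (the map contents and object name below are illustrative, not Spark's actual lookup table):

    // Sketch only. `throw` is an expression of type Nothing in Scala, so it
    // is a valid by-name default argument for Map.getOrElse and is only
    // evaluated when the key is absent.
    object LookupSketch {
      private val nameToType: Map[String, String] =
        Map("integer" -> "IntegerType", "string" -> "StringType")

      def parse(name: String): String =
        nameToType.getOrElse(
          name,
          throw new IllegalArgumentException(
            s"Failed to convert the JSON string '$name' to a data type."))

      def main(args: Array[String]): Unit = {
        println(parse("integer"))               // prints IntegerType
        println(scala.util.Try(parse("abcd")))  // Failure(java.lang.IllegalArgumentException: ...)
      }
    }
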
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 05cb999af6a50d07234a8815052e4b3abfb02c9e..f078ef013387bf32bbae5d0ce1adda13b57e7faf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.types
 
+import com.fasterxml.jackson.core.JsonParseException
+
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 
@@ -246,6 +248,32 @@ class DataTypeSuite extends SparkFunSuite {
   checkDataTypeFromJson(structType)
   checkDataTypeFromDDL(structType)
 
+  test("fromJson throws an exception when given type string is invalid") {
+    var message = intercept[IllegalArgumentException] {
+      DataType.fromJson(""""abcd"""")
+    }.getMessage
+    assert(message.contains(
+      "Failed to convert the JSON string 'abcd' to a data type."))
+
+    message = intercept[IllegalArgumentException] {
+      DataType.fromJson("""{"abcd":"a"}""")
+    }.getMessage
+    assert(message.contains(
+      """Failed to convert the JSON string '{"abcd":"a"}' to a data type"""))
+
+    message = intercept[IllegalArgumentException] {
+      DataType.fromJson("""{"fields": [{"a":123}], "type": "struct"}""")
+    }.getMessage
+    assert(message.contains(
+      """Failed to convert the JSON string '{"a":123}' to a field."""))
+
+    // Malformed JSON string
+    message = intercept[JsonParseException] {
+      DataType.fromJson("abcd")
+    }.getMessage
+    assert(message.contains("Unrecognized token 'abcd'"))
+  }
+
   def checkDefaultSize(dataType: DataType, expectedDefaultSize: Int): Unit = {
     test(s"Check the default size of $dataType") {
       assert(dataType.defaultSize === expectedDefaultSize)