diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 7f7dd51aa2650692fa483a11130b361da1a6ba0b..c4af284f73d16cb106dd6625926e398f4778977d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -470,14 +470,15 @@ object ScalaReflection extends ScalaReflection {
   private def serializerFor(
       inputObject: Expression,
       tpe: `Type`,
-      walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized {
+      walkedTypePath: Seq[String],
+      seenTypeSet: Set[`Type`] = Set.empty): Expression = ScalaReflectionLock.synchronized {
 
     def toCatalystArray(input: Expression, elementType: `Type`): Expression = {
       dataTypeFor(elementType) match {
         case dt: ObjectType =>
           val clsName = getClassNameFromType(elementType)
           val newPath = s"""- array element class: "$clsName"""" +: walkedTypePath
-          MapObjects(serializerFor(_, elementType, newPath), input, dt)
+          MapObjects(serializerFor(_, elementType, newPath, seenTypeSet), input, dt)
 
          case dt @ (BooleanType | ByteType | ShortType | IntegerType | LongType |
                     FloatType | DoubleType) =>
@@ -511,7 +512,7 @@ object ScalaReflection extends ScalaReflection {
         val className = getClassNameFromType(optType)
         val newPath = s"""- option value class: "$className"""" +: walkedTypePath
         val unwrapped = UnwrapOption(dataTypeFor(optType), inputObject)
-        serializerFor(unwrapped, optType, newPath)
+        serializerFor(unwrapped, optType, newPath, seenTypeSet)
 
       // Since List[_] also belongs to localTypeOf[Product], we put this case before
       // "case t if definedByConstructorParams(t)" to make sure it will match to the
@@ -534,9 +535,9 @@ object ScalaReflection extends ScalaReflection {
         ExternalMapToCatalyst(
           inputObject,
           dataTypeFor(keyType),
-          serializerFor(_, keyType, keyPath),
+          serializerFor(_, keyType, keyPath, seenTypeSet),
           dataTypeFor(valueType),
-          serializerFor(_, valueType, valuePath),
+          serializerFor(_, valueType, valuePath, seenTypeSet),
           valueNullable = !valueType.typeSymbol.asClass.isPrimitive)
 
       case t if t <:< localTypeOf[String] =>
@@ -622,6 +623,11 @@ object ScalaReflection extends ScalaReflection {
         Invoke(obj, "serialize", udt, inputObject :: Nil)
 
       case t if definedByConstructorParams(t) =>
+        if (seenTypeSet.contains(t)) {
+          throw new UnsupportedOperationException(
+            s"cannot have circular references in class, but got the circular reference of class $t")
+        }
+
         val params = getConstructorParameters(t)
         val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) =>
           if (javaKeywords.contains(fieldName)) {
@@ -634,7 +640,8 @@ object ScalaReflection extends ScalaReflection {
             returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
           val clsName = getClassNameFromType(fieldType)
           val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
-          expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil
+          expressions.Literal(fieldName) ::
+          serializerFor(fieldValue, fieldType, newPath, seenTypeSet + t) :: Nil
         })
         val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType)
         expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)
@@ -643,7 +650,6 @@ object ScalaReflection extends ScalaReflection {
         throw new UnsupportedOperationException(
           s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n"))
     }
-
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index b37bf131e8dcee7911db08cadf0c7fce4861c39d..6417e7a8b60385e3aa56c8f5a8b91338c3327a8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1136,6 +1136,24 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(spark.range(1).map { x => new java.sql.Timestamp(100000) }.head ==
       new java.sql.Timestamp(100000))
   }
+
+  test("SPARK-19896: cannot have circular references in in case class") {
+    val errMsg1 = intercept[UnsupportedOperationException] {
+      Seq(CircularReferenceClassA(null)).toDS
+    }
+    assert(errMsg1.getMessage.startsWith("cannot have circular references in class, but got the " +
+      "circular reference of class"))
+    val errMsg2 = intercept[UnsupportedOperationException] {
+      Seq(CircularReferenceClassC(null)).toDS
+    }
+    assert(errMsg2.getMessage.startsWith("cannot have circular references in class, but got the " +
+      "circular reference of class"))
+    val errMsg3 = intercept[UnsupportedOperationException] {
+      Seq(CircularReferenceClassD(null)).toDS
+    }
+    assert(errMsg3.getMessage.startsWith("cannot have circular references in class, but got the " +
+      "circular reference of class"))
+  }
 }
 
 case class WithImmutableMap(id: String, map_test: scala.collection.immutable.Map[Long, String])
@@ -1214,3 +1232,9 @@ object DatasetTransform {
 
 case class Route(src: String, dest: String, cost: Int)
 case class GroupedRoutes(src: String, dest: String, routes: Seq[Route])
+
+case class CircularReferenceClassA(cls: CircularReferenceClassB)
+case class CircularReferenceClassB(cls: CircularReferenceClassA)
+case class CircularReferenceClassC(ar: Array[CircularReferenceClassC])
+case class CircularReferenceClassD(map: Map[String, CircularReferenceClassE])
+case class CircularReferenceClassE(id: String, list: List[CircularReferenceClassD])