diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 048a938507277d0f8bddadd3f00eac99dcec8be7..b977711e7d5adb8d27cfa8be88ff850fa0cbba04 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.{Kryo, KryoException}
@@ -38,7 +39,7 @@ import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
 import org.apache.spark.util.collection.CompactBuffer
 
 /**
@@ -131,6 +132,38 @@ class KryoSerializer(conf: SparkConf)
     // our code override the generic serializers in Chill for things like Seq
     new AllScalaRegistrar().apply(kryo)
 
+    // Register types missed by Chill.
+    // scalastyle:off
+    kryo.register(classOf[Array[Tuple1[Any]]])
+    kryo.register(classOf[Array[Tuple2[Any, Any]]])
+    kryo.register(classOf[Array[Tuple3[Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple4[Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple5[Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple6[Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple7[Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple8[Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple9[Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+    kryo.register(classOf[Array[Tuple22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+
+    // scalastyle:on
+
+    kryo.register(None.getClass)
+    kryo.register(Nil.getClass)
+    kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
+    kryo.register(classOf[ArrayBuffer[Any]])
+
     kryo.setClassLoader(classLoader)
     kryo
   }
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 8d1c9d17e977e9d544073d5682eb9bdb3457da8a..e428414cf6e852bace7f5cd5c673d0ff46a3420c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -150,6 +150,36 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
       mutable.HashMap(1->"one", 2->"two", 3->"three")))
   }
 
+  test("Bug: SPARK-10251") {
+    val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true"))
+      .newInstance()
+    def check[T: ClassTag](t: T) {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
+    check((1, 3))
+    check(Array((1, 3)))
+    check(List((1, 3)))
+    check(List[Int]())
+    check(List[Int](1, 2, 3))
+    check(List[String]())
+    check(List[String]("x", "y", "z"))
+    check(None)
+    check(Some(1))
+    check(Some("hi"))
+    check(1 -> 1)
+    check(mutable.ArrayBuffer(1, 2, 3))
+    check(mutable.ArrayBuffer("1", "2", "3"))
+    check(mutable.Map())
+    check(mutable.Map(1 -> "one", 2 -> "two"))
+    check(mutable.Map("one" -> 1, "two" -> 2))
+    check(mutable.HashMap(1 -> "one", 2 -> "two"))
+    check(mutable.HashMap("one" -> 1, "two" -> 2))
+    check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
+    check(List(
+      mutable.HashMap("one" -> 1, "two" -> 2),
+      mutable.HashMap(1->"one", 2->"two", 3->"three")))
+  }
+
   test("ranges") {
     val ser = new KryoSerializer(conf).newInstance()
     def check[T: ClassTag](t: T) {