diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1fba552f7050196278a7f2841a3702beb634749d..0d26281fe1076bc14a07732899837dc3c1abd1fa 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -27,6 +27,7 @@ import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
@@ -78,8 +79,15 @@ class KryoSerializer(conf: SparkConf)
     .filter(!_.isEmpty)
 
   private val avroSchemas = conf.getAvroSchema
+  // whether to use unsafe based IO for serialization
+  private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
 
-  def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
+  def newKryoOutput(): KryoOutput =
+    if (useUnsafe) {
+      new KryoUnsafeOutput(bufferSize, math.max(bufferSize, maxBufferSize))
+    } else {
+      new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
+    }
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
@@ -172,7 +180,7 @@ class KryoSerializer(conf: SparkConf)
   }
 
   override def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
+    new KryoSerializerInstance(this, useUnsafe)
   }
 
   private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
@@ -186,9 +194,12 @@ class KryoSerializer(conf: SparkConf)
 private[spark]
 class KryoSerializationStream(
     serInstance: KryoSerializerInstance,
-    outStream: OutputStream) extends SerializationStream {
+    outStream: OutputStream,
+    useUnsafe: Boolean) extends SerializationStream {
+
+  private[this] var output: KryoOutput =
+    if (useUnsafe) new KryoUnsafeOutput(outStream) else new KryoOutput(outStream)
 
-  private[this] var output: KryoOutput = new KryoOutput(outStream)
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
   override def writeObject[T: ClassTag](t: T): SerializationStream = {
@@ -219,9 +230,12 @@ class KryoSerializationStream(
 private[spark]
 class KryoDeserializationStream(
     serInstance: KryoSerializerInstance,
-    inStream: InputStream) extends DeserializationStream {
+    inStream: InputStream,
+    useUnsafe: Boolean) extends DeserializationStream {
+
+  private[this] var input: KryoInput =
+    if (useUnsafe) new KryoUnsafeInput(inStream) else new KryoInput(inStream)
 
-  private[this] var input: KryoInput = new KryoInput(inStream)
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
   override def readObject[T: ClassTag](): T = {
@@ -248,8 +262,8 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-
+private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+  extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
@@ -288,7 +302,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
 
   // Make these lazy vals to avoid creating a buffer unless we use them.
   private lazy val output = ks.newKryoOutput()
-  private lazy val input = new KryoInput()
+  private lazy val input = if (useUnsafe) new KryoUnsafeInput() else new KryoInput()
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     output.clear()
@@ -329,11 +343,11 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
   }
 
   override def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(this, s)
+    new KryoSerializationStream(this, s, useUnsafe)
   }
 
   override def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(this, s)
+    new KryoDeserializationStream(this, s, useUnsafe)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
new file mode 100644
index 0000000000000000000000000000000000000000..64be9662761408dd1c6875449b697a123293ec8e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.Benchmark
+
+class KryoBenchmark extends SparkFunSuite {
+  val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10)
+
+  ignore(s"Benchmark Kryo Unsafe vs safe Serialization") {
+    Seq (true, false).foreach (runBenchmark)
+    benchmark.run()
+
+    // scalastyle:off
+    /*
+      Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ------------------------------------------------------------------------------------------------
+      basicTypes: Int with unsafe:true               151 /  170        104.2           9.6       1.0X
+      basicTypes: Long with unsafe:true              175 /  191         89.8          11.1       0.9X
+      basicTypes: Float with unsafe:true             177 /  184         88.8          11.3       0.9X
+      basicTypes: Double with unsafe:true            193 /  216         81.4          12.3       0.8X
+      Array: Int with unsafe:true                    513 /  587         30.7          32.6       0.3X
+      Array: Long with unsafe:true                  1211 / 1358         13.0          77.0       0.1X
+      Array: Float with unsafe:true                  890 /  964         17.7          56.6       0.2X
+      Array: Double with unsafe:true                1335 / 1428         11.8          84.9       0.1X
+      Map of string->Double  with unsafe:true        931 /  988         16.9          59.2       0.2X
+      basicTypes: Int with unsafe:false              197 /  217         79.9          12.5       0.8X
+      basicTypes: Long with unsafe:false             219 /  240         71.8          13.9       0.7X
+      basicTypes: Float with unsafe:false            208 /  217         75.7          13.2       0.7X
+      basicTypes: Double with unsafe:false           208 /  225         75.6          13.2       0.7X
+      Array: Int with unsafe:false                  2559 / 2681          6.1         162.7       0.1X
+      Array: Long with unsafe:false                 3425 / 3516          4.6         217.8       0.0X
+      Array: Float with unsafe:false                2025 / 2134          7.8         128.7       0.1X
+      Array: Double with unsafe:false               2241 / 2358          7.0         142.5       0.1X
+      Map of string->Double  with unsafe:false      1044 / 1085         15.1          66.4       0.1X
+    */
+    // scalastyle:on
+  }
+
+  private def runBenchmark(useUnsafe: Boolean): Unit = {
+    def check[T: ClassTag](t: T, ser: SerializerInstance): Int = {
+      if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0
+    }
+
+    // Benchmark Primitives
+    val basicTypeCount = 1000000
+    def basicTypes[T: ClassTag](name: String, gen: () => T): Unit = {
+      lazy val ser = createSerializer(useUnsafe)
+      val arrayOfBasicType: Array[T] = Array.fill(basicTypeCount)(gen())
+
+      benchmark.addCase(s"basicTypes: $name with unsafe:$useUnsafe") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < basicTypeCount) {
+          sum += check(arrayOfBasicType(i), ser)
+          i += 1
+        }
+        sum
+      }
+    }
+    basicTypes("Int", Random.nextInt)
+    basicTypes("Long", Random.nextLong)
+    basicTypes("Float", Random.nextFloat)
+    basicTypes("Double", Random.nextDouble)
+
+    // Benchmark Array of Primitives
+    val arrayCount = 10000
+    def basicTypeArray[T: ClassTag](name: String, gen: () => T): Unit = {
+      lazy val ser = createSerializer(useUnsafe)
+      val arrayOfArrays: Array[Array[T]] =
+        Array.fill(arrayCount)(Array.fill[T](Random.nextInt(arrayCount))(gen()))
+
+      benchmark.addCase(s"Array: $name with unsafe:$useUnsafe") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < arrayCount) {
+          val arr = arrayOfArrays(i)
+          sum += check(arr, ser)
+          i += 1
+        }
+        sum
+      }
+    }
+    basicTypeArray("Int", Random.nextInt)
+    basicTypeArray("Long", Random.nextLong)
+    basicTypeArray("Float", Random.nextFloat)
+    basicTypeArray("Double", Random.nextDouble)
+
+    // Benchmark Maps
+    val mapsCount = 1000
+    lazy val ser = createSerializer(useUnsafe)
+    val arrayOfMaps: Array[Map[String, Double]] = Array.fill(mapsCount) {
+      Array.fill(Random.nextInt(mapsCount)) {
+        (Random.nextString(mapsCount / 10), Random.nextDouble())
+      }.toMap
+    }
+
+    benchmark.addCase(s"Map of string->Double  with unsafe:$useUnsafe") { _ =>
+      var sum = 0L
+      var i = 0
+      while (i < mapsCount) {
+        val map = arrayOfMaps(i)
+        sum += check(map, ser)
+        i += 1
+      }
+      sum
+    }
+  }
+
+  def createSerializer(useUnsafe: Boolean): SerializerInstance = {
+    val conf = new SparkConf()
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+    conf.set("spark.kryo.unsafe", useUnsafe.toString)
+
+    new KryoSerializer(conf).newInstance()
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index bc6e98365daeff1e01ec3f8b08afdebe71ee982a..50408418110541577b1eb71af77de72f626822a3 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.Utils
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+  conf.set("spark.kryo.unsafe", "false")
 
   test("SPARK-7392 configuration limits") {
     val kryoBufferProperty = "spark.kryoserializer.buffer"
diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d63a45ae4a6a90419510aa318f629e4d66112a54
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
+
+  // This test suite should run all tests in KryoSerializerSuite with kryo unsafe.
+
+  override def beforeAll() {
+    conf.set("spark.kryo.unsafe", "true")
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    conf.set("spark.kryo.unsafe", "false")
+    super.afterAll()
+  }
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index a4a99d6fa46308a822b1c8bf5284bd5f93c2847a..b07867d99aa9d0341a524dde3bc715a5cd04d68e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -799,6 +799,14 @@ Apart from these, the following properties are also available, and may be useful
     See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kryo.unsafe</code></td>
+  <td>false</td>
+  <td>
+    Whether to use unsafe based Kryo serializer. Can be
+    substantially faster by using Unsafe Based IO.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kryoserializer.buffer.max</code></td>
   <td>64m</td>