diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index ef72d97eae69d86e1edf12b8a952730df1e16d98..519a920279c9776866ff5bb994f1fc7a0d4870b8 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -64,6 +64,12 @@
       <artifactId>aws-java-sdk</artifactId>
       <version>${aws.java.sdk.version}</version>
     </dependency>
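+    <!-- The Kinesis Producer Library is used by KinesisTestUtils to write aggregated test data -->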
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-producer</artifactId>
+      <version>${aws.kinesis.producer.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 000897a4e7290e1acdc64d849145455c15893938..691c1790b207fd6e97b4955893790700d3881381 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 
 import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
 import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
 
 import org.apache.spark._
@@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator(
       s"getting records using shard iterator") {
         client.getRecords(getRecordsRequest)
       }
-    (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator)
+    // De-aggregate the records if the KPL was used to produce them (non-aggregated records pass
+    // through unchanged). The KCL handles de-aggregation automatically during regular operation;
+    // this code path is used during recovery.
+    val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
+    (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
   }
 
   /**
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 50993f157cd952f626c37ab1f4ec00e5b3a44873..97dbb918573a3d8c9c5a102ef5cbf7ae33d43577 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -216,7 +216,6 @@ private[kinesis] class KinesisReceiver[T](
       val metadata = SequenceNumberRange(streamName, shardId,
         records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
       blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
-
     }
   }
 
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index e381ffa0cbef44deeb0b5c1c38b6a73afe6cebcd..b5b76cb92d866ff942b610f5f0c56e055ca532cc 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -80,7 +80,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
            *     more than once.
            */
           logError(s"Exception:  WorkerId $workerId encountered and exception while storing " +
-              " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
+              s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
 
           /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
           throw e
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 9f9e146a08d46983c54e8feef8337a403f902780..52c61dfb1c02398620198e7c06f2ec70c9aed252 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 
-class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
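+// Base suite, parameterized on whether the test data pushed to Kinesis is KPL-aggregated.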
+abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
+  extends KinesisFunSuite with BeforeAndAfterAll {
 
   private val testData = 1 to 8
 
@@ -37,13 +38,12 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
   private var sc: SparkContext = null
   private var blockManager: BlockManager = null
 
-
   override def beforeAll(): Unit = {
     runIfTestsEnabled("Prepare KinesisTestUtils") {
       testUtils = new KinesisTestUtils()
       testUtils.createStream()
 
-      shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
+      shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
       require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
 
       shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
@@ -247,3 +247,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
     Array.tabulate(num) { i => new StreamBlockId(0, i) }
   }
 }
+
+class WithAggregationKinesisBackedBlockRDDSuite
+  extends KinesisBackedBlockRDDTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisBackedBlockRDDSuite
+  extends KinesisBackedBlockRDDTests(aggregateTestData = false)
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ba84e557dfcc26fbe8c18afe7ebd81628597bb0b..dee30444d8cc670a74f4178f5600f61974530c86 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.util.Utils
 import org.apache.spark.{SparkConf, SparkContext}
 
-class KinesisStreamSuite extends KinesisFunSuite
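+// Base suite, parameterized on whether the test data is pushed through the KPL (aggregated) or not.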
+abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
   with Eventually with BeforeAndAfter with BeforeAndAfterAll {
 
   // This is the name that KCL will use to save metadata to DynamoDB
@@ -182,13 +182,13 @@ class KinesisStreamSuite extends KinesisFunSuite
     val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
       collected ++= rdd.collect()
-      logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+      logInfo("Collected = " + collected.mkString(", "))
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
-      testUtils.pushData(testData)
+      testUtils.pushData(testData, aggregateTestData)
       assert(collected === testData.toSet, "\nData received does not match data sent")
     }
     ssc.stop(stopSparkContext = false)
@@ -207,13 +207,13 @@ class KinesisStreamSuite extends KinesisFunSuite
     val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
     stream.foreachRDD { rdd =>
       collected ++= rdd.collect()
-      logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+      logInfo("Collected = " + collected.mkString(", "))
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
-      testUtils.pushData(testData)
+      testUtils.pushData(testData, aggregateTestData)
       val modData = testData.map(_ + 5)
       assert(collected === modData.toSet, "\nData received does not match data sent")
     }
@@ -254,7 +254,7 @@ class KinesisStreamSuite extends KinesisFunSuite
     // If this times out because numBatchesWithData is empty, then its likely that foreachRDD
     // function failed with exceptions, and nothing got added to `collectedData`
     eventually(timeout(2 minutes), interval(1 seconds)) {
-      testUtils.pushData(1 to 5)
+      testUtils.pushData(1 to 5, aggregateTestData)
       assert(isCheckpointPresent && numBatchesWithData > 10)
     }
     ssc.stop(stopSparkContext = true)  // stop the SparkContext so that the blocks are not reused
@@ -285,5 +285,8 @@ class KinesisStreamSuite extends KinesisFunSuite
     }
     ssc.stop()
   }
-
 }
+
+class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
similarity index 80%
rename from extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
rename to extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 634bf945210791b236594470236d732282e8bdb7..7487aa1c12639da08a2614f808d265ffe113e776 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -31,6 +31,8 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
 import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.model._
+import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
 
 import org.apache.spark.Logging
 
@@ -64,6 +66,16 @@ private[kinesis] class KinesisTestUtils extends Logging {
     new DynamoDB(dynamoDBClient)
   }
 
+  private lazy val kinesisProducer: KinesisProducer = {
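+    // Test-friendly settings: a short buffering window, a single connection, and metric
+    // uploads disabled, so records are flushed quickly and no CloudWatch calls are made.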
+    val conf = new KinesisProducerConfiguration()
+      .setRecordMaxBufferedTime(1000)
+      .setMaxConnections(1)
+      .setRegion(regionName)
+      .setMetricsLevel("none")
+
+    new KinesisProducer(conf)
+  }
+
   def streamName: String = {
     require(streamCreated, "Stream not yet created, call createStream() to create one")
     _streamName
@@ -90,22 +102,41 @@ private[kinesis] class KinesisTestUtils extends Logging {
    * Push data to Kinesis stream and return a map of
    * shardId -> seq of (data, seq number) pushed to corresponding shard
    */
-  def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
     require(streamCreated, "Stream not yet created, call createStream() to create one")
     val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
 
     testData.foreach { num =>
       val str = num.toString
-      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
-        .withData(ByteBuffer.wrap(str.getBytes()))
-        .withPartitionKey(str)
-
-      val putRecordResult = kinesisClient.putRecord(putRecordRequest)
-      val shardId = putRecordResult.getShardId
-      val seqNumber = putRecordResult.getSequenceNumber()
-      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
-        new ArrayBuffer[(Int, String)]())
-      sentSeqNumbers += ((num, seqNumber))
+      val data = ByteBuffer.wrap(str.getBytes())
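+      // With aggregation, records are sent through the KPL, which may pack several user records
+      // into a single Kinesis record; shard ids and sequence numbers are reported asynchronously
+      // via the callback registered below.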
+      if (aggregate) {
+        val future = kinesisProducer.addUserRecord(streamName, str, data)
+        val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+          override def onFailure(t: Throwable): Unit = {} // do nothing
+
+          override def onSuccess(result: UserRecordResult): Unit = {
+            val shardId = result.getShardId
+            val seqNumber = result.getSequenceNumber()
+            val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+              new ArrayBuffer[(Int, String)]())
+            sentSeqNumbers += ((num, seqNumber))
+          }
+        }
+
+        Futures.addCallback(future, kinesisCallBack)
+        kinesisProducer.flushSync() // make sure we send all data before returning the map
+      } else {
+        val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+          .withData(data)
+          .withPartitionKey(str)
+
+        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+        val shardId = putRecordResult.getShardId
+        val seqNumber = putRecordResult.getSequenceNumber()
+        val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+          new ArrayBuffer[(Int, String)]())
+        sentSeqNumbers += ((num, seqNumber))
+      }
     }
 
     logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
@@ -116,7 +147,7 @@ private[kinesis] class KinesisTestUtils extends Logging {
    * Expose a Python friendly API.
    */
   def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala)
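+    // The Python-facing overload always pushes plain, non-aggregated records.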
+    pushData(testData.asScala, aggregate = false)
   }
 
   def deleteStream(): Unit = {
diff --git a/pom.xml b/pom.xml
index 4ed1c0c82dee65742b2d088f5153c5918d37fdda..fd8c773513881c6f971d4a298f07aa2c1d9f1ea3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,6 +154,8 @@
     <jets3t.version>0.7.1</jets3t.version>
     <aws.java.sdk.version>1.9.40</aws.java.sdk.version>
     <aws.kinesis.client.version>1.4.0</aws.kinesis.client.version>
+    <!-- the Kinesis producer library (KPL) is only used in tests -->
+    <aws.kinesis.producer.version>0.10.1</aws.kinesis.producer.version>
     <!--  org.apache.httpcomponents/httpclient-->
     <commons.httpclient.version>4.3.2</commons.httpclient.version>
     <!--  commons-httpclient/commons-httpclient-->