diff --git a/bin/run-example b/bin/run-example
index 942706d7331222190979434b00314be82a061c50..68a35702eddd39c4412f7aea99a2553541037fb5 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -29,7 +29,8 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
   exit 1
 fi
 
diff --git a/bin/run-example2.cmd b/bin/run-example2.cmd
index eadedd7fa61ffa6d5d4dba8aa535f89ae54a64e7..b29bf90c64e90d199ad1ad2368c0e65f19b2ef23 100644
--- a/bin/run-example2.cmd
+++ b/bin/run-example2.cmd
@@ -32,7 +32,8 @@ rem Test that an argument was given
 if not "x%1"=="x" goto arg_given
   echo Usage: run-example ^<example-class^> [example-args]
   echo   - set MASTER=XX to use a specific master
-  echo   - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)
+  echo   - can use abbreviated example class name relative to com.apache.spark.examples
+  echo      (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)
   goto exit
 :arg_given
 
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index 230e900ecd4de9eaec95dee933d9408572db60f7..16ea1a71290dc352e12fc2d22ae7d9230e023a58 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -105,7 +105,7 @@ modules = [
     "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
     "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
-    "spark-catalyst", "spark-sql", "spark-hive"
+    "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)
 
@@ -136,7 +136,7 @@ for module in modules:
 os.chdir(original_dir)
 
 # SBT application tests
-for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]:
+for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive", "sbt_app_kinesis"]:
     os.chdir(app)
     ret = run_cmd("sbt clean run", exit_on_failure=False)
     test(ret == 0, "sbt application (%s)" % app)
diff --git a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
index 77bbd167b199a722dbf75977c34fe81cc3fa4378..fc03fec9866a6287ffe3c4bca9b598556feb4749 100644
--- a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
+++ b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
@@ -50,5 +50,12 @@ object SimpleApp {
       println("Ganglia sink was loaded via spark-core")
       System.exit(-1)
     }
+
+    // Remove kinesis from default build due to ASL license issue
+    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (foundKinesis) {
+      println("Kinesis was loaded via spark-core")
+      System.exit(-1)
+    }
   }
 }
diff --git a/dev/audit-release/sbt_app_kinesis/build.sbt b/dev/audit-release/sbt_app_kinesis/build.sbt
new file mode 100644
index 0000000000000000000000000000000000000000..981bc7957b5ed725fee5ba89bf96f606f50d04bd
--- /dev/null
+++ b/dev/audit-release/sbt_app_kinesis/build.sbt
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % System.getenv.get("SPARK_VERSION")
+
+resolvers ++= Seq(
+  "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
+  "Spray Repository" at "http://repo.spray.cc/")
diff --git a/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9f8506650147246a52741c578a0cd81cd4c7b9c7
--- /dev/null
+++ b/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main.scala
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+object SimpleApp {
+  def main(args: Array[String]) {
+    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (!foundKinesis) {
+      println("Kinesis not loaded via kinesis-asl")
+      System.exit(-1)
+    }
+  }
+}
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index af46572e6602b0cb410c6faae9fe67c22f9e90d5..42473629d4f15aea4ebb8be26ba5601aea69fa7b 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
     -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
     -Dmaven.javadoc.skip=true \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
     -Dtag=$GIT_TAG -DautoVersionSubmodules=true \
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     --batch-mode release:prepare
 
   mvn -DskipTests \
     -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
     -Dmaven.javadoc.skip=true \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     release:perform
 
   cd ..
diff --git a/dev/run-tests b/dev/run-tests
index daa85bc750c0765dba6fc15f36ffc8c7c2d22c6c..d401c90f41d7bad597b702dcd249eb579e93a6fe 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -36,6 +36,9 @@ fi
 if [ -z "$SBT_MAVEN_PROFILES_ARGS" ]; then
   export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
 fi
+
+export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
+
 echo "SBT_MAVEN_PROFILES_ARGS=\"$SBT_MAVEN_PROFILES_ARGS\""
 
 # Remove work directory
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index a2dc3a8961dfcff1da5e9c54a798c0eb3cddd634..1e045a3dd0ca935b66283b825404badb13cc78a2 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
 ---
 
 Spark Streaming can receive streaming data from any arbitrary data source beyond
-the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
+the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
 and using it in a Spark Streaming application.
@@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" "))
 ...
 {% endhighlight %}
 
-The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
+The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).
 
 </div>
 <div data-lang="java" markdown="1">
diff --git a/docs/streaming-kinesis.md b/docs/streaming-kinesis.md
new file mode 100644
index 0000000000000000000000000000000000000000..801c905c88df88d664920359456cbe9e86932645
--- /dev/null
+++ b/docs/streaming-kinesis.md
@@ -0,0 +1,58 @@
+---
+layout: global
+title: Spark Streaming Kinesis Receiver
+---
+
+### Kinesis
+Build notes:
+<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
+<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
+<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
+<li>To build with Kinesis, you must run the maven or sbt builds with -Pkinesis-asl`.</li>
+<li>Applications will need to link to the 'spark-streaming-kinesis-asl` artifact.</li>
+
+Kinesis examples notes:
+<li>To build the Kinesis examples, you must run the maven or sbt builds with -Pkinesis-asl`.</li>
+<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li>
+<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li>
+<li>Checkpointing is disabled (no checkpoint dir is set).  The examples as written will not recover from a driver failure.</li>
+
+Deployment and runtime notes:
+<li>A single KinesisReceiver can process many shards of a stream.</li>
+<li>Each shard of a stream is processed by one or more KinesisReceiver's managed by the Kinesis Client Library (KCL) Worker.</li>
+<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
+<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream)</li>
+<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
+<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
+    1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
+    2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
+    3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
+    4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
+</li>
+<li>You need to setup a Kinesis stream with 1 or more shards per the following:<br/>
+ http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
+<li>Valid Kinesis endpoint urls can be found here:  Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
+<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
+retrieve any checkpoint data, and negotiate with other KCL's reading from the same stream.</li>
+<li>Be careful when changing the app name.  Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).  
+Changing the app name could lead to Kinesis errors as only 1 logical application can process a stream.  In order to start fresh, 
+it's always best to delete the DynamoDB table that matches your app name.  This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li>
+
+Failure recovery notes:
+<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/>
+  1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/>
+  2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/>
+  3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/>
+</li>
+<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling</li>
+<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li>
+<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON)
+or from the tip/latest (InitialPostitionInStream.LATEST).  This is configurable.</li>
+<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li>
+<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li>
+<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data
+depending on the checkpoint frequency.</li>
+<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li>
+<li>Record processing should be idempotent when possible.</li>
+<li>Failed or latent KinesisReceivers will be detected and automatically shutdown/load-balanced by the KCL.</li>
+<li>If possible, explicitly shutdown the worker if a failure occurs in order to trigger the final checkpoint.</li>
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 7b8b7933434c470defc1e870bdd4f58868bc8e70..9f331ed50d2a4c4655ed8384aa906b0569c2f1d1 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -9,7 +9,7 @@ title: Spark Streaming Programming Guide
 # Overview
 Spark Streaming is an extension of the core Spark API that allows enables high-throughput,
 fault-tolerant stream processing of live data streams. Data can be ingested from many sources
-like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets and be processed using complex
+like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be processed using complex
 algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.
 Finally, processed data can be pushed out to filesystems, databases,
 and live dashboards. In fact, you can apply Spark's in-built
@@ -38,7 +38,7 @@ stream of results in batches.
 
 Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*,
 which represents a continuous stream of data. DStreams can be created either from input data
-stream from sources such as Kafka and Flume, or by applying high-level
+stream from sources such as Kafka, Flume, and Kinesis, or by applying high-level
 operations on other DStreams. Internally, a DStream is represented as a sequence of
 [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD).
 
@@ -313,7 +313,7 @@ To write your own Spark Streaming program, you will have to add the following de
     artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}}
     version = {{site.SPARK_VERSION}}
 
-For ingesting data from sources like Kafka and Flume that are not present in the Spark
+For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark
 Streaming core
  API, you will have to add the corresponding
 artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example,
@@ -327,6 +327,7 @@ some of the common ones are as follows.
 <tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Kinesis<br/>(built separately)</td><td> kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> </td><td></td></tr>
 </table>
 
@@ -442,7 +443,7 @@ see the API documentations of the relevant functions in
 Scala and [JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
  for Java.
 
-Additional functionality for creating DStreams from sources such as Kafka, Flume, and Twitter
+Additional functionality for creating DStreams from sources such as Kafka, Flume, Kinesis, and Twitter
 can be imported by adding the right dependencies as explained in an
 [earlier](#linking) section. To take the
 case of Kafka, after adding the artifact `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` to the
@@ -467,6 +468,9 @@ For more details on these additional sources, see the corresponding [API documen
 Furthermore, you can also implement your own custom receiver for your sources. See the
 [Custom Receiver Guide](streaming-custom-receivers.html).
 
+### Kinesis
+[Kinesis](streaming-kinesis.html)
+
 ## Operations
 There are two kinds of DStream operations - _transformations_ and _output operations_. Similar to
 RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams
diff --git a/examples/pom.xml b/examples/pom.xml
index c4ed0f5a6a02bde4a9f6682ad91f749ebb2b23c8..8c4c128bb484d76ee25d13816c2bdf306c679f7a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,19 @@
   <name>Spark Project Examples</name>
   <url>http://spark.apache.org/</url>
 
+  <profiles>
+    <profile>
+      <id>kinesis-asl</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+  
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..a54b34235dfb4023452efd3af01e7c377a069e30
--- /dev/null
+++ b/extras/kinesis-asl/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Kinesis Integration</name>
+
+  <properties>
+    <sbt.project.name>kinesis-asl</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-client</artifactId>
+      <version>${aws.kinesis.client.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>${aws.java.sdk.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymockclassextension</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
new file mode 100644
index 0000000000000000000000000000000000000000..a8b907b241893c10a1208efa16b51a2df0a497ad
--- /dev/null
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details
+ * on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   <stream-name> and <endpoint-url>. 
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials 
+ *  in the following order of precedence: 
+ *         Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *         Java System Properties - aws.accessKeyId and aws.secretKey
+ *         Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ *         Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
+ *         <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *         <endpoint-url> is the endpoint of the Kinesis service 
+ *           (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      $ export AWS_SECRET_KEY=<your-secret-key>
+ *      $ $SPARK_HOME/bin/run-example \
+ *            org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
+ *            https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data 
+ *   onto the Kinesis stream. 
+ * Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
+ */
+public final class JavaKinesisWordCountASL {
+    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+    private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+    /* Make the constructor private to enforce singleton */
+    private JavaKinesisWordCountASL() {
+    }
+
+    public static void main(String[] args) {
+        /* Check that all required args were passed in. */
+        if (args.length < 2) {
+          System.err.println(
+              "|Usage: KinesisWordCount <stream-name> <endpoint-url>\n" +
+              "|    <stream-name> is the name of the Kinesis stream\n" +
+              "|    <endpoint-url> is the endpoint of the Kinesis service\n" +
+              "|                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
+          System.exit(1);
+        }
+
+        StreamingExamples.setStreamingLogLevels();
+
+        /* Populate the appropriate variables from the given args */
+        String streamName = args[0];
+        String endpointUrl = args[1];
+        /* Set the batch interval to a fixed 2000 millis (2 seconds) */
+        Duration batchInterval = new Duration(2000);
+
+        /* Create a Kinesis client in order to determine the number of shards for the given stream */
+        AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
+                new DefaultAWSCredentialsProviderChain());
+        kinesisClient.setEndpoint(endpointUrl);
+
+        /* Determine the number of shards from the stream */
+        int numShards = kinesisClient.describeStream(streamName)
+                .getStreamDescription().getShards().size();
+
+        /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */ 
+        int numStreams = numShards;
+
+        /* Must add 1 more thread than the number of receivers or the output won't show properly from the driver */
+        int numSparkThreads = numStreams + 1;
+
+        /* Setup the Spark config. */
+        SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount").setMaster(
+                "local[" + numSparkThreads + "]");
+
+        /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
+        Duration checkpointInterval = batchInterval;
+
+        /* Setup the StreamingContext */
+        JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+        /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+        List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
+        for (int i = 0; i < numStreams; i++) {
+        	streamsList.add(
+                KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval, 
+                InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
+            );
+        }
+
+        /* Union all the streams if there is more than 1 stream */
+        JavaDStream<byte[]> unionStreams;
+        if (streamsList.size() > 1) {
+            unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+        } else {
+            /* Otherwise, just use the 1 stream */
+            unionStreams = streamsList.get(0);
+        }
+
+        /*
+         * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection.
+         * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR.
+         */
+        JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+                @Override
+                public Iterable<String> call(byte[] line) {
+                    return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+                }
+            });
+
+        /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. */
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+            new PairFunction<String, String, Integer>() {
+                @Override
+                public Tuple2<String, Integer> call(String s) {
+                    return new Tuple2<String, Integer>(s, 1);
+                }
+            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+                @Override
+                public Integer call(Integer i1, Integer i2) {
+                  return i1 + i2;
+                }
+            });
+
+        /* Print the first 10 wordCounts */
+        wordCounts.print();
+
+        /* Start the streaming context and await termination */
+        jssc.start();
+        jssc.awaitTermination();
+    }
+}
diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..97348fb5b61235af4f0b3a0101ee0d24f8671bea
--- /dev/null
+++ b/extras/kinesis-asl/src/main/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d03edf8b30a9fa7ac3e8964cd6b726c90bce0b88
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+import scala.util.Random
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard 
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given 
+ *   <stream-name> and <endpoint-url>. 
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ * 
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCountASL <stream-name> <endpoint-url>
+ *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducerASL which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ */
+object KinesisWordCountASL extends Logging {
+  def main(args: Array[String]) {
+    /* Check that all required args were passed in. */
+    if (args.length < 2) {
+      System.err.println(
+        """
+          |Usage: KinesisWordCount <stream-name> <endpoint-url>
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /* Populate the appropriate variables from the given args */
+    val Array(streamName, endpointUrl) = args
+
+    /* Determine the number of shards from the stream */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
+      .size()
+
+    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
+    val numStreams = numShards
+
+    /* 
+     *  numSparkThreads should be 1 more thread than the number of receivers.
+     *  This leaves one thread available for actually processing the data.
+     */
+    val numSparkThreads = numStreams + 1
+
+    /* Setup the and SparkConfig and StreamingContext */
+    /* Spark Streaming batch interval */
+    val batchInterval = Milliseconds(2000)    
+    val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
+      .setMaster(s"local[$numSparkThreads]")
+    val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+    /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
+    val kinesisCheckpointInterval = batchInterval
+
+    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+    val kinesisStreams = (0 until numStreams).map { i =>
+      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
+          InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+    }
+
+    /* Union all the streams */
+    val unionStreams = ssc.union(kinesisStreams)
+
+    /* Convert each line of Array[Byte] to String, split into words, and count them */
+    val words = unionStreams.flatMap(byteArray => new String(byteArray)
+      .split(" "))
+
+    /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
+    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+
+    /* Print the first 10 wordCounts */
+    wordCounts.print()
+
+    /* Start the streaming context and await termination */
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+/**
+ * Usage: KinesisWordCountProducerASL <stream-name> <kinesis-endpoint-url>
+ *     <recordsPerSec> <wordsPerRecord>
+ *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *   <kinesis-endpoint-url> is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *   <records-per-sec> is the rate of records per second to put onto the stream
+ *   <words-per-record> is the rate of records per second to put onto the stream
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *         org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \
+ *         https://kinesis.us-east-1.amazonaws.com 10 5
+ */
+object KinesisWordCountProducerASL {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KinesisWordCountProducerASL <stream-name> <endpoint-url>" +
+          " <records-per-sec> <words-per-record>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /* Populate the appropriate variables from the given args */
+    val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
+
+    /* Generate the records and return the totals */
+    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
+
+    /* Print the array of (index, total) tuples */
+    println("Totals")
+    totals.foreach(total => println(total.toString()))
+  }
+
+  def generate(stream: String,
+      endpoint: String,
+      recordsPerSecond: Int,
+      wordsPerRecord: Int): Seq[(Int, Int)] = {
+
+    val MaxRandomInts = 10
+
+    /* Create the Kinesis client */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpoint)
+
+    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
+      s" $recordsPerSecond records per second and $wordsPerRecord words per record");
+
+    val totals = new Array[Int](MaxRandomInts)
+    /* Put String records onto the stream per the given recordPerSec and wordsPerRecord */
+    for (i <- 1 to 5) {
+
+      /* Generate recordsPerSec records to put onto the stream */
+      val records = (1 to recordsPerSecond.toInt).map { recordNum =>
+        /* 
+         *  Randomly generate each wordsPerRec words between 0 (inclusive)
+         *  and MAX_RANDOM_INTS (exclusive) 
+         */
+        val data = (1 to wordsPerRecord.toInt).map(x => {
+          /* Generate the random int */
+          val randomInt = Random.nextInt(MaxRandomInts)
+
+          /* Keep track of the totals */
+          totals(randomInt) += 1
+
+          randomInt.toString()
+        }).mkString(" ")
+
+        /* Create a partitionKey based on recordNum */
+        val partitionKey = s"partitionKey-$recordNum"
+
+        /* Create a PutRecordRequest with an Array[Byte] version of the data */
+        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
+            .withPartitionKey(partitionKey)
+            .withData(ByteBuffer.wrap(data.getBytes()));
+
+        /* Put the record onto the stream and capture the PutRecordResult */
+        val putRecordResult = kinesisClient.putRecord(putRecordRequest);
+      }
+
+      /* Sleep for a second */
+      Thread.sleep(1000)
+      println("Sent " + recordsPerSecond + " records")
+    }
+
+    /* Convert the totals to (index, total) tuple */
+    (0 to (MaxRandomInts - 1)).zip(totals)
+  }
+}
+
+/** 
+ *  Utility functions for Spark Streaming examples. 
+ *  This has been lifted from the examples/ project to remove the circular dependency.
+ */
+object StreamingExamples extends Logging {
+
+  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+        " To override add a custom log4j.properties to the classpath.")
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+  }
+}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0b80b611cdce79842e3e151942db8eaa2800f8b9
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * This is a helper class for managing checkpoint clocks.
+ *
+ * @param checkpointInterval 
+ * @param currentClock.  Default to current SystemClock if none is passed in (mocking purposes)
+ */
+private[kinesis] class KinesisCheckpointState(
+    checkpointInterval: Duration, 
+    currentClock: Clock = new SystemClock())
+  extends Logging {
+  
+  /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
+  val checkpointClock = new ManualClock()
+  checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+
+  /**
+   * Check if it's time to checkpoint based on the current time and the derived time 
+   *   for the next checkpoint
+   *
+   * @return true if it's time to checkpoint
+   */
+  def shouldCheckpoint(): Boolean = {
+    new SystemClock().currentTime() > checkpointClock.currentTime()
+  }
+
+  /**
+   * Advance the checkpoint clock by the checkpoint interval.
+   */
+  def advanceCheckpoint() = {
+    checkpointClock.addToTime(checkpointInterval.milliseconds)
+  }
+}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1bd1f324298e7307264c7136d2fbaea10acdf17e
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.receiver.Receiver
+
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) 
+ *   as described here:
+ *     http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers 
+ *   to run within a Spark Executor.
+ *
+ * @param appName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ *                 by the Kinesis Client Library.  If you change the App name or Stream name,
+ *                 the KCL will throw errors.  This usually requires deleting the backing  
+ *                 DynamoDB table with the same name this Kinesis application.
+ * @param streamName   Kinesis stream name
+ * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+ *                            See the Kinesis Spark Streaming documentation for more
+ *                            details on the different types of checkpoints.
+ * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+ *                                 worker's initial starting position in the stream.
+ *                                 The values are either the beginning of the stream
+ *                                 per Kinesis' limit of 24 hours
+ *                                 (InitialPositionInStream.TRIM_HORIZON) or
+ *                                 the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @return ReceiverInputDStream[Array[Byte]]   
+ */
+private[kinesis] class KinesisReceiver(
+    appName: String,
+    streamName: String,
+    endpointUrl: String,
+    checkpointInterval: Duration,
+    initialPositionInStream: InitialPositionInStream,
+    storageLevel: StorageLevel)
+  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
+
+  /*
+   * The following vars are built in the onStart() method which executes in the Spark Worker after
+   *   this code is serialized and shipped remotely.
+   */
+
+  /*
+   *  workerId should be based on the ip address of the actual Spark Worker where this code runs
+   *   (not the Driver's ip address.)
+   */
+  var workerId: String = null
+
+  /*
+   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials 
+   *   in the following order of precedence:
+   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+   * Java System Properties - aws.accessKeyId and aws.secretKey
+   * Credential profiles file at the default location (~/.aws/credentials) shared by all 
+   *   AWS SDKs and the AWS CLI
+   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   */
+  var credentialsProvider: AWSCredentialsProvider = null
+
+  /* KCL config instance. */
+  var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null
+
+  /*
+   *  RecordProcessorFactory creates impls of IRecordProcessor.
+   *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the 
+   *    IRecordProcessor.processRecords() method.
+   *  We're using our custom KinesisRecordProcessor in this case.
+   */
+  var recordProcessorFactory: IRecordProcessorFactory = null
+
+  /*
+   * Create a Kinesis Worker.
+   * This is the core client abstraction from the Kinesis Client Library (KCL).
+   * We pass the RecordProcessorFactory from above as well as the KCL config instance.
+   * A Kinesis Worker can process 1..* shards from the given stream - each with its 
+   *   own RecordProcessor.
+   */
+  var worker: Worker = null
+
+  /**
+   *  This is called when the KinesisReceiver starts and must be non-blocking.
+   *  The KCL creates and manages the receiving/processing thread pool through the Worker.run() 
+   *    method.
+   */
+  override def onStart() {
+    workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+    credentialsProvider = new DefaultAWSCredentialsProviderChain()
+    kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
+      credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
+    recordProcessorFactory = new IRecordProcessorFactory {
+      override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
+        workerId, new KinesisCheckpointState(checkpointInterval))
+    }
+    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
+    worker.run()
+    logInfo(s"Started receiver with workerId $workerId")
+  }
+
+  /**
+   *  This is called when the KinesisReceiver stops.
+   *  The KCL worker.shutdown() method stops the receiving/processing threads.
+   *  The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+   */
+  override def onStop() {
+    worker.shutdown()
+    logInfo(s"Shut down receiver with workerId $workerId")
+    workerId = null
+    credentialsProvider = null
+    kinesisClientLibConfiguration = null
+    recordProcessorFactory = null
+    worker = null
+  }
+}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8ecc2d90160b164a4ac1730c2fc21397230a7e62
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.List
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
+ * This implementation operates on the Array[Byte] from the KinesisReceiver.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ *
+ * @param receiver Kinesis receiver
+ * @param workerId for logging purposes
+ * @param checkpointState represents the checkpoint state including the next checkpoint time.
+ *   It's injected here for mocking purposes.
+ */
+private[kinesis] class KinesisRecordProcessor(
+    receiver: KinesisReceiver,
+    workerId: String,
+    checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
+
+  /* shardId to be populated during initialize() */
+  var shardId: String = _
+
+  /**
+   * The Kinesis Client Library calls this method during IRecordProcessor initialization.
+   *
+   * @param shardId assigned by the KCL to this particular RecordProcessor.
+   */
+  override def initialize(shardId: String) {
+    logInfo(s"Initialize:  Initializing workerId $workerId with shardId $shardId")
+    this.shardId = shardId
+  }
+
+  /**
+   * This method is called by the KCL when a batch of records is pulled from the Kinesis stream.
+   * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords()
+   * and Spark Streaming's Receiver.store().
+   *
+   * @param batch list of records from the Kinesis stream shard
+   * @param checkpointer used to update Kinesis when this batch has been processed/stored 
+   *   in the DStream
+   */
+  override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
+    if (!receiver.isStopped()) {
+      try {
+        /*
+         * Note:  If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+         * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+         *   internally-configured Spark serializer (kryo, etc).
+         * This is not desirable, so we instead store a raw Array[Byte] and decouple
+         *   ourselves from Spark's internal serialization strategy.
+         */
+        batch.foreach(record => receiver.store(record.getData().array()))
+        
+        logDebug(s"Stored:  Worker $workerId stored ${batch.size} records for shardId $shardId")
+
+        /*
+         * Checkpoint the sequence number of the last record successfully processed/stored 
+         *   in the batch.
+         * In this implementation, we're checkpointing after the given checkpointIntervalMillis.
+         * Note that this logic requires that processRecords() be called AND that it's time to 
+         *   checkpoint.  I point this out because there is no background thread running the 
+         *   checkpointer.  Checkpointing is tested and trigger only when a new batch comes in.
+         * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
+         * However, if the worker dies unexpectedly, a checkpoint may not happen.
+         * This could lead to records being processed more than once.
+         */
+        if (checkpointState.shouldCheckpoint()) {
+          /* Perform the checkpoint */
+          KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+
+          /* Update the next checkpoint time */
+          checkpointState.advanceCheckpoint()
+
+          logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint of ${batch.size}" +
+              s" records for shardId $shardId")
+          logDebug(s"Checkpoint:  Next checkpoint is at " +
+              s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+        }
+      } catch {
+        case e: Throwable => {
+          /*
+           *  If there is a failure within the batch, the batch will not be checkpointed.
+           *  This will potentially cause records since the last checkpoint to be processed
+           *     more than once.
+           */
+          logError(s"Exception:  WorkerId $workerId encountered and exception while storing " +
+              " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
+
+          /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+          throw e
+        }
+      }
+    } else {
+      /* RecordProcessor has been stopped. */
+      logInfo(s"Stopped:  The Spark KinesisReceiver has stopped for workerId $workerId" + 
+          s" and shardId $shardId.  No more records will be processed.")
+    }
+  }
+
+  /**
+   * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
+   * 1) the stream is resharding by splitting or merging adjacent shards 
+   *     (ShutdownReason.TERMINATE)
+   * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason 
+   *     (ShutdownReason.ZOMBIE)
+   *
+   * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
+   * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
+   */
+  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+    logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
+    reason match {
+      /*
+       * TERMINATE Use Case.  Checkpoint.
+       * Checkpoint to indicate that all records from the shard have been drained and processed.
+       * It's now OK to read from the new shards that resulted from a resharding event.
+       */
+      case ShutdownReason.TERMINATE => 
+        KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+
+      /*
+       * ZOMBIE Use Case.  NoOp.
+       * No checkpoint because other workers may have taken over and already started processing
+       *    the same records.
+       * This may lead to records being processed more than once.
+       */
+      case ShutdownReason.ZOMBIE =>
+
+      /* Unknown reason.  NoOp */
+      case _ =>
+    }
+  }
+}
+
+private[kinesis] object KinesisRecordProcessor extends Logging {
+  /**
+   * Retry the given amount of times with a random backoff time (millis) less than the
+   *   given maxBackOffMillis
+   *
+   * @param expression expression to evalute
+   * @param numRetriesLeft number of retries left
+   * @param maxBackOffMillis: max millis between retries
+   *
+   * @return evaluation of the given expression
+   * @throws Unretryable exception, unexpected exception,
+   *  or any exception that persists after numRetriesLeft reaches 0
+   */
+  @annotation.tailrec
+  def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
+    util.Try { expression } match {
+      /* If the function succeeded, evaluate to x. */
+      case util.Success(x) => x
+      /* If the function failed, either retry or throw the exception */
+      case util.Failure(e) => e match {
+        /* Retry:  Throttling or other Retryable exception has occurred */
+        case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
+          => {
+               val backOffMillis = Random.nextInt(maxBackOffMillis)
+               Thread.sleep(backOffMillis)
+               logError(s"Retryable Exception:  Random backOffMillis=${backOffMillis}", e)
+               retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
+             }
+        /* Throw:  Shutdown has been requested by the Kinesis Client Library.*/
+        case _: ShutdownException => {
+          logError(s"ShutdownException:  Caught shutdown exception, skipping checkpoint.", e)
+          throw e
+        }
+        /* Throw:  Non-retryable exception has occurred with the Kinesis Client Library */
+        case _: InvalidStateException => {
+          logError(s"InvalidStateException:  Cannot save checkpoint to the DynamoDB table used" +
+              s" by the Amazon Kinesis Client Library.  Table likely doesn't exist.", e)
+          throw e
+        }
+        /* Throw:  Unexpected exception has occurred */
+        case _ => {
+          logError(s"Unexpected, non-retryable exception.", e)
+          throw e
+        }
+      }
+    }
+  }
+}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..713cac0e293c0e4f8aa5c7976f7269b9151d065a
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Helper class to create Amazon Kinesis Input Stream
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param ssc    StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      ssc: StreamingContext,
+      streamName: String,
+      endpointUrl: String,
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+    ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl,
+        checkpointInterval, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param jssc Java StreamingContext object
+   * @param ssc    StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @return JavaReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      jssc: JavaStreamingContext, 
+      streamName: String, 
+      endpointUrl: String, 
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
+    jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName,
+        endpointUrl, checkpointInterval, initialPositionInStream, storageLevel))
+  }
+}
diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..87954a31f60cecd3d7661f7d9a7e0aaa991e62c8
--- /dev/null
+++ b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Demonstrate the use of the KinesisUtils Java API
+ */
+public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testKinesisStream() {
+    // Tests the API, does not actually test data receiving
+    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000), 
+        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+    
+    ssc.stop();
+  }
+}
diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..e01e0495954759baea5690058a5d01233c6df43c
--- /dev/null
+++ b/extras/kinesis-asl/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..41dbd64c2b1fa087ef45fef9ab317aea6fded36a
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions.seqAsJavaList
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.TestSuiteBase
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.scalatest.BeforeAndAfter
+import org.scalatest.Matchers
+import org.scalatest.mock.EasyMockSugar
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ *  Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor 
+ */
+class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
+    with EasyMockSugar {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+
+  val record1 = new Record()
+  record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
+  val record2 = new Record()
+  record2.setData(ByteBuffer.wrap("Learning Spark".getBytes()))
+  val batch = List[Record](record1, record2)
+
+  var receiverMock: KinesisReceiver = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+  var checkpointClockMock: ManualClock = _
+  var checkpointStateMock: KinesisCheckpointState = _
+  var currentClockMock: Clock = _
+
+  override def beforeFunction() = {
+    receiverMock = mock[KinesisReceiver]
+    checkpointerMock = mock[IRecordProcessorCheckpointer]
+    checkpointClockMock = mock[ManualClock]
+    checkpointStateMock = mock[KinesisCheckpointState]
+    currentClockMock = mock[Clock]
+  }
+
+  test("kinesis utils api") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    // Tests the API, does not actually test data receiving
+    val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+      "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
+    ssc.stop()
+  }
+
+  test("process records including store and checkpoint") {
+    val expectedCheckpointIntervalMillis = 10
+    expecting {
+      receiverMock.isStopped().andReturn(false).once()
+      receiverMock.store(record1.getData().array()).once()
+      receiverMock.store(record2.getData().array()).once()
+      checkpointStateMock.shouldCheckpoint().andReturn(true).once()
+      checkpointerMock.checkpoint().once()
+      checkpointStateMock.advanceCheckpoint().once()
+    }
+    whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+          checkpointStateMock)
+      recordProcessor.processRecords(batch, checkpointerMock)
+    }
+  }
+
+  test("shouldn't store and checkpoint when receiver is stopped") {
+    expecting {
+      receiverMock.isStopped().andReturn(true).once()
+    }
+    whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+          checkpointStateMock)
+      recordProcessor.processRecords(batch, checkpointerMock)
+    }
+  }
+
+  test("shouldn't checkpoint when exception occurs during store") {
+    expecting {
+      receiverMock.isStopped().andReturn(false).once()
+      receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
+    }
+    whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+      intercept[RuntimeException] {
+        val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+            checkpointStateMock)
+        recordProcessor.processRecords(batch, checkpointerMock)
+      }
+    }
+  }
+
+  test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
+    expecting {
+      currentClockMock.currentTime().andReturn(0).once()
+    }
+    whenExecuting(currentClockMock) {
+    val checkpointIntervalMillis = 10
+    val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+    assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+    }
+  }
+
+  test("should checkpoint if we have exceeded the checkpoint interval") {
+    expecting {
+      currentClockMock.currentTime().andReturn(0).once()
+    }
+    whenExecuting(currentClockMock) {
+      val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+      assert(checkpointState.shouldCheckpoint())
+    }
+  }
+
+  test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
+    expecting {
+      currentClockMock.currentTime().andReturn(0).once()
+    }
+    whenExecuting(currentClockMock) {
+      val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+      assert(!checkpointState.shouldCheckpoint())
+    }
+  }
+
+  test("should add to time when advancing checkpoint") {
+    expecting {
+      currentClockMock.currentTime().andReturn(0).once()
+    }
+    whenExecuting(currentClockMock) {
+      val checkpointIntervalMillis = 10
+      val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
+      assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+      checkpointState.advanceCheckpoint()
+      assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+    }
+  }
+
+  test("shutdown should checkpoint if the reason is TERMINATE") {
+    expecting {
+      checkpointerMock.checkpoint().once()
+    }
+    whenExecuting(checkpointerMock, checkpointStateMock) {
+      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, 
+          checkpointStateMock)
+      val reason = ShutdownReason.TERMINATE
+      recordProcessor.shutdown(checkpointerMock, reason)
+    }
+  }
+
+  test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
+    expecting {
+    }
+    whenExecuting(checkpointerMock, checkpointStateMock) {
+      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, 
+          checkpointStateMock)
+      recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+      recordProcessor.shutdown(checkpointerMock, null)
+    }
+  }
+
+  test("retry success on first attempt") {
+    val expectedIsStopped = false
+    expecting {
+      receiverMock.isStopped().andReturn(expectedIsStopped).once()
+    }
+    whenExecuting(receiverMock) {
+      val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+      assert(actualVal == expectedIsStopped)
+    }
+  }
+
+  test("retry success on second attempt after a Kinesis throttling exception") {
+    val expectedIsStopped = false
+    expecting {
+      receiverMock.isStopped().andThrow(new ThrottlingException("error message"))
+        .andReturn(expectedIsStopped).once()
+    }
+    whenExecuting(receiverMock) {
+      val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+      assert(actualVal == expectedIsStopped)
+    }
+  }
+
+  test("retry success on second attempt after a Kinesis dependency exception") {
+    val expectedIsStopped = false
+    expecting {
+      receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message"))
+        .andReturn(expectedIsStopped).once()
+    }
+    whenExecuting(receiverMock) {
+      val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+      assert(actualVal == expectedIsStopped)
+    }
+  }
+
+  test("retry failed after a shutdown exception") {
+    expecting {
+      checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
+    }
+    whenExecuting(checkpointerMock) {
+      intercept[ShutdownException] {
+        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+      }
+    }
+  }
+
+  test("retry failed after an invalid state exception") {
+    expecting {
+      checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
+    }
+    whenExecuting(checkpointerMock) {
+      intercept[InvalidStateException] {
+        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+      }
+    }
+  }
+
+  test("retry failed after unexpected exception") {
+    expecting {
+      checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
+    }
+    whenExecuting(checkpointerMock) {
+      intercept[RuntimeException] {
+        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+      }
+    }
+  }
+
+  test("retry failed after exhausing all retries") {
+    val expectedErrorMessage = "final try error message"
+    expecting {
+      checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
+        .andThrow(new ThrottlingException(expectedErrorMessage)).once()
+    }
+    whenExecuting(checkpointerMock) {
+      val exception = intercept[RuntimeException] {
+        KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+      }
+      exception.getMessage().shouldBe(expectedErrorMessage)
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 99ae4b8b33f94a49af0efe02405af87e828a7fbc..a42759169149be71bd1f248f9e58b916bd231890 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,8 @@
     <codahale.metrics.version>3.0.0</codahale.metrics.version>
     <avro.version>1.7.6</avro.version>
     <jets3t.version>0.7.1</jets3t.version>
+    <aws.java.sdk.version>1.8.3</aws.java.sdk.version>
+    <aws.kinesis.client.version>1.1.0</aws.kinesis.client.version>
 
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>
@@ -1011,6 +1013,14 @@
       </modules>
     </profile>
 
+    <!-- Kinesis integration is not included by default due to ASL-licensed code -->
+    <profile>
+      <id>kinesis-asl</id>
+      <modules>
+        <module>extras/kinesis-asl</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>java8-tests</id>
       <build>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1d7cc6dd6aef3a30306d6c4c284b4c95964d80d2..aac621fe53938d447e95ce7a06d7656045cb57f2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -37,8 +37,8 @@ object BuildCommons {
       "spark", "sql", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka",
       "streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _))
 
-  val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) =
-    Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl")
+  val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) =
+    Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl")
       .map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples")
@@ -62,7 +62,7 @@ object SparkBuild extends PomBuild {
     var isAlphaYarn = false
     var profiles: mutable.Seq[String] = mutable.Seq.empty
     if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-      println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+      println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
       profiles ++= Seq("spark-ganglia-lgpl")
     }
     if (Properties.envOrNone("SPARK_HIVE").isDefined) {