diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f6bf552e6bb8ef7a33da739f60c8c89c9594b353
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer data
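+ *
+ * A minimal usage sketch (illustrative only; assumes AWS credentials are
+ * resolvable through the default provider chain):
+ * {{{
+ *   val utils = new KinesisTestUtils()
+ *   utils.createStream()
+ *   try {
+ *     utils.pushData(1 to 10)
+ *   } finally {
+ *     utils.deleteStream()
+ *   }
+ * }}}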
+ */
+private class KinesisTestUtils(
+    val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
+    _regionName: String = "") extends Logging {
+
+  val regionName = if (_regionName.isEmpty) {
+    RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  } else {
+    RegionUtils.getRegion(_regionName).getName()
+  }
+
+  val streamShardCount = 2
+
+  private val createStreamTimeoutSeconds = 300
+  private val describeStreamPollTimeSeconds = 1
+
+  @volatile
+  private var streamCreated = false
+  private var _streamName: String = _
+
+  private lazy val kinesisClient = {
+    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+    client.setEndpoint(endpointUrl)
+    client
+  }
+
+  private lazy val dynamoDB = {
+    val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
+    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+    new DynamoDB(dynamoDBClient)
+  }
+
+  def streamName: String = {
+    require(streamCreated, "Stream not yet created, call createStream() to create one")
+    _streamName
+  }
+
+  def createStream(): Unit = {
+    logInfo("Creating stream")
+    require(!streamCreated, "Stream already created")
+    _streamName = findNonExistentStreamName()
+
+    // Create a stream. The number of shards determines the provisioned throughput.
+    val createStreamRequest = new CreateStreamRequest()
+    createStreamRequest.setStreamName(_streamName)
+    createStreamRequest.setShardCount(streamShardCount)
+    kinesisClient.createStream(createStreamRequest)
+
+    // The stream is now being created. Wait for it to become active.
+    waitForStreamToBeActive(_streamName)
+    streamCreated = true
+    logInfo("Created stream")
+  }
+
+  /**
+   * Push the given data to the Kinesis stream and return a map from each shard id
+   * to the sequence of (data, sequence number) pairs pushed to that shard.
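+   *
+   * For example (illustrative values), pushing `Seq(1, 2, 3)` to a two-shard
+   * stream might return:
+   * {{{
+   *   Map("shardId-000000000000" -> Seq((1, "4959...")),
+   *       "shardId-000000000001" -> Seq((2, "4960..."), (3, "4961...")))
+   * }}}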
+   */
+  def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    require(streamCreated, "Stream not yet created, call createStream() to create one")
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+
+    testData.foreach { num =>
+      val str = num.toString
+      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+        .withData(ByteBuffer.wrap(str.getBytes()))
+        .withPartitionKey(str)
+
+      val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+      val shardId = putRecordResult.getShardId
+      val seqNumber = putRecordResult.getSequenceNumber()
+      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+        new ArrayBuffer[(Int, String)]())
+      sentSeqNumbers += ((num, seqNumber))
+    }
+
+    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+    shardIdToSeqNumbers.toMap
+  }
+
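+  /** Describe the given stream, returning `None` if the stream does not exist. */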
+  def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
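+  /** Delete the stream if it exists; failures are logged rather than rethrown. */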
+  def deleteStream(): Unit = {
+    try {
+      if (describeStream().nonEmpty) {
+        kinesisClient.deleteStream(streamName)
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete stream $streamName")
+    }
+  }
+
+  def deleteDynamoDBTable(tableName: String): Unit = {
+    try {
+      val table = dynamoDB.getTable(tableName)
+      table.delete()
+      table.waitForDelete()
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete DynamoDB table $tableName")
+    }
+  }
+
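+  /** Generate random stream names until one is found that does not already exist. */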
+  private def findNonExistentStreamName(): String = {
+    var testStreamName: String = null
+    do {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+    } while (describeStream(testStreamName).nonEmpty)
+    testStreamName
+  }
+
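+  /** Poll the stream's status until it becomes ACTIVE, or fail after the timeout. */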
+  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+    val startTime = System.currentTimeMillis()
+    val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+    while (System.currentTimeMillis() < endTime) {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      describeStream(streamNameToWaitFor).foreach { description =>
+        val streamStatus = description.getStreamStatus()
+        logDebug(s"\t- current state: $streamStatus\n")
+        if ("ACTIVE".equals(streamStatus)) {
+          return
+        }
+      }
+    }
+    require(false, s"Stream $streamName never became active")
+  }
+}
+
+private[kinesis] object KinesisTestUtils {
+
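+  // Name of the environment variable that gates the tests that talk to real Kinesis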
+  val envVarName = "RUN_KINESIS_TESTS"
+
+  val shouldRunTests = sys.env.get(envVarName) == Some("1")
+
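+  /** Whether AWS credentials can be resolved through the default provider chain. */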
+  def isAWSCredentialsPresent: Boolean = {
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+  }
+
+  def getAWSCredentials(): AWSCredentials = {
+    assert(shouldRunTests,
+      "Kinesis test not enabled, should not attempt to get AWS credentials")
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+      case Success(cred) => cred
+      case Failure(e) =>
+        throw new Exception("Kinesis tests enabled, but could get not AWS credentials")
+    }
+  }
+}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6d011f295e7f70a5f7d4d39eacec441262e87f47
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that runs Kinesis tests which transfer real data, or registers
+ * them as ignored, depending on whether the relevant environment variable is set.
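+ *
+ * Intended usage (sketch):
+ * {{{
+ *   class MyKinesisSuite extends SparkFunSuite with KinesisSuiteHelper {
+ *     testOrIgnore("transfers data") {
+ *       // runs only when RUN_KINESIS_TESTS=1
+ *     }
+ *   }
+ * }}}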
+ */
+trait KinesisSuiteHelper { self: SparkFunSuite =>
+  import KinesisTestUtils._
+
+  /** Run the test if the environment variable is set; otherwise register it as ignored. */
+  def testOrIgnore(testName: String)(testBody: => Unit): Unit = {
+    if (shouldRunTests) {
+      test(testName)(testBody)
+    } else {
+      ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
+    }
+  }
+}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 2103dca6b766f2d99e7af39d78b2a3d8db123242..98f2c7c4f1bfbd7b6e9222395dda6bd96d54bcc5 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -73,23 +73,6 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
       checkpointStateMock, currentClockMock)
   }
 
-  test("KinesisUtils API") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    // Tests the API, does not actually test data receiving
-    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
-      "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
-      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
-      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
-      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
-      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
-      "awsAccessKey", "awsSecretKey")
-
-    ssc.stop()
-  }
-
   test("check serializability of SerializableAWSCredentials") {
     Utils.deserialize[SerializableAWSCredentials](
       Utils.serialize(new SerializableAWSCredentials("x", "y")))
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d3dd541fe437182c7a046ed45123f9c213f3dd3c
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper
+  with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+
+  private val kinesisTestUtils = new KinesisTestUtils()
+
+  // This is the name that KCL uses to save metadata to DynamoDB
+  private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+
+  private var ssc: StreamingContext = _
+  private var sc: SparkContext = _
+
+  override def beforeAll(): Unit = {
+    kinesisTestUtils.createStream()
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+    sc.stop()
+    // Delete the Kinesis stream as well as the DynamoDB table generated by
+    // Kinesis Client Library when consuming the stream
+    kinesisTestUtils.deleteStream()
+    kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+  }
+
+  before {
+    // Delete the DynamoDB table generated by Kinesis Client Library when
+    // consuming from the stream, so that each unit test can start from
+    // scratch without prior history of data consumption
+    kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop(stopSparkContext = false)
+      ssc = null
+    }
+  }
+
+  test("KinesisUtils API") {
+    ssc = new StreamingContext(sc, Seconds(1))
+    // Tests the API, does not actually test data receiving
+    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
+      "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+      "awsAccessKey", "awsSecretKey")
+  }
+
+  /**
+   * Test the stream by sending data to a Kinesis stream and receiving it back.
+   * This test is not run by default, as it requires AWS credentials that the test
+   * environment may not have. Even if credentials are available, the user may not
+   * want to run these tests, to avoid incurring Kinesis costs. To enable this test,
+   * AWS credentials must be resolvable through the default AWS provider chain, and
+   * the environment variable RUN_KINESIS_TESTS must be set to 1.
+   */
+  testOrIgnore("basic operation") {
+    ssc = new StreamingContext(sc, Seconds(1))
+    val awsCredentials = KinesisTestUtils.getAWSCredentials()
+    val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
+      kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
+      Seconds(10), StorageLevel.MEMORY_ONLY,
+      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+      val data = rdd.collect()
+      collected ++= data
+      logInfo("Collected = " + data.mkString(", "))
+    }
+    ssc.start()
+
+    val testData = 1 to 10
+    eventually(timeout(120 seconds), interval(10 seconds)) {
+      kinesisTestUtils.pushData(testData)
+      assert(collected === testData.toSet, "\nData received does not match data sent")
+    }
+    ssc.stop()
+  }
+}