diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index fe55db6e2c6314739cf3c3e2fd4f34511aff7b6f..65d5da82fcccc2d76900fad66ab3db0c15c953e0 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._
 object KafkaWordCount {
   def main(args: Array[String]) {
     
-    if (args.length < 6) {
-      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+    if (args.length < 5) {
+      System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }
 
-    val Array(master, hostname, port, group, topics, numThreads) = args
+    val Array(master, zkQuorum, group, topics, numThreads) = args
 
     val sc = new SparkContext(master, "KafkaWordCount")
     val ssc =  new StreamingContext(sc, Seconds(2))
     ssc.checkpoint("checkpoint")
 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+    val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
     wordCounts.print()
@@ -38,16 +38,16 @@ object KafkaWordCount {
 object KafkaWordCountProducer {
 
   def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+    if (args.length < 2) {
+      System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
       System.exit(1)
     }
 
-    val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+    val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
 
     // Zookeper connection properties
     val props = new Properties()
-    props.put("zk.connect", hostname + ":" + port)
+    props.put("zk.connect", zkQuorum)
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     
     val config = new ProducerConfig(props)
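
For orientation, a minimal sketch (not part of the patch) of the reworked consumer wiring: the ZooKeeper quorum is now a single comma-separated host:port string rather than a separate hostname and port. The master, quorum, group and topic values below are illustrative.

    import spark.SparkContext
    import spark.streaming.{Seconds, StreamingContext}

    val sc  = new SparkContext("local[2]", "KafkaWordCountSketch")
    val ssc = new StreamingContext(sc, Seconds(2))
    // One quorum string can list several ZooKeeper nodes.
    val lines = ssc.kafkaStream[String](
      "zk1:2181,zk2:2181", "wordcount-group", Map("wordcount-topic" -> 1))
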
diff --git a/run b/run
index 060856007f039d80b18a414a0f2cd28f44f010d5..9015fdbff7537368d6e93d8f1166dfd550d4394a 100755
--- a/run
+++ b/run
@@ -84,6 +84,9 @@ CLASSPATH+=":$CORE_DIR/src/main/resources"
 CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
 CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
 CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
+for jar in `find "$STREAMING_DIR/lib" -name '*.jar'`; do
+  CLASSPATH+=":$jar"
+done
 if [ -e "$FWDIR/lib_managed" ]; then
   CLASSPATH+=":$FWDIR/lib_managed/jars/*"
   CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 5781b1cc720b9ea28045466f712c78104a1a3e08..db0461b98594c3ca091df37799ccbb693d7a6032 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -141,8 +141,7 @@ class StreamingContext private (
 
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
-   * @param hostname Zookeper hostname.
-   * @param port Zookeper port.
+   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,...).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    * in its own thread.
@@ -151,14 +150,13 @@ class StreamingContext private (
    * @param storageLevel RDD storage level. Defaults to memory-only.
    */
   def kafkaStream[T: ClassManifest](
-      hostname: String,
-      port: Int,
+      zkQuorum: String,
       groupId: String,
       topics: Map[String, Int],
       initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
     ): DStream[T] = {
-    val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
+    val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
     registerInputStream(inputStream)
     inputStream
   }
@@ -288,17 +286,31 @@ class StreamingContext private (
   }
 
   /**
-   * Creates a input stream from an queue of RDDs. In each batch,
+   * Creates an input stream from a queue of RDDs. In each batch,
+   * it will process either one or all of the RDDs returned by the queue.
+   * @param queue      Queue of RDDs
+   * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+   * @tparam T         Type of objects in the RDD
+   */
+  def queueStream[T: ClassManifest](
+      queue: Queue[RDD[T]],
+      oneAtATime: Boolean = true
+    ): DStream[T] = {
+    queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+  }
+
+  /**
+   * Creates an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
-   * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
+   * @param defaultRDD Default RDD returned by the DStream when the queue is empty. Set to null if no RDD should be returned when the queue is empty.
    * @tparam T         Type of objects in the RDD
    */
   def queueStream[T: ClassManifest](
       queue: Queue[RDD[T]],
-      oneAtATime: Boolean = true,
-      defaultRDD: RDD[T] = null
+      oneAtATime: Boolean,
+      defaultRDD: RDD[T]
     ): DStream[T] = {
     val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
     registerInputStream(inputStream)
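
The defaults on queueStream were moved into a separate two-argument overload (Scala permits default arguments on at most one overloaded alternative), so the three-argument form now requires an explicit defaultRDD. A minimal sketch of the resulting call sites, assuming an already-constructed StreamingContext named ssc; the queue contents are invented.

    import scala.collection.mutable.Queue
    import spark.RDD

    val queue = new Queue[RDD[Int]]()
    queue += ssc.sc.makeRDD(1 to 10, 2)

    // New two-argument overload: one RDD per interval, and an empty RDD is
    // returned while the queue is empty.
    val oneByOne = ssc.queueStream(queue)

    // Three-argument overload: every argument must now be supplied explicitly.
    val allAtOnce = ssc.queueStream(queue, false, ssc.sc.makeRDD(Seq[Int](), 1))
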
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 760d9b5cf36e0c66a53406651980cf01f0610487..4f8c8b9d104bdeefd4730b1cb99df4a4497a4811 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -23,8 +23,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
 /**
  * Input stream that pulls messages from a Kafka Broker.
  * 
- * @param host Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,...).
  * @param groupId The group id for this consumer.
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
@@ -35,65 +34,21 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
 private[streaming]
 class KafkaInputDStream[T: ClassManifest](
     @transient ssc_ : StreamingContext,
-    host: String,
-    port: Int,
+    zkQuorum: String,
     groupId: String,
     topics: Map[String, Int],
     initialOffsets: Map[KafkaPartitionKey, Long],
     storageLevel: StorageLevel
   ) extends NetworkInputDStream[T](ssc_ ) with Logging {
 
-  // Metadata that keeps track of which messages have already been consumed.
-  var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
-  
-  /* NOT USED - Originally intended for fault-tolerance
- 
-  // In case of a failure, the offets for a particular timestamp will be restored.
-  @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
-
- 
-  override protected[streaming] def addMetadata(metadata: Any) {
-    metadata match {
-      case x : KafkaInputDStreamMetadata =>
-        savedOffsets(x.timestamp) = x.data
-        // TOOD: Remove logging
-        logInfo("New saved Offsets: " + savedOffsets)
-      case _ => logInfo("Received unknown metadata: " + metadata.toString)
-    }
-  }
-
-  override protected[streaming] def updateCheckpointData(currentTime: Time) {
-    super.updateCheckpointData(currentTime)
-    if(savedOffsets.size > 0) {
-      // Find the offets that were stored before the checkpoint was initiated
-      val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last
-      val latestOffsets = savedOffsets(key)
-      logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
-      checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
-      // TODO: This may throw out offsets that are created after the checkpoint,
-      // but it's unlikely we'll need them.
-      savedOffsets.clear()
-    }
-  }
-
-  override protected[streaming] def restoreCheckpointData() {
-    super.restoreCheckpointData()
-    logInfo("Restoring KafkaDStream checkpoint data.")
-    checkpointData match { 
-      case x : KafkaDStreamCheckpointData => 
-        restoredOffsets = x.savedOffsets
-        logInfo("Restored KafkaDStream offsets: " + savedOffsets)
-    }
-  } */
-
   def createReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(host, port,  groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
         .asInstanceOf[NetworkReceiver[T]]
   }
 }
 
 private[streaming]
-class KafkaReceiver(host: String, port: Int, groupId: String,
+class KafkaReceiver(zkQuorum: String, groupId: String,
   topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], 
   storageLevel: StorageLevel) extends NetworkReceiver[Any] {
 
@@ -102,8 +57,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
 
   // Handles pushing data into the BlockManager
   lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-  // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
-  lazy val offsets = HashMap[KafkaPartitionKey, Long]()
   // Connection to Kafka
   var consumerConnector : ZookeeperConsumerConnector = null
 
@@ -118,24 +71,23 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
-    val zooKeeperEndPoint = host + ":" + port
     logInfo("Starting Kafka Consumer Stream with group: " + groupId)
     logInfo("Initial offsets: " + initialOffsets.toString)
     
     // Zookeper connection properties
     val props = new Properties()
-    props.put("zk.connect", zooKeeperEndPoint)
+    props.put("zk.connect", zkQuorum)
     props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
     props.put("groupid", groupId)
 
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + zooKeeperEndPoint)
+    logInfo("Connecting to Zookeper: " + zkQuorum)
     val consumerConfig = new ConsumerConfig(props)
     consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
-    logInfo("Connected to " + zooKeeperEndPoint)
+    logInfo("Connected to " + zkQuorum)
 
-    // Reset the Kafka offsets in case we are recovering from a failure
-    resetOffsets(initialOffsets)
+    // If specified, set the initial topic offsets
+    setOffsets(initialOffsets)
 
     // Create Threads for each Topic/Message Stream we are listening
     val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
@@ -148,7 +100,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
   }
 
   // Overwrites the offets in Zookeper.
-  private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) {
+  private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
     offsets.foreach { case(key, offset) =>
       val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
       val partitionName = key.brokerId + "-" + key.partId
@@ -164,28 +116,9 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
       stream.takeWhile { msgAndMetadata =>
         blockGenerator += msgAndMetadata.message
 
-        // Updating the offet. The key is (broker, topic, group, partition).
-        val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, 
-          groupId, msgAndMetadata.topicInfo.partition.partId)
-        val offset = msgAndMetadata.topicInfo.getConsumeOffset
-        offsets.put(key, offset)
-        // logInfo("Handled message: " + (key, offset).toString)
-
         // Keep on handling messages
         true
       }  
     }
   }
-
-  // NOT USED - Originally intended for fault-tolerance
-  // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) 
-  // extends BufferingBlockCreator[Any](receiver, storageLevel) {
-
-  //   override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
-  //     // Creates a new Block with Kafka-specific Metadata
-  //     new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
-  //   }
-
-  // }
-
 }
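
For context on the constructor change, initialOffsets is keyed by KafkaPartitionKey and is now written to ZooKeeper by setOffsets when the receiver starts. A hedged sketch of supplying it through the public kafkaStream API; the quorum, topic, group, broker id and offset values are all invented, and ssc is an assumed StreamingContext.

    import spark.streaming.dstream.KafkaPartitionKey

    // Field order: brokerId, topic, groupId, partId. All values illustrative.
    val initialOffsets = Map(
      KafkaPartitionKey(0, "wordcount-topic", "wordcount-group", 0) -> 1000L)
    // zkQuorum is a comma-separated host:port list.
    val lines = ssc.kafkaStream[String](
      "zk1:2181,zk2:2181", "wordcount-group", Map("wordcount-topic" -> 1), initialOffsets)
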
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b4506c74aa4e14daf720d560c52ae24b69c5aeac..db62955036e9c1a72a5e11571628a97ca858af53 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
             //logDebug("Generating state RDD for time " + validTime)
             return Some(stateRDD)
           }
-          case None => {    // If parent RDD does not exist, then return old state RDD
-            return Some(prevStateRDD)
+          case None => {    // If parent RDD does not exist
+
+            // Re-apply the update function to the old state RDD
+            val updateFuncLocal = updateFunc
+            val finalFunc = (iterator: Iterator[(K, S)]) => {
+              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+              updateFuncLocal(i)
+            }
+            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+            return Some(stateRDD)
           }
         }
       }
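
With this change, the update function is re-applied to the previous state even for intervals whose parent RDD does not exist, so an update function can expire idle keys instead of having their state carried forward untouched; the new updateStateByKey test added to BasicOperationsSuite below exercises exactly this. A stripped-down sketch of such a function, where pairs is an assumed DStream[(String, Int)].

    // Sketch only (not from the patch): drop a key as soon as it sees a batch
    // with no new values; accumulate a running count otherwise.
    val expireWhenIdle = (values: Seq[Int], state: Option[Int]) => {
      if (values.isEmpty) None                                   // nothing new: expire the key
      else Some(state.getOrElse(0) + values.foldLeft(0)(_ + _))  // add new values to the count
    }
    val runningCounts = pairs.updateStateByKey[Int](expireWhenIdle)
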
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index bfdf32c73eeeaa5f8d2368eeb654a80393f30a22..d98b840b8e884f2dfa7d3ec19243b3d97cd2fa9b 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -165,6 +165,51 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData, updateStateOperation, outputData, true)
   }
 
+  test("updateStateByKey - object lifecycle") {
+    val inputData =
+      Seq(
+        Seq("a","b"),
+        null,
+        Seq("a","c","a"),
+        Seq("c"),
+        null,
+        null
+      )
+
+    val outputData =
+      Seq(
+        Seq(("a", 1), ("b", 1)),
+        Seq(("a", 1), ("b", 1)),
+        Seq(("a", 3), ("c", 1)),
+        Seq(("a", 3), ("c", 2)),
+        Seq(("c", 2)),
+        Seq()
+      )
+
+    val updateStateOperation = (s: DStream[String]) => {
+      class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
+
+      // updateFunc clears a key's state once its StateObject has gone two batches in a row without new values
+      val updateFunc = (values: Seq[Int], state: Option[StateObject]) => {
+        val stateObj = state.getOrElse(new StateObject)
+        values.foldLeft(0)(_ + _) match {
+          case 0 => stateObj.expireCounter += 1 // no new values
+          case n => { // has new values, increment and reset expireCounter
+            stateObj.counter += n
+            stateObj.expireCounter = 0
+          }
+        }
+        stateObj.expireCounter match {
+          case 2 => None // seen twice with no new values, give it the boot
+          case _ => Option(stateObj)
+        }
+      }
+      s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
+    }
+
+    testOperation(inputData, updateStateOperation, outputData, true)
+  }
+
   test("forgetting of RDDs - map and window operations") {
     assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
 
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 49129f39640c4d906dee6604fb0b7e1d84e76fee..c2733831b23de3728f9534ddc4d12223aef8b460 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -28,6 +28,11 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
     logInfo("Computing RDD for time " + validTime)
     val index = ((validTime - zeroTime) / slideDuration - 1).toInt
     val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+    // A null batch lets us test cases where no RDD is created for an interval
+    if (selectedInput == null)
+      return None
+
     val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
     logInfo("Created RDD " + rdd.id + " with " + selectedInput)
     Some(rdd)