diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b7d2feb38a5b5d520f3ed0ab73d5f5c..bb7f216ca7dcd9b2c5e5e43a12b5850c3fadcd08 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -199,13 +199,11 @@ class StreamingContext private (
   }
 
   /**
-   * Create an input stream that pulls messages form a Kafka Broker.
+   * Create an input stream that pulls messages from a Kafka Broker.
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    * in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   * By default the value is pulled from zookeper.
    * @param storageLevel  Storage level to use for storing the received objects
    *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
@@ -213,10 +211,25 @@ class StreamingContext private (
       zkQuorum: String,
       groupId: String,
       topics: Map[String, Int],
-      initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
     ): DStream[T] = {
-    val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+    kafkaStream[T](kafkaParams, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param storageLevel  Storage level to use for storing the received objects
+   */
+  def kafkaStream[T: ClassManifest](
+      kafkaParams: Map[String, String],
+      topics: Map[String, Int],
+      storageLevel: StorageLevel
+    ): DStream[T] = {
+    val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, storageLevel)
     registerInputStream(inputStream)
     inputStream
   }
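
For reference, a minimal usage sketch of the new kafkaParams-based overload, assuming an existing
StreamingContext named ssc (the quorum, group id, and topic name below are illustrative placeholders):

    import spark.storage.StorageLevel

    // Hypothetical caller code, not part of this patch.
    val kafkaParams = Map[String, String](
      "zk.connect" -> "localhost:2181",            // Zookeeper quorum
      "groupid" -> "example-consumer-group",       // consumer group id
      "zk.connectiontimeout.ms" -> "10000")
    val topics = Map("events" -> 2)                // topic -> number of consumer threads
    val lines = ssc.kafkaStream[String](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
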
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f3b40b5b88ef9dd80f2f717ccdd2bf151ddd6f00..7a8864614c95f8a39b68cc9709ed402a58bcdb44 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -80,52 +80,23 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
+   * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    * in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   * By default the value is pulled from zookeper.
-   */
-  def kafkaStream[T](
-    zkQuorum: String,
-    groupId: String,
-    topics: JMap[String, JInt],
-    initialOffsets: JMap[KafkaPartitionKey, JLong])
-  : JavaDStream[T] = {
-    implicit val cmt: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    ssc.kafkaStream[T](
-      zkQuorum,
-      groupId,
-      Map(topics.mapValues(_.intValue()).toSeq: _*),
-      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
-  }
-
-  /**
-   * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
-   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   * in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   * By default the value is pulled from zookeper.
    * @param storageLevel RDD storage level. Defaults to memory-only
    */
   def kafkaStream[T](
-    zkQuorum: String,
-    groupId: String,
+    kafkaParams: JMap[String, String],
     topics: JMap[String, JInt],
-    initialOffsets: JMap[KafkaPartitionKey, JLong],
     storageLevel: StorageLevel)
   : JavaDStream[T] = {
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
     ssc.kafkaStream[T](
-      zkQuorum,
-      groupId,
+      kafkaParams.toMap,
       Map(topics.mapValues(_.intValue()).toSeq: _*),
-      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
       storageLevel)
   }
 
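
The Java-facing API now takes the same kafkaParams map. A hedged sketch in Scala, using java.util
collections to mirror what a Java caller would pass (jssc and all values are placeholders):

    import spark.storage.StorageLevel

    // Hypothetical caller code, not part of this patch; jssc is an existing JavaStreamingContext.
    val kafkaParams = new java.util.HashMap[String, String]()
    kafkaParams.put("zk.connect", "localhost:2181")
    kafkaParams.put("groupid", "example-consumer-group")
    kafkaParams.put("zk.connectiontimeout.ms", "10000")
    val topics = new java.util.HashMap[String, java.lang.Integer]()
    topics.put("events", 2)                        // consume the topic with two threads
    val stream = jssc.kafkaStream[String](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
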
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc273cf75be9dd85e1caaa8579dc052d7c..17a5be3420d41a051e6aef9c70c802ad9f31c373 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -12,54 +12,45 @@ import kafka.message.{Message, MessageSet, MessageAndMetadata}
 import kafka.serializer.StringDecoder
 import kafka.utils.{Utils, ZKGroupTopicDirs}
 import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
 
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
 
 
-// Key for a specific Kafka Partition: (broker, topic, group, part)
-case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-
 /**
  * Input stream that pulls messages from a Kafka Broker.
  * 
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
  * @param storageLevel RDD storage level.
  */
 private[streaming]
 class KafkaInputDStream[T: ClassManifest](
     @transient ssc_ : StreamingContext,
-    zkQuorum: String,
-    groupId: String,
+    kafkaParams: Map[String, String],
     topics: Map[String, Int],
-    initialOffsets: Map[KafkaPartitionKey, Long],
     storageLevel: StorageLevel
   ) extends NetworkInputDStream[T](ssc_ ) with Logging {
 
 
   def getReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(zkQuorum,  groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver(kafkaParams, topics, storageLevel)
         .asInstanceOf[NetworkReceiver[T]]
   }
 }
 
 private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
-  topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], 
+class KafkaReceiver(kafkaParams: Map[String, String],
+  topics: Map[String, Int],
   storageLevel: StorageLevel) extends NetworkReceiver[Any] {
 
-  // Timeout for establishing a connection to Zookeper in ms.
-  val ZK_TIMEOUT = 10000
-
   // Handles pushing data into the BlockManager
   lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Connection to Kafka
-  var consumerConnector : ZookeeperConsumerConnector = null
+  var consumerConnector : ConsumerConnector = null
 
   def onStop() {
     blockGenerator.stop()
@@ -72,23 +63,23 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
-    logInfo("Starting Kafka Consumer Stream with group: " + groupId)
-    logInfo("Initial offsets: " + initialOffsets.toString)
+    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
 
-    // Zookeper connection properties
+    // Kafka connection properties
     val props = new Properties()
-    props.put("zk.connect", zkQuorum)
-    props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
-    props.put("groupid", groupId)
+    kafkaParams.foreach(param => props.put(param._1, param._2))
 
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + zkQuorum)
+    logInfo("Connecting to Zookeeper: " + kafkaParams("zk.connect"))
     val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
-    logInfo("Connected to " + zkQuorum)
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + kafkaParams("zk.connect"))
 
-    // If specified, set the topic offset
-    setOffsets(initialOffsets)
+    // When autooffset.reset is defined, it is our responsibility to try to delete the
+    // consumer group's node in Zookeeper.
+    if (kafkaParams.contains("autooffset.reset")) {
+      tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+    }
 
     // Create Threads for each Topic/Message Stream we are listening
     val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
@@ -97,29 +88,33 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
     topicMessageStreams.values.foreach { streams =>
       streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
     }
-
-  }
-
-  // Overwrites the offets in Zookeper.
-  private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
-    offsets.foreach { case(key, offset) =>
-      val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
-      val partitionName = key.brokerId + "-" + key.partId
-      updatePersistentPath(consumerConnector.zkClient,
-        topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
-    }
   }
 
   // Handles Kafka Messages
   private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
-      stream.takeWhile { msgAndMetadata =>
+      for (msgAndMetadata <- stream) {
         blockGenerator += msgAndMetadata.message
-        // Keep on handling messages
-
-        true
       }
     }
   }
+
+  // It is our responsibility to delete the consumer group node when autooffset.reset is specified. This is
+  // because Kafka 0.7.2 only honors the param when the consumer group does not already exist in Zookeeper.
+  //
+  // The Kafka high-level consumer does not currently expose an API for setting offsets, so this is a trick
+  // copied from Kafka's ConsoleConsumer. See the code related to 'autooffset.reset' set to 'smallest'/'largest':
+  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _: Throwable => // best-effort cleanup; swallow any failure
+    }
+  }
 }
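
To illustrate the new autooffset.reset handling: when the parameter appears in kafkaParams, the receiver
deletes /consumers/<groupid> in Zookeeper before connecting, so Kafka 0.7.2 applies the reset on every start.
A hedged sketch (all values are placeholders):

    import spark.storage.StorageLevel

    // Hypothetical caller code, not part of this patch; ssc is an existing StreamingContext.
    val replayParams = Map[String, String](
      "zk.connect" -> "localhost:2181",
      "groupid" -> "replay-group",
      "autooffset.reset" -> "smallest")            // receiver wipes /consumers/replay-group before connecting
    val replayed = ssc.kafkaStream[String](replayParams, Map("events" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
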