diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index e20e2c8f269916e3efdefdfeccb33bb42ab87102..28ac5929df44ad650ab958679f00859523573efa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -26,8 +26,6 @@ import java.util.concurrent.Executors
 import kafka.consumer._
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -97,12 +95,6 @@ class KafkaReceiver[
     consumerConnector = Consumer.create(consumerConfig)
     logInfo("Connected to " + zkConnect)
 
-    // When auto.offset.reset is defined, it is our responsibility to try and whack the
-    // consumer group zk node.
-    if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
-    }
-
     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[K]]
@@ -139,26 +131,4 @@ class KafkaReceiver[
       }
     }
   }
-
-  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
-  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
-  //
-  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
-  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
-  // 'smallest'/'largest':
-  // scalastyle:off
-  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
-  // scalastyle:on
-  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
-    val dir = "/consumers/" + groupId
-    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
-    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-    try {
-      zk.deleteRecursive(dir)
-    } catch {
-      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
-    } finally {
-      zk.close()
-    }
-  }
 }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 48668f763e41e4d04d4bf01c11930ce086af4efe..ec812e1ef3b04b7daf3adc08da2cf343fc51d734 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -17,19 +17,18 @@
 
 package org.apache.spark.streaming.kafka
 
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
 import java.lang.{Integer => JInt}
 import java.util.{Map => JMap}
 
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
 import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
-
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 object KafkaUtils {
   /**