diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 85693808d1046594b9c7e8b425e710844f18fead..17a5be3420d41a051e6aef9c70c802ad9f31c373 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -75,9 +75,9 @@ class KafkaReceiver(kafkaParams: Map[String, String], consumerConnector = Consumer.create(consumerConfig) logInfo("Connected to " + kafkaParams("zk.connect")) - // When autooffset.reset is 'smallest', it is our responsibility to try and whack the + // When autooffset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. - if (kafkaParams.get("autooffset.reset").exists(_ == "smallest")) { + if (kafkaParams.contains("autooffset.reset")) { tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) } @@ -100,9 +100,11 @@ class KafkaReceiver(kafkaParams: Map[String, String], } } - // Delete consumer group from zookeeper. This effectivly resets the group so we can consume from the beginning again. + // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because + // Kafka 0.7.2 only honors this param when the group is not in zookeeper. + // // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' - // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest': + // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { try {