Skip to content
Snippets Groups Projects
Commit 329ef34c authored by seanm's avatar seanm
Browse files

fixing autooffset.reset behavior when set to 'largest'

parent d61978d0
No related branches found
No related tags found
No related merge requests found
...@@ -75,9 +75,9 @@ class KafkaReceiver(kafkaParams: Map[String, String], ...@@ -75,9 +75,9 @@ class KafkaReceiver(kafkaParams: Map[String, String],
consumerConnector = Consumer.create(consumerConfig) consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + kafkaParams("zk.connect")) logInfo("Connected to " + kafkaParams("zk.connect"))
// When autooffset.reset is 'smallest', it is our responsibility to try and whack the // When autooffset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node. // consumer group zk node.
if (kafkaParams.get("autooffset.reset").exists(_ == "smallest")) { if (kafkaParams.contains("autooffset.reset")) {
tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
} }
...@@ -100,9 +100,11 @@ class KafkaReceiver(kafkaParams: Map[String, String], ...@@ -100,9 +100,11 @@ class KafkaReceiver(kafkaParams: Map[String, String],
} }
} }
// Delete consumer group from zookeeper. This effectivly resets the group so we can consume from the beginning again. // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
// Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
// ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest': // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
try { try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment