Commit c8850a3d authored by jerryshao's avatar jerryshao Committed by Tathagata Das

[SPARK-2492][Streaming] KafkaReceiver minor changes to align with Kafka 0.8

Update the KafkaReceiver's behavior when auto.offset.reset is set.

In Kafka 0.8, `auto.offset.reset` is a hint that only takes effect when the consumer's offset is out of range, telling it to seek to the beginning or end of the partition. In the previous code, however, `auto.offset.reset` was treated as an enforcement to seek to the beginning or end immediately, which differs from the behavior defined by Kafka 0.8.
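For illustration, a minimal sketch of how a user would pass `auto.offset.reset` through `kafkaParams` when creating a stream with the high-level consumer API; the ZooKeeper address, group id, and topic name are hypothetical, and `ssc` is assumed to be an existing `StreamingContext`:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical configuration; with this patch "auto.offset.reset" is left to
// Kafka itself, which only applies it when the stored offset is out of range,
// rather than forcing a seek on every receiver start.
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",   // assumed ZooKeeper address
  "group.id" -> "my-consumer-group",         // assumed consumer group
  "auto.offset.reset" -> "smallest")         // hint: seek to beginning on out-of-range

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
```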

Also, deleting existing ZK metadata in the receiver introduces problems when multiple consumers are launched, as described in [SPARK-2383](https://issues.apache.org/jira/browse/SPARK-2383).

So here we change the code to let users explicitly reset offsets before creating the Kafka stream, while keeping the same behavior as Kafka 0.8 for the `auto.offset.reset` parameter.
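For users who relied on the old eager reset, the equivalent explicit cleanup can be done before creating the stream. A minimal sketch using the same `ZkClient` calls the removed helper used; the connection values are assumptions, and `resetConsumerGroup` is a hypothetical name, not an API added by this patch:

```scala
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Recursively delete the group's offset metadata from ZooKeeper so that
// Kafka's auto.offset.reset hint applies on the next start. Run this before
// creating the stream, never while consumers in the group are active.
def resetConsumerGroup(zkUrl: String, groupId: String): Unit = {
  val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
  try {
    zk.deleteRecursive("/consumers/" + groupId)
  } finally {
    zk.close()
  }
}

resetConsumerGroup("localhost:2181", "my-consumer-group")  // assumed values
```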

@tdas, would you please review this PR? Thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #1420 from jerryshao/kafka-fix and squashes the following commits:

d6ae94d [jerryshao] Address the comment to remove the resetOffset() function
de3a4c8 [jerryshao] Fix compile error
4a1c3f9 [jerryshao] Doc changes
b2c1430 [jerryshao] Move offset reset to a helper function to let user explicitly delete ZK metadata by calling this API
fac8fd6 [jerryshao] Changes to align with Kafka 0.8
parent f8811a56
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -26,8 +26,6 @@ import java.util.concurrent.Executors
 import kafka.consumer._
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -97,12 +95,6 @@ class KafkaReceiver[
     consumerConnector = Consumer.create(consumerConfig)
     logInfo("Connected to " + zkConnect)
 
-    // When auto.offset.reset is defined, it is our responsibility to try and whack the
-    // consumer group zk node.
-    if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
-    }
-
     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[K]]
@@ -139,26 +131,4 @@ class KafkaReceiver[
       }
     }
   }
-
-  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
-  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
-  //
-  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
-  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
-  // 'smallest'/'largest':
-  // scalastyle:off
-  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
-  // scalastyle:on
-  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
-    val dir = "/consumers/" + groupId
-    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
-    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-    try {
-      zk.deleteRecursive(dir)
-    } catch {
-      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
-    } finally {
-      zk.close()
-    }
-  }
 }
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -17,19 +17,18 @@
 package org.apache.spark.streaming.kafka
 
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
 import java.lang.{Integer => JInt}
 import java.util.{Map => JMap}
 
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
 import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 object KafkaUtils {
   /**
...