From 4786484076865c56c3fc23c49819b9be2933d287 Mon Sep 17 00:00:00 2001 From: cody koeninger <cody@koeninger.org> Date: Fri, 1 May 2015 17:54:56 -0700 Subject: [PATCH] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2 i don't think this should be merged until after 1.3.0 is final Author: cody koeninger <cody@koeninger.org> Author: Helena Edelson <helena.edelson@datastax.com> Closes #4537 from koeninger/wip-2808-kafka-0.8.2-upgrade and squashes the following commits: 803aa2c [cody koeninger] [SPARK-2808][Streaming][Kafka] code cleanup per TD e6dfaf6 [cody koeninger] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again 1770abc [cody koeninger] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well d4267e9 [cody koeninger] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script 30d991d [cody koeninger] [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax 1d896e2 [cody koeninger] [SPARK-2808][Streaming][Kafka] add even even more logging to python test 4c4557f [cody koeninger] [SPARK-2808][Streaming][Kafka] add even more logging to python test 115aeee [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade 2712649 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more logging to python test, see why its timing out in jenkins 2b92d3f [cody koeninger] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well 3824ce3 [cody koeninger] [SPARK-2808][Streaming][Kafka] naming / comments per tdas 61b3464 [cody koeninger] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test af6f3ec [cody koeninger] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value 9edab4c [cody koeninger] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test c70ee43 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally 1d10751 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade ed02d2c [cody koeninger] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat 407382e [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1 77de6c2 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade 6953429 [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2 2e67c66 [Helena Edelson] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta. d9dc2bc [Helena Edelson] Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.8.2-upgrade e768164 [Helena Edelson] #2808 update kafka to version 0.8.2 --- external/kafka/pom.xml | 2 +- .../spark/streaming/kafka/KafkaCluster.scala | 51 +++++++++++++++---- .../streaming/kafka/KafkaTestUtils.scala | 35 +++++++++++-- .../streaming/kafka/JavaKafkaRDDSuite.java | 3 ++ .../spark/streaming/kafka/KafkaRDDSuite.scala | 32 ++++++++---- python/pyspark/streaming/tests.py | 8 +-- 6 files changed, 104 insertions(+), 27 deletions(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index f695cff410..243ce6eaca 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> - <version>0.8.1.1</version> + <version>0.8.2.1</version> <exclusions> <exclusion> <groupId>com.sun.jmx</groupId> diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index bd767031c1..6cf254a7b6 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.kafka import scala.util.control.NonFatal import scala.util.Random import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import java.util.Properties import kafka.api._ -import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition} +import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} import kafka.consumer.{ConsumerConfig, SimpleConsumer} import org.apache.spark.SparkException @@ -220,12 +221,22 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI // scalastyle:on + // this 0 here indicates api version, in this case the original ZK backed api. + private def defaultConsumerApiVersion: Short = 0 + /** Requires Kafka >= 0.8.1.1 */ def getConsumerOffsets( groupId: String, topicAndPartitions: Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, Long]] = + getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion) + + def getConsumerOffsets( + groupId: String, + topicAndPartitions: Set[TopicAndPartition], + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Long]] = { - getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r => + getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r => r.map { kv => kv._1 -> kv._2.offset } @@ -236,9 +247,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsetMetadata( groupId: String, topicAndPartitions: Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = + getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion) + + def getConsumerOffsetMetadata( + groupId: String, + topicAndPartitions: Set[TopicAndPartition], + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = { var result = Map[TopicAndPartition, OffsetMetadataAndError]() - val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq) + val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion) val errs = new Err withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => val resp = consumer.fetchOffsets(req) @@ -266,24 +284,39 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsets( groupId: String, offsets: Map[TopicAndPartition, Long] + ): Either[Err, Map[TopicAndPartition, Short]] = + setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion) + + def setConsumerOffsets( + groupId: String, + offsets: Map[TopicAndPartition, Long], + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { - setConsumerOffsetMetadata(groupId, offsets.map { kv => - kv._1 -> OffsetMetadataAndError(kv._2) - }) + val meta = offsets.map { kv => + kv._1 -> OffsetAndMetadata(kv._2) + } + setConsumerOffsetMetadata(groupId, meta, consumerApiVersion) } /** Requires Kafka >= 0.8.1.1 */ def setConsumerOffsetMetadata( groupId: String, - metadata: Map[TopicAndPartition, OffsetMetadataAndError] + metadata: Map[TopicAndPartition, OffsetAndMetadata] + ): Either[Err, Map[TopicAndPartition, Short]] = + setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion) + + def setConsumerOffsetMetadata( + groupId: String, + metadata: Map[TopicAndPartition, OffsetAndMetadata], + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { var result = Map[TopicAndPartition, Short]() - val req = OffsetCommitRequest(groupId, metadata) + val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion) val errs = new Err val topicAndPartitions = metadata.keySet withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => val resp = consumer.commitOffsets(req) - val respMap = resp.requestInfo + val respMap = resp.commitStatus val needed = topicAndPartitions.diff(result.keySet) needed.foreach { tp: TopicAndPartition => respMap.get(tp).foreach { err: Short => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 13e9475065..6dc4e9517d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -29,10 +29,12 @@ import scala.language.postfixOps import scala.util.control.NonFatal import kafka.admin.AdminUtils +import kafka.api.Request +import kafka.common.TopicAndPartition import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.ZKStringSerializer +import kafka.utils.{ZKStringSerializer, ZkUtils} import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.I0Itec.zkclient.ZkClient @@ -227,12 +229,35 @@ private class KafkaTestUtils extends Logging { tryAgain(1) } - private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + /** Wait until the leader offset for the given topic/partition equals the specified offset */ + def waitUntilLeaderOffset( + topic: String, + partition: Int, + offset: Long): Unit = { eventually(Time(10000), Time(100)) { + val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress)) + val tp = TopicAndPartition(topic, partition) + val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset assert( - server.apis.metadataCache.containsTopicAndPartition(topic, partition), - s"Partition [$topic, $partition] metadata not propagated after timeout" - ) + llo == offset, + s"$topic $partition $offset not reached after timeout") + } + } + + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { + case Some(partitionState) => + val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + + ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined && + Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && + leaderAndInSyncReplicas.isr.size >= 1 + + case _ => + false + } + eventually(Time(10000), Time(100)) { + assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout") } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a9dc6e5061..5cf3796353 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -72,6 +72,9 @@ public class JavaKafkaRDDSuite implements Serializable { HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); + kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length); + kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length); + OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), OffsetRange.create(topic2, 0, 0, 1) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 7d26ce5087..39c3fb448f 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -53,14 +53,15 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { } test("basic usage") { - val topic = "topicbasic" + val topic = s"topicbasic-${Random.nextInt}" kafkaTestUtils.createTopic(topic) val messages = Set("the", "quick", "brown", "fox") kafkaTestUtils.sendMessages(topic, messages.toArray) - val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + "group.id" -> s"test-consumer-${Random.nextInt}") + + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size) val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) @@ -73,27 +74,38 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { test("iterator boundary conditions") { // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd - val topic = "topic1" + val topic = s"topicboundary-${Random.nextInt}" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) kafkaTestUtils.createTopic(topic) val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + "group.id" -> s"test-consumer-${Random.nextInt}") val kc = new KafkaCluster(kafkaParams) // this is the "lots of messages" case kafkaTestUtils.sendMessages(topic, sent) + val sentCount = sent.values.sum + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount) + // rdd defined from leaders after sending messages, should get the number sent val rdd = getRdd(kc, Set(topic)) assert(rdd.isDefined) - assert(rdd.get.count === sent.values.sum, "didn't get all sent messages") - val ranges = rdd.get.asInstanceOf[HasOffsetRanges] - .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges + val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum - kc.setConsumerOffsets(kafkaParams("group.id"), ranges) + assert(rangeCount === sentCount, "offset range didn't include all sent messages") + assert(rdd.get.count === sentCount, "didn't get all sent messages") + + val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + + // make sure consumer offsets are committed before the next getRdd call + kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold( + err => throw new Exception(err.mkString("\n")), + _ => () + ) // this is the "0 messages" case val rdd2 = getRdd(kc, Set(topic)) @@ -101,6 +113,8 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val sentOnlyOne = Map("d" -> 1) kafkaTestUtils.sendMessages(topic, sentOnlyOne) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1) + assert(rdd2.isDefined) assert(rdd2.get.count === 0, "got messages when there shouldn't be any") diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 7c06c20345..33ea8c9293 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -606,7 +606,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): result = {} for i in rdd.map(lambda x: x[1]).collect(): result[i] = result.get(i, 0) + 1 - self.assertEqual(sendData, result) def test_kafka_stream(self): @@ -616,6 +615,7 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, @@ -631,6 +631,7 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) self._validateStreamResult(sendData, stream) @@ -645,6 +646,7 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets) self._validateStreamResult(sendData, stream) @@ -659,7 +661,7 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) self._validateRddResult(sendData, rdd) @@ -675,7 +677,7 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) self._validateRddResult(sendData, rdd) -- GitLab