From c62263340edb6976a10f274e716fde6cd2c5bf34 Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Thu, 30 Jun 2016 13:16:58 -0700
Subject: [PATCH] [SPARK-16212][STREAMING][KAFKA] code cleanup from review
 feedback

## What changes were proposed in this pull request?

code cleanup in kafka-0-8 to match suggested changes for kafka-0-10 branch

## How was this patch tested?

unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #13908 from koeninger/kafka-0-8-cleanup.
---
 .../streaming/kafka/DirectKafkaInputDStream.scala   | 12 ++++++------
 .../org/apache/spark/streaming/kafka/KafkaRDD.scala |  9 ++++++---
 .../streaming/kafka/JavaDirectKafkaStreamSuite.java |  5 -----
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index fb58ed7898..c3c799375b 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 /**
- * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * A stream of [[KafkaRDD]] where
  * each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
  * of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
 * and this DStream is not responsible for committing offsets,
 * so that you can control exactly-once semantics.
 * For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * see [[KafkaCluster]]
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>.
 * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
         if (retries <= 0) {
           throw new SparkException(err)
         } else {
-          log.error(err)
+          logError(err)
           Thread.sleep(kc.config.refreshLeaderBackoffMs)
           latestLeaderOffsets(retries - 1)
         }
@@ -194,7 +194,7 @@ class DirectKafkaInputDStream[
       data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
     }
 
-    override def update(time: Time) {
+    override def update(time: Time): Unit = {
       batchForTime.clear()
       generatedRDDs.foreach { kv =>
         val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
@@ -202,9 +202,9 @@ class DirectKafkaInputDStream[
       }
     }
 
-    override def cleanup(time: Time) { }
+    override def cleanup(time: Time): Unit = { }
 
-    override def restore() {
+    override def restore(): Unit = {
       // this is assuming that the topics don't change during execution, which is true currently
       val topics = fromOffsets.keySet
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index d4881b140d..2b925774a2 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -129,7 +129,7 @@ class KafkaRDD[
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
     if (part.fromOffset == part.untilOffset) {
-      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
@@ -137,13 +137,16 @@ class KafkaRDD[
     }
   }
 
+  /**
+   * An iterator that fetches messages directly from Kafka for the offsets in partition.
+   */
   private class KafkaRDDIterator(
       part: KafkaRDDPartition,
       context: TaskContext) extends NextIterator[R] {
 
     context.addTaskCompletionListener{ context => closeIfNeeded() }
 
-    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
 
     val kc = new KafkaCluster(kafkaParams)
@@ -177,7 +180,7 @@ class KafkaRDD[
       val err = resp.errorCode(part.topic, part.partition)
       if (err == ErrorMapping.LeaderNotAvailableCode ||
         err == ErrorMapping.NotLeaderForPartitionCode) {
-        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+        logError(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
           s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
         Thread.sleep(kc.config.refreshLeaderBackoffMs)
       }
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index fa6b0dbc8c..71404a7331 100644
--- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -135,11 +135,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
         @Override
         public void call(JavaRDD<String> rdd) {
           result.addAll(rdd.collect());
-          for (OffsetRange o : offsetRanges.get()) {
-            System.out.println(
-              o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-            );
-          }
         }
       }
     );
--
GitLab
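
The cleanup standardizes on two Scala idioms. The sketch below is illustrative only, not part of the patch: the `Logging` trait here is a stand-in that mirrors the shape of Spark's internal Logging trait (not a public API), and the `Example` class is hypothetical.

```scala
import org.slf4j.{Logger, LoggerFactory}

// Stand-in for Spark's Logging trait (assumed shape; the real trait is
// internal to Spark). Its helpers take the message by name, so interpolated
// strings are only built when the level is actually enabled -- the practical
// reason logError(msg) is preferred over calling log.error(msg) directly.
trait Logging {
  private lazy val log: Logger = LoggerFactory.getLogger(getClass)
  protected def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
  protected def logError(msg: => String): Unit = if (log.isErrorEnabled) log.error(msg)
}

// Hypothetical class showing the two before/after patterns from the diff.
class Example(topic: String, partition: Int) extends Logging {
  // Before: log.error(s"...") -- always builds the string, even if ERROR is off.
  // After:  logError(s"...")  -- string built lazily via the by-name parameter.
  def reportLostLeader(): Unit =
    logError(s"Lost leader for topic $topic partition $partition")

  // Before: def update(time: Time) { ... } -- Scala "procedure syntax",
  //         avoided by Spark's style guide and deprecated in later Scala versions.
  // After:  an explicit `: Unit =`, as the patch does for update/cleanup/restore.
  def update(): Unit = {
    logInfo(s"Updating $topic $partition")
  }
}
```

The doc-comment edits are related cleanup: `{@link org.apache.spark.streaming.kafka.KafkaRDD}` is Javadoc inline-tag syntax that Scaladoc does not resolve, so the comments switch to Scaladoc's native `[[KafkaRDD]]` link form.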